internal

package
Version: v0.2.0

Warning: This package is not in the latest version of its module.

Go to latest
Published: Sep 24, 2023 | License: MIT | Imports: 16 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	RestrictedForwardMessage_InvalidOperation int = 0
	RestrictedForwardMessage_Recursive        int = 1
)
View Source
const (
	LOGGER_PREFIX string = "[worker-nsq] "
)

Variables

View Source
var (
	GlobalTracerManager             *TracerManager      // to be registered by NsqWorker
	GlobalContextHelper             ContextHelper       = ContextHelper{}
	GlobalRestrictedMessageDelegate nsq.MessageDelegate = RestrictedMessageDelegate(0)

	NsqWorkerModuleInstance = NsqWorkerModule{}

	NsqWorkerLogger *log.Logger = log.New(os.Stdout, LOGGER_PREFIX, log.LstdFlags|log.Lmsgprefix)
)

Functions

func StopRecursiveForwardMessageHandler

func StopRecursiveForwardMessageHandler(ctx *Context, msg *Message) error

Types

type CompositeMessageObserver

type CompositeMessageObserver []MessageObserver

func (CompositeMessageObserver) OnFinish

func (o CompositeMessageObserver) OnFinish(ctx *Context, message *nsq.Message)

OnFinish implements MessageObserver.

func (CompositeMessageObserver) OnRequeue

func (o CompositeMessageObserver) OnRequeue(ctx *Context, message *nsq.Message)

OnRequeue implements MessageObserver.

func (CompositeMessageObserver) OnTouch

func (o CompositeMessageObserver) OnTouch(ctx *Context, message *nsq.Message)

OnTouch implements MessageObserver.

func (CompositeMessageObserver) Type

func (CompositeMessageObserver) Type() reflect.Type

Type implements MessageObserver.

type Config

type Config = nsq.Config

type Context

type Context struct {
	Channel string
	// contains filtered or unexported fields
}

func (*Context) Deadline

func (*Context) Deadline() (deadline time.Time, ok bool)

Deadline implements context.Context.

func (*Context) Done

func (*Context) Done() <-chan struct{}

Done implements context.Context.

func (*Context) Err

func (*Context) Err() error

Err implements context.Context.

func (*Context) InvalidMessage

func (c *Context) InvalidMessage(message *Message) error

func (*Context) Logger

func (c *Context) Logger() *log.Logger

func (*Context) SetValue

func (c *Context) SetValue(key, value interface{})

SetValue implements trace.ValueContext.

func (*Context) Value

func (c *Context) Value(key interface{}) interface{}

Value implements context.Context.

type ContextHelper

type ContextHelper struct{}

func (ContextHelper) ExtractReplyCode

func (ContextHelper) ExtractReplyCode(ctx *Context) ReplyCode

func (ContextHelper) InjectReplyCode

func (ContextHelper) InjectReplyCode(ctx *Context, reply ReplyCode)

type ContextMessageDelegate

type ContextMessageDelegate struct {
	// contains filtered or unexported fields
}

func NewContextMessageDelegate

func NewContextMessageDelegate(ctx *Context) *ContextMessageDelegate

func (*ContextMessageDelegate) OnFinish

func (d *ContextMessageDelegate) OnFinish(msg *nsq.Message)

func (*ContextMessageDelegate) OnRequeue

func (d *ContextMessageDelegate) OnRequeue(msg *nsq.Message, delay time.Duration, backoff bool)

func (*ContextMessageDelegate) OnTouch

func (d *ContextMessageDelegate) OnTouch(msg *nsq.Message)

type Defer

type Defer struct {
	// contains filtered or unexported fields
}

func (*Defer) Do

func (d *Defer) Do(do func(f Finalizer) error) error

type ErrorHandler

type ErrorHandler func(ctx *Context, message *Message, err interface{})

type Finalizer

type Finalizer struct {
	// contains filtered or unexported fields
}

func (Finalizer) Add

func (f Finalizer) Add(actions ...func(err interface{}))

type Message

type Message = nsq.Message

type MessageDispatcher

type MessageDispatcher struct {
	MessageHandleService   *MessageHandleService
	MessageTracerService   *MessageTracerService
	MessageObserverService *MessageObserverService
	Router                 Router

	OnHostErrorProc OnHostErrorHandler

	ErrorHandler          ErrorHandler
	InvalidMessageHandler MessageHandler
}

func (*MessageDispatcher) ProcessMessage

func (d *MessageDispatcher) ProcessMessage(ctx *Context, message *Message) error

func (*MessageDispatcher) Topics

func (d *MessageDispatcher) Topics() []string

type MessageHandleModule

type MessageHandleModule interface {
	CanSetSuccessor() bool
	SetSuccessor(successor MessageHandleModule)
	ProcessMessage(ctx *Context, message *Message, state ProcessingState, recover *Recover) error
	OnInitComplete()
	OnStart(ctx context.Context) error
	OnStop(ctx context.Context) error
}

type MessageHandleProc

type MessageHandleProc func(ctx *Context, message *Message) error

func (MessageHandleProc) ProcessMessage

func (proc MessageHandleProc) ProcessMessage(ctx *Context, message *Message) error

type MessageHandleService

type MessageHandleService struct {
	// contains filtered or unexported fields
}

func NewMessageHandleService

func NewMessageHandleService() *MessageHandleService

func (*MessageHandleService) ProcessMessage

func (s *MessageHandleService) ProcessMessage(ctx *Context, message *Message, state ProcessingState, recover *Recover) error

func (*MessageHandleService) Register

func (s *MessageHandleService) Register(module MessageHandleModule)

type MessageHandler

type MessageHandler interface {
	ProcessMessage(ctx *Context, message *Message) error
}

type MessageObserver

type MessageObserver interface {
	OnFinish(ctx *Context, message *Message)
	OnRequeue(ctx *Context, message *Message)
	OnTouch(ctx *Context, message *Message)
	Type() reflect.Type
}

type MessageObserverAffair

type MessageObserverAffair interface {
	MessageObserverTypes() []reflect.Type
}

type MessageObserverService

type MessageObserverService struct {
	MessageObservers map[reflect.Type]MessageObserver
	// contains filtered or unexported fields
}

func (*MessageObserverService) RegisterMessageObservers

func (s *MessageObserverService) RegisterMessageObservers(msg *Message, handlerID string)

func (*MessageObserverService) UnregisterAllMessageObservers

func (s *MessageObserverService) UnregisterAllMessageObservers(msg *Message)

type MessageTracerService

type MessageTracerService struct {
	TracerManager *TracerManager

	Enabled bool

	InvalidMessageHandlerComponentID string
	// contains filtered or unexported fields
}

func (*MessageTracerService) TextMapPropagator

func (s *MessageTracerService) TextMapPropagator() propagation.TextMapPropagator

func (*MessageTracerService) Tracer

type NoopMessageDelegate

type NoopMessageDelegate int

func (NoopMessageDelegate) OnFinish

func (NoopMessageDelegate) OnFinish(*nsq.Message)

OnFinish implements nsq.MessageDelegate.

func (NoopMessageDelegate) OnRequeue

func (NoopMessageDelegate) OnRequeue(m *nsq.Message, delay time.Duration, backoff bool)

OnRequeue implements nsq.MessageDelegate.

func (NoopMessageDelegate) OnTouch

func (NoopMessageDelegate) OnTouch(*nsq.Message)

OnTouch implements nsq.MessageDelegate.

type NsqWorker

type NsqWorker struct {
	NsqAddress         string // nsqd:127.0.0.1:4150,127.0.0.2:4150 -or- nsqlookupd:127.0.0.1:4161,127.0.0.2:4161
	Channel            string
	HandlerConcurrency int
	Config             *Config
	// contains filtered or unexported fields
}

func (*NsqWorker) Logger

func (w *NsqWorker) Logger() *log.Logger

func (*NsqWorker) Start

func (w *NsqWorker) Start(ctx context.Context)

func (*NsqWorker) Stop

func (w *NsqWorker) Stop(ctx context.Context) error

type NsqWorkerModule

type NsqWorkerModule struct{}

func (NsqWorkerModule) ConfigureLogger

func (NsqWorkerModule) ConfigureLogger(logflags int, w io.Writer)

ConfigureLogger implements host.HostModule

func (NsqWorkerModule) DescribeHostType

func (NsqWorkerModule) DescribeHostType() reflect.Type

DescribeHostType implements host.HostService

func (NsqWorkerModule) Init

func (NsqWorkerModule) Init(h host.Host, app *host.AppModule)

Init implements host.HostService

func (NsqWorkerModule) InitComplete

func (NsqWorkerModule) InitComplete(h host.Host, app *host.AppModule)

InitComplete implements host.HostService

type NsqWorkerRegistrar

type NsqWorkerRegistrar struct {
	// contains filtered or unexported fields
}

func NewNsqWorkerRegistrar

func NewNsqWorkerRegistrar(worker *NsqWorker) *NsqWorkerRegistrar

func (*NsqWorkerRegistrar) AddRouter

func (r *NsqWorkerRegistrar) AddRouter(topic string, handler MessageHandler, handlerComponentID string)

func (*NsqWorkerRegistrar) EnableTracer

func (r *NsqWorkerRegistrar) EnableTracer(enabled bool)

func (*NsqWorkerRegistrar) RegisterMessageHandleModule

func (r *NsqWorkerRegistrar) RegisterMessageHandleModule(module MessageHandleModule)

func (*NsqWorkerRegistrar) RegisterMessageObserver

func (r *NsqWorkerRegistrar) RegisterMessageObserver(v MessageObserver)

func (*NsqWorkerRegistrar) SetErrorHandler

func (r *NsqWorkerRegistrar) SetErrorHandler(handler ErrorHandler)

func (*NsqWorkerRegistrar) SetInvalidMessageHandler

func (r *NsqWorkerRegistrar) SetInvalidMessageHandler(handler MessageHandler)

func (*NsqWorkerRegistrar) SetMessageManager

func (r *NsqWorkerRegistrar) SetMessageManager(messageManager interface{})

type OnHostErrorHandler

type OnHostErrorHandler func(err error) (disposed bool)

type ProcessingState

type ProcessingState struct {
	Tracer *trace.SeverityTracer
	Span   *trace.SeveritySpan
	Topic  string
}

type Recover

type Recover struct {
	// contains filtered or unexported fields
}

func (*Recover) Defer

func (s *Recover) Defer(catch func(err interface{})) *Defer

type ReplyCode

type ReplyCode int
const (
	UNSET ReplyCode = iota
	PASS
	FAIL
	ABORT

	INVALID ReplyCode = -1
)

func (ReplyCode) String

func (code ReplyCode) String() string

type RestrictedForwardMessageHandler

type RestrictedForwardMessageHandler int

func (RestrictedForwardMessageHandler) ProcessMessage

func (h RestrictedForwardMessageHandler) ProcessMessage(ctx *Context, message *Message) error

ProcessMessage implements MessageHandler.

type RestrictedMessageDelegate

type RestrictedMessageDelegate int

func (RestrictedMessageDelegate) OnFinish

func (RestrictedMessageDelegate) OnFinish(*nsq.Message)

OnFinish implements nsq.MessageDelegate.

func (RestrictedMessageDelegate) OnRequeue

func (RestrictedMessageDelegate) OnRequeue(m *nsq.Message, delay time.Duration, backoff bool)

OnRequeue implements nsq.MessageDelegate.

func (RestrictedMessageDelegate) OnTouch

func (RestrictedMessageDelegate) OnTouch(*nsq.Message)

OnTouch implements nsq.MessageDelegate.

type RestrictedOperationError

type RestrictedOperationError string

func (RestrictedOperationError) Error

func (e RestrictedOperationError) Error() string

type RouteComponent

type RouteComponent struct {
	MessageHandler     MessageHandler
	HandlerComponentID string
}

type Router

type Router map[string]RouteComponent

func (Router) Add

func (r Router) Add(topic string, handler MessageHandler, handlerComponentID string)

func (Router) FindHandlerComponentID

func (r Router) FindHandlerComponentID(topic string) string

func (Router) Get

func (r Router) Get(topic string) MessageHandler

func (Router) Has

func (r Router) Has(topic string) bool

func (Router) Remove

func (r Router) Remove(topic string)

type StdMessageHandleModule

type StdMessageHandleModule struct {
	// contains filtered or unexported fields
}

func NewStdMessageHandleModule

func NewStdMessageHandleModule(dispatcher *MessageDispatcher) *StdMessageHandleModule

func (*StdMessageHandleModule) CanSetSuccessor

func (*StdMessageHandleModule) CanSetSuccessor() bool

CanSetSuccessor implements MessageHandleModule.

func (*StdMessageHandleModule) OnInitComplete

func (*StdMessageHandleModule) OnInitComplete()

OnInitComplete implements MessageHandleModule.

func (*StdMessageHandleModule) OnStart

func (*StdMessageHandleModule) OnStart(ctx context.Context) error

OnStart implements MessageHandleModule.

func (*StdMessageHandleModule) OnStop

func (*StdMessageHandleModule) OnStop(ctx context.Context) error

OnStop implements MessageHandleModule.

func (*StdMessageHandleModule) ProcessMessage

func (m *StdMessageHandleModule) ProcessMessage(ctx *Context, message *Message, state ProcessingState, recover *Recover) error

ProcessMessage implements MessageHandleModule.

func (*StdMessageHandleModule) SetSuccessor

func (*StdMessageHandleModule) SetSuccessor(successor MessageHandleModule)

SetSuccessor implements MessageHandleModule.

type StopError

type StopError struct {
	// contains filtered or unexported fields
}

func (*StopError) Error

func (e *StopError) Error() string

func (*StopError) Unwrap

func (e *StopError) Unwrap() error

type TracerManager

type TracerManager struct {
	TracerProvider    *trace.SeverityTracerProvider
	TextMapPropagator propagation.TextMapPropagator
	// contains filtered or unexported fields
}

func NewTraceManager

func NewTraceManager() *TracerManager

func (*TracerManager) GenerateManagedTracer

func (m *TracerManager) GenerateManagedTracer(v interface{}) *trace.SeverityTracer

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL