subscriber

package
v0.1.20 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 15, 2022 License: MIT Imports: 13 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultConfig = Config{
	WorkersCount:                   10,
	WorkerWaitingAssignmentTimeout: time.Second * 3,
	PackageProcessingMaxTime:       time.Second * 60,
	GracefulShutdownTimeout:        time.Second * 61,
}

Functions

func WithNoExecutorsDefinedErr

func WithNoExecutorsDefinedErr(err error) error

Types

type Config

type Config struct {
	// WorkersCount specifies a number workers that process packages
	WorkersCount uint
	// WorkerWaitingAssignmentTimeout amount of time that a worker will wait for assigning a package
	WorkerWaitingAssignmentTimeout time.Duration
	// PackageProcessingMaxTime amount of time for a package to be processed
	PackageProcessingMaxTime time.Duration
	// GracefulShutdownTimeout amount of time for graceful shutdown
	GracefulShutdownTimeout time.Duration
}

Config allows to configure subscriber workflow

type NoExecutorsDefinedErr

type NoExecutorsDefinedErr struct {
	// contains filtered or unexported fields
}

type Opt

type Opt func(o *subscriberOpts)

func WithConfig

func WithConfig(c *Config) Opt

func WithConsumeOpts added in v0.1.17

func WithConsumeOpts(opts ...transport.ConsumeOpt) Opt

type PackageProperty added in v0.1.14

type PackageProperty string
const (
	ContextTraceIDKey PackageProperty = "traceId"
)

type Processor

type Processor interface {
	Process(ctx context.Context, inPkg transport.IncomingPkg) error
}

Processor knows how to process a message received by subscriber

func NewMessageProcessor

func NewMessageProcessor(decoder message.Marshaller, msgExecCtxFactory execution.MessageExecutionCtxFactory, msgDispatcher msgDispatcher.Dispatcher, logger log.Logger) Processor

NewMessageProcessor returns default implementation of Processor

type Subscriber

type Subscriber interface {
	// Run listens queues for packages and processes them. Gracefully shuts down either on os.Signal or ctx.Done()
	Run(ctx context.Context, queues ...transport.Queue) error
}

Subscriber starts listening for queues and processes messages

func NewSubscriber

func NewSubscriber(transport transport.Transport, processor Processor, logger log.Logger, opts ...Opt) Subscriber

NewSubscriber creates default subscriber implementation

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL