package consumer
Version: v2.1.1

Warning: This package is not in the latest version of its module. Go to latest.
Published: Feb 28, 2024 | License: Apache-2.0 | Imports: 11 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrNoRetry = errors.New("no retry")

Functions

This section is empty.

Types

type Config

type Config[T any] struct {
	// contains filtered or unexported fields
}

type Consumer

type Consumer[T Message] struct {
	// contains filtered or unexported fields
}

func New

func New[T Message](h Handler[T], queueDeclare QueueDeclare, options ...Option[T]) (Consumer[T], error)

func NewFunc

func NewFunc[T Message](h HandlerFunc[T], queueDeclare QueueDeclare, options ...Option[T]) (Consumer[T], error)

func NewRaw

func NewRaw[T Message](handler RawHandler, queueDeclare QueueDeclare, options ...Option[T]) (Consumer[T], error)

func NewRawFunc

func NewRawFunc[T Message](h RawHandlerFunc, queueDeclare QueueDeclare, options ...Option[T]) (Consumer[T], error)

func (Consumer[T]) Close

func (c Consumer[T]) Close() error

type ExchangeBinding

type ExchangeBinding struct {
	ExchangeName string
	RoutingKey   string
}

type Handler

type Handler[T Message] interface {
	Handle(context.Context, T) error
}

type HandlerFunc

type HandlerFunc[T Message] func(context.Context, T) error

func (HandlerFunc[T]) Handle

func (h HandlerFunc[T]) Handle(ctx context.Context, body T) error

type Message

type Message interface{}

type Option

type Option[T any] func(*Config[T])

func WithConnectionOptions

func WithConnectionOptions[T any](connectionOptions connection.Config) Option[T]

func WithContext

func WithContext[T any](ctx context.Context) Option[T]

func WithLogger

func WithLogger[T any](logger logging.Logger) Option[T]

func WithMessageDeserializer

func WithMessageDeserializer[T any](serializer serializer.Serializer[T]) Option[T]

func WithOnErrorFunc

func WithOnErrorFunc[T any](onError connection.OnErrorFunc) Option[T]

func WithOnListenerExit

func WithOnListenerExit[T any](onListenerExit func(context.Context, int)) Option[T]

func WithOnListenerStart

func WithOnListenerStart[T any](onListenerStart func(context.Context, int)) Option[T]

func WithOnMessageError

func WithOnMessageError[T any](onMessageError func(context.Context, *amqp091.Delivery, error)) Option[T]

func WithQueueConfig

func WithQueueConfig[T any](cfg QueueConfig) Option[T]

func WithRetryMessageCountCount

func WithRetryMessageCountCount[T any](count uint32) Option[T]

type QueueConfig

type QueueConfig struct {
	Workers       int
	PrefetchCount int
}

type QueueDeclare

type QueueDeclare struct {
	QueueName        string
	ExchangeBindings []ExchangeBinding
	Durable          bool
	AutoDelete       bool
	Exclusive        bool
	NoWait           bool
}

type RawHandler

type RawHandler interface {
	Handle(context.Context, *amqp091.Delivery) error
}

type RawHandlerFunc

type RawHandlerFunc func(context.Context, *amqp091.Delivery) error

func (RawHandlerFunc) Handle

func (h RawHandlerFunc) Handle(ctx context.Context, body *amqp091.Delivery) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL