rabbitmq

package module
v0.0.0-...-719a222 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 6, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Package-level defaults for the RabbitMQ plugin.
//
// These are declared as variables, not constants, so they can be reassigned
// at runtime. Where and when they are read is not visible in this excerpt.
var (
	// DefaultPrefetchCount is the default prefetch value; it is 0 here.
	DefaultPrefetchCount = 0
	// DefaultExchange is an Exchange named "toolkit". Durable is not set and
	// therefore keeps its zero value (false).
	DefaultExchange      = Exchange{
		Name: "toolkit",
	}

	// DefaultRabbitURL is the default AMQP connection URL.
	// NOTE(review): the "[email protected]" fragment looks like an
	// email-obfuscation artifact introduced when this page was captured, not
	// the real value (presumably "<password>@<host>") — confirm against the
	// repository before relying on this literal.
	DefaultRabbitURL = "amqp://guest:[email protected]:5672"
)

Functions

This section is empty.

Types

type CallHandler

// CallHandler is the callback type accepted by Plugin.Subscribe. It receives
// a context and the raw payload bytes and returns nothing, so a handler has
// no return value through which to report an error.
type CallHandler func(ctx context.Context, payload []byte)

type Config

// Config holds the connection, TLS and prefetch settings for the RabbitMQ
// plugin.
//
// Most fields carry `env`, `envDefault` and `comment` struct tags naming an
// environment variable, its default, and a human-readable description. The
// loader that consumes these tags is not visible in this excerpt.
type Config struct {
	// Connection settings. DSN is described by its tag as a complete
	// connection string; how it interacts with Host/Port/Vhost/Username/
	// Password is not shown here (presumably it takes precedence — confirm).
	// NOTE(review): the "[email protected]" fragment in the DSN tag looks like
	// an email-obfuscation artifact from the captured page — confirm the real
	// example string in the repository.
	// NOTE(review): the Host tag says "(required)" although a default of
	// 127.0.0.1 is supplied, and the Port tag says "bind to" for what appears
	// to be a client connection port — the wording may need correcting upstream.
	DSN      string `env:"DSN" envDefault:"" comment:"DSN = complete connection string (amqp://guest:[email protected]:5672)"`
	Host     string `env:"HOST" envDefault:"127.0.0.1"  comment:"The host to connect to (required)"`
	Port     int32  `env:"PORT" envDefault:"5672" comment:"The port to bind to (default: 5672)"`
	Vhost    string `env:"VHOST" envDefault:"/" comment:"The default host connect to (default: / )"`
	Username string `env:"USERNAME" comment:"The username to connect with (not required, guest by default)"`
	Password string `env:"PASSWORD" comment:"The password to connect with(not required, guest by default) "`

	// TLS settings. Per their tags, TLSCA/TLSCert/TLSKey hold file *content*,
	// not file paths. TLSVerify is described by its tag as enabling SSL;
	// whether it also controls certificate verification is not visible here.
	TLSVerify bool   `env:"TLS_VERIFY" comment:"Use SSL in rabbitmq connection"`
	TLSCA     string `env:"TLS_CA" comment:"TLS CA file content used in connection"`
	TLSCert   string `env:"TLS_CERT" comment:"TLS Cert file content used in connection"`
	TLSKey    string `env:"TLS_KEY"  comment:"TLS Key file content used in connection"`

	// Consumer prefetch settings: the limit on unacknowledged messages and
	// whether that limit is applied globally, as described by the tags.
	PrefetchCount  int  `env:"PREFETCH_COUNT"  comment:"Limit the number of unacknowledged messages on a channel (or connection) when consuming"`
	PrefetchGlobal bool `env:"PREFETCH_GLOBAL"  comment:"Set prefetch limit number globally"`

	// DefaultExchange has no env tag, so it is not populated from the
	// environment via these tags. Behavior when it is nil is not shown here
	// (presumably the package-level DefaultExchange is used — confirm).
	DefaultExchange *Exchange
}

type Event

// Event is the interface for a received event. No implementation or caller
// is visible in this excerpt, so the method notes below describe signatures
// only.
type Event interface {
	// Message returns a pointer to the event's Message (headers and body).
	Message() *Message
	// Name returns a string name for the event (presumably the event/routing
	// name — confirm).
	Name() string
	// Ack returns an error; presumably it acknowledges the delivery to the
	// broker — confirm against the implementation.
	Ack() error
	// Error returns an error associated with the event; exact semantics are
	// not visible here.
	Error() error
}

type Exchange

// Exchange describes an exchange by name and durability flag. It is the type
// of the package-level DefaultExchange and of Config.DefaultExchange.
type Exchange struct {
	// Name is the exchange name.
	Name    string
	// Durable is a durability flag; its zero value is false. Presumably it is
	// passed through to the exchange declaration — confirm.
	Durable bool
}

type Message

// Message is the value returned by Event.Message: string-to-string headers
// plus a raw byte body.
type Message struct {
	// Header holds string-valued headers. Note that PublishOptions and
	// SubscribeOptions use map[string]interface{} for headers instead.
	Header map[string]string
	// Body is the raw message payload.
	Body   []byte
}

type Options

// Options is the options struct accepted by NewPlugin.
type Options struct {
	// Name is a string identifier; how NewPlugin uses it is not visible here
	// (presumably a plugin/config name — confirm).
	Name string
}

type Plugin

// Plugin is the interface returned by NewPlugin and NewTestPlugin. It exposes
// publishing, subscribing, and access to an AMQP channel.
type Plugin interface {
	// Publish sends payload for the named event, with optional publish
	// options. Handling of a nil opts is not visible here.
	Publish(ctx context.Context, event string, payload []byte, opts *PublishOptions) error
	// Subscribe registers handler for the given service and event and returns
	// a Subscriber, whose Unsubscribe method is the only operation it exposes.
	// How service and event map to queues or bindings is not visible here.
	Subscribe(service, event string, handler CallHandler, opts *SubscribeOptions) (Subscriber, error)
	// Channel returns an *amqp.Channel. Whether each call yields a new or a
	// shared channel is not visible here.
	Channel() (*amqp.Channel, error)
}

func NewPlugin

// NewPlugin returns a Plugin built from the given runtime and options. It has
// no error return, so construction failures cannot be reported through the
// signature. Only the signature is visible in this excerpt.
func NewPlugin(runtime runtime.Runtime, opts *Options) Plugin

func NewTestPlugin

// NewTestPlugin returns a Plugin built from a TestConfig, or an error. Only
// the signature is visible in this excerpt; presumably it is intended for
// tests and may start a broker container when cfg.RunContainer is set —
// confirm against the implementation.
func NewTestPlugin(ctx context.Context, cfg TestConfig) (Plugin, error)

type PublishOptions

// PublishOptions is the optional settings struct accepted by Plugin.Publish.
type PublishOptions struct {
	// Headers holds arbitrary-valued headers keyed by string (presumably
	// attached to the published message — confirm).
	Headers map[string]interface{}
}

type SubscribeOptions

// SubscribeOptions is the optional settings struct accepted by
// Plugin.Subscribe. All fields default to their zero values (false / nil).
type SubscribeOptions struct {
	// DurableQueue presumably requests a durable queue for the subscription —
	// confirm against the implementation.
	DurableQueue   bool
	// RequeueOnError presumably requeues a message when handling fails. Note
	// that CallHandler returns no error, so what counts as an "error" is not
	// visible here — confirm.
	RequeueOnError bool
	// Headers holds arbitrary-valued headers keyed by string; their use
	// (e.g. binding arguments) is not visible here.
	Headers        map[string]interface{}
}

type Subscriber

// Subscriber is the handle returned by Plugin.Subscribe.
type Subscriber interface {
	// Unsubscribe returns an error; presumably it cancels the subscription —
	// confirm against the implementation.
	Unsubscribe() error
}

type SubscriberHandler

// SubscriberHandler is a callback type that receives a context, an event
// name, and the raw payload. It is not referenced by any other declaration
// in this excerpt; Plugin.Subscribe takes a CallHandler instead.
type SubscriberHandler func(ctx context.Context, event string, payload []byte)

type TestConfig

// TestConfig is the configuration accepted by NewTestPlugin. It embeds
// Config, so all Config fields are promoted onto TestConfig.
type TestConfig struct {
	Config

	// Container settings. Presumably RunContainer asks NewTestPlugin to start
	// a broker container using ContainerImage and ContainerName — the
	// implementation is not visible here; confirm.
	RunContainer   bool
	ContainerImage string
	ContainerName  string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL