nats

package
v2.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 3, 2023 License: MIT Imports: 11 Imported by: 10

Documentation

Index

Constants

View Source
const TermSignal = time.Duration(-1)

TermSignal if this duration was returned, event will be term`ed

View Source
const WatermillUUIDHdr = "_watermill_message_uuid"

reserved header for NATSMarshaler to send UUID

Variables

This section is empty.

Functions

This section is empty.

Types

type Connection

type Connection interface {
	// QueueSubscribe subscribes to a NATS subject, equivalent to default Subscribe if queuegroup not supplied.
	QueueSubscribe(string, string, nats.MsgHandler) (*nats.Subscription, error)
	// PublishMsg sends the provided NATS message to the broker.
	PublishMsg(*nats.Msg) error
	// Drain will end all active subscription interest and attempt to wait for in-flight messages to process before closing.
	Drain() error
	// Close will close the connection
	Close()
}

type Delay

type Delay interface {
	// WaitTime return time.Duration that we need to wait.
	// retryNum is how many times WaitTime was called for
	// specific message
	WaitTime(retryNum uint64) time.Duration
}

type DurableCalculator

type DurableCalculator = func(string, string) string

type GobMarshaler

type GobMarshaler struct{}

GobMarshaler is marshaller which is using Gob to marshal Watermill messages.

func (GobMarshaler) Marshal

func (GobMarshaler) Marshal(topic string, msg *message.Message) (*nats.Msg, error)

Marshal transforms a watermill message into gob format.

func (GobMarshaler) Unmarshal

func (GobMarshaler) Unmarshal(natsMsg *nats.Msg) (*message.Message, error)

Unmarshal extracts a watermill message from a nats message.

type JSONMarshaler

type JSONMarshaler struct{}

JSONMarshaler uses encoding/json to marshal Watermill messages.

func (JSONMarshaler) Marshal

func (JSONMarshaler) Marshal(topic string, msg *message.Message) (*nats.Msg, error)

Marshal transforms a watermill message into JSON format.

func (JSONMarshaler) Unmarshal

func (JSONMarshaler) Unmarshal(natsMsg *nats.Msg) (*message.Message, error)

Unmarshal extracts a watermill message from a nats message.

type JetStreamConfig

type JetStreamConfig struct {
	// Disabled controls whether JetStream semantics should be used
	Disabled bool

	// AutoProvision indicates the application should create the configured stream if missing on the broker
	AutoProvision bool

	// ConnectOptions contains JetStream-specific options to be used when establishing context
	ConnectOptions []nats.JSOpt

	// SubscribeOptions contains options to be used when establishing subscriptions
	SubscribeOptions []nats.SubOpt

	// PublishOptions contains options to be sent on every publish operation
	PublishOptions []nats.PubOpt

	// TrackMsgId uses the Nats.MsgId option with the msg UUID to prevent duplication (needed for exactly once processing)
	TrackMsgId bool

	// AckAsync enables asynchronous acknowledgement
	AckAsync bool

	// DurablePrefix is the prefix used by to derive the durable name from the topic.
	//
	// By default the prefix will be used on its own to form the durable name.  This only allows use
	// of a single subscription per configuration.  For more flexibility provide a DurableCalculator
	// that will receive durable prefix + topic.
	//
	// Subscriptions may also specify a “durable name” which will survive client restarts.
	// Durable subscriptions cause the server to track the last acknowledged message
	// sequence number for a client and durable name. When the client restarts/resubscribes,
	// and uses the same client ID and durable name, the server will resume delivery beginning
	// with the earliest unacknowledged message for this durable subscription.
	//
	// Doing this causes the JetStream server to track
	// the last acknowledged message for that ClientID + Durable.
	DurablePrefix string

	// DurableCalculator is a custom function used to derive a durable name from a topic + durable prefix
	DurableCalculator DurableCalculator
}

JetStreamConfig contains configuration settings specific to running in JetStream mode

func (JetStreamConfig) CalculateDurableName

func (c JetStreamConfig) CalculateDurableName(topic string) string

func (JetStreamConfig) ShouldAutoProvision

func (c JetStreamConfig) ShouldAutoProvision() bool

type Marshaler

type Marshaler interface {
	// Marshal transforms a watermill message into NATS wire format.
	Marshal(topic string, msg *message.Message) (*nats.Msg, error)
}

Marshaler provides transport encoding functions

type MarshalerUnmarshaler

type MarshalerUnmarshaler interface {
	Marshaler
	Unmarshaler
}

MarshalerUnmarshaler provides both Marshaler and Unmarshaler implementations

type MaxRetryDelay

type MaxRetryDelay struct {
	StaticDelay
	// contains filtered or unexported fields
}

MaxRetryDelay delay that returns the same time.Duration up to a maximum before sending term

func NewMaxRetryDelay

func NewMaxRetryDelay(delay time.Duration, retryLimit uint64) MaxRetryDelay

func (MaxRetryDelay) WaitTime

func (s MaxRetryDelay) WaitTime(retryNum uint64) time.Duration

type NATSMarshaler

type NATSMarshaler struct{}

NATSMarshaler uses NATS header to marshal directly between watermill and NATS formats. The watermill UUID is stored at _watermill_message_uuid

func (*NATSMarshaler) Marshal

func (*NATSMarshaler) Marshal(topic string, msg *message.Message) (*nats.Msg, error)

Marshal transforms a watermill message into JSON format.

func (*NATSMarshaler) Unmarshal

func (*NATSMarshaler) Unmarshal(natsMsg *nats.Msg) (*message.Message, error)

Unmarshal extracts a watermill message from a nats message.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher provides the nats implementation for watermill publish operations

func NewPublisher

func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)

NewPublisher creates a new Publisher.

func NewPublisherWithNatsConn

func NewPublisherWithNatsConn(conn *nats.Conn, config PublisherPublishConfig, logger watermill.LoggerAdapter) (*Publisher, error)

NewPublisherWithNatsConn creates a new Publisher with the provided nats connection.

func (*Publisher) Close

func (p *Publisher) Close() error

Close closes the publisher and the underlying connection

func (*Publisher) Publish

func (p *Publisher) Publish(topic string, messages ...*message.Message) error

Publish publishes message to NATS.

Publish will not return until an ack has been received from JetStream. When one of messages delivery fails - function is interrupted.

type PublisherConfig

type PublisherConfig struct {
	// URL is the NATS URL.
	URL string

	// NatsOptions are custom options for a connection.
	NatsOptions []nats.Option

	// Marshaler is marshaler used to marshal messages between watermill and wire formats
	Marshaler Marshaler

	// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to topic as Primary and queueGroupPrefix as QueueGroup)
	SubjectCalculator SubjectCalculator

	// JetStream holds JetStream specific settings
	JetStream JetStreamConfig
}

PublisherConfig is the configuration to create a publisher

func (PublisherConfig) GetPublisherPublishConfig

func (c PublisherConfig) GetPublisherPublishConfig() PublisherPublishConfig

GetPublisherPublishConfig gets the configuration subset needed for individual publish calls once a connection has been established

func (PublisherConfig) Validate

func (c PublisherConfig) Validate() error

Validate ensures configuration is valid before use

type PublisherPublishConfig

type PublisherPublishConfig struct {
	// Marshaler is marshaler used to marshal messages between watermill and wire formats
	Marshaler Marshaler

	// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to topic as Primary and queueGroupPrefix as QueueGroup)
	SubjectCalculator SubjectCalculator

	// JetStream holds JetStream specific settings
	JetStream JetStreamConfig
}

PublisherPublishConfig is the configuration subset needed for an individual publish call

type StaticDelay

type StaticDelay struct {
	Delay time.Duration
}

StaticDelay delay that always return the same time.Duration

func NewStaticDelay

func NewStaticDelay(delay time.Duration) StaticDelay

func (StaticDelay) WaitTime

func (s StaticDelay) WaitTime(retryNum uint64) time.Duration

type SubjectCalculator

type SubjectCalculator func(queueGroupPrefix, topic string) *SubjectDetail

SubjectCalculator is a function used to calculate nats subject(s) for the given topic.

type SubjectDetail

type SubjectDetail struct {
	Primary    string
	Additional []string
	QueueGroup string
}

SubjectDetail contains jetstream subject detail (primary + all additional) along with durable and queue group names for a given watermill topic.

func DefaultSubjectCalculator

func DefaultSubjectCalculator(queueGroupPrefix, topic string) *SubjectDetail

func (*SubjectDetail) All

func (s *SubjectDetail) All() []string

All combines the primary and all additional subjects for use by the jetstream client on creation.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber provides the nats implementation for watermill subscribe operations

func NewSubscriber

func NewSubscriber(config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)

NewSubscriber creates a new Subscriber.

func NewSubscriberWithNatsConn

func NewSubscriberWithNatsConn(conn *nats.Conn, config SubscriberSubscriptionConfig, logger watermill.LoggerAdapter) (*Subscriber, error)

NewSubscriberWithNatsConn creates a new Subscriber with the provided nats connection.

func (*Subscriber) Close

func (s *Subscriber) Close() error

Close closes the publisher and the underlying connection. It will attempt to wait for in-flight messages to complete.

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

Subscribe subscribes messages from JetStream.

func (*Subscriber) SubscribeInitialize

func (s *Subscriber) SubscribeInitialize(topic string) error

SubscribeInitialize offers a way to ensure the stream for a topic exists prior to subscribe

type SubscriberConfig

type SubscriberConfig struct {
	// URL is the URL to the broker
	URL string

	// QueueGroupPrefix is the prefix used by SubjectCalculator to derive queue group from the topic.
	//
	// All subscriptions with the same queue name (regardless of the connection they originate from)
	// will form a queue group. Each message will be delivered to only one subscriber per queue group,
	// using queuing semantics.
	//
	// For JetStream is recommended to set it with DurablePrefix.
	// For non durable queue subscribers, when the last member leaves the group,
	// that group is removed. A durable queue group (DurablePrefix) allows you to have all members leave
	// but still maintain state. When a member re-joins, it starts at the last position in that group.
	QueueGroupPrefix string

	// SubscribersCount determines how many concurrent subscribers should be started.
	SubscribersCount int

	// CloseTimeout determines how long subscriber will wait for Ack/Nack on close.
	// When no Ack/Nack is received after CloseTimeout, subscriber will be closed.
	CloseTimeout time.Duration

	// How long subscriber should wait for Ack/Nack. When no Ack/Nack was received, message will be redelivered.
	AckWaitTimeout time.Duration

	// SubscribeTimeout determines how long subscriber will wait for a successful subscription
	SubscribeTimeout time.Duration

	// NatsOptions are custom []nats.Option passed to the connection.
	// It is also used to provide connection parameters, for example:
	// 		nats.URL("nats://localhost:4222")
	NatsOptions []nats.Option

	// Unmarshaler is an unmarshaler used to unmarshaling messages from NATS format to Watermill format.
	Unmarshaler Unmarshaler

	// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to topic as Primary and queueGroupPrefix as QueueGroup)
	SubjectCalculator SubjectCalculator

	// NakDelay sets duration after which the NACKed message will be resent.
	// By default, it's NACKed without delay.
	NakDelay Delay

	// JetStream holds JetStream specific settings
	JetStream JetStreamConfig
}

SubscriberConfig is the configuration to create a subscriber

func (*SubscriberConfig) GetSubscriberSubscriptionConfig

func (c *SubscriberConfig) GetSubscriberSubscriptionConfig() SubscriberSubscriptionConfig

GetSubscriberSubscriptionConfig gets the configuration subset needed for individual subscribe calls once a connection has been established

type SubscriberSubscriptionConfig

type SubscriberSubscriptionConfig struct {
	// Unmarshaler is an unmarshaler used to unmarshaling messages from NATS format to Watermill format.
	Unmarshaler Unmarshaler

	// SubscribersCount determines wow much concurrent subscribers should be started.
	SubscribersCount int

	// How long subscriber should wait for Ack/Nack. When no Ack/Nack was received, message will be redelivered.
	AckWaitTimeout time.Duration

	// CloseTimeout determines how long subscriber will wait for Ack/Nack on close.
	// When no Ack/Nack is received after CloseTimeout, subscriber will be closed.
	CloseTimeout time.Duration

	// SubscribeTimeout determines how long subscriber will wait for a successful subscription
	SubscribeTimeout time.Duration

	// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to topic as Primary and queueGroupPrefix as QueueGroup)
	SubjectCalculator SubjectCalculator

	// NakDelay sets duration after which the NACKed message will be resent.
	// By default, it's NACKed without delay.
	NakDelay Delay

	// JetStream holds JetStream specific settings
	JetStream JetStreamConfig

	// QueueGroupPrefix is the prefix used by SubjectCalculator to derive queue group from the topic.
	//
	// All subscriptions with the same queue name (regardless of the connection they originate from)
	// will form a queue group. Each message will be delivered to only one subscriber per queue group,
	// using queuing semantics.
	//
	// For JetStream is recommended to set it with DurablePrefix.
	// For non durable queue subscribers, when the last member leaves the group,
	// that group is removed. A durable queue group (DurablePrefix) allows you to have all members leave
	// but still maintain state. When a member re-joins, it starts at the last position in that group.
	QueueGroupPrefix string
}

SubscriberSubscriptionConfig is the configurationz

func (*SubscriberSubscriptionConfig) Validate

func (c *SubscriberSubscriptionConfig) Validate() error

Validate ensures configuration is valid before use

type Unmarshaler

type Unmarshaler interface {
	// Unmarshal produces a watermill message from NATS wire format.
	Unmarshal(*nats.Msg) (*message.Message, error)
}

Unmarshaler provides transport decoding function

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL