watermillnet

Version: v0.1.8
Published: Sep 1, 2023 License: MIT Imports: 13 Imported by: 0

README

Watermill Net Pub/Sub

This is a network (TCP, UDP, Unix socket) Pub/Sub for the Watermill project.

All Pub/Sub implementations can be found at https://watermill.io/pubsubs/.

Watermill is a Go library for working efficiently with message streams. It is intended for building event-driven applications, enabling event sourcing, RPC over messages, sagas, and basically whatever else comes to your mind. You can use conventional pub/sub implementations like Kafka or RabbitMQ, but also HTTP or MySQL binlog if that fits your use case.

Documentation: https://watermill.io/

Getting started guide: https://watermill.io/docs/getting-started/

Issues: https://github.com/ThreeDotsLabs/watermill/issues

Characteristics

  • ConsumerGroups
  • ExactlyOnceDelivery
  • GuaranteedOrder
  • Persistent

Contributing

All contributions are very much welcome. If you'd like to help with Watermill development, please see open issues and submit your pull request via GitHub.

Implement new transport

Currently, the tcp4 transport is implemented. To implement an additional transport, see the Connection interface and pkg/connection/tcp4Connection as an example.

Support

If you didn't find the answer to your question in the documentation, feel free to ask us directly!

Please join us on the #watermill channel on the Gophers Slack. You can get an invite here.

License

MIT License

TODO

  • Transport wrapper for reconnection
  • CI pipeline
  • CI linting
  • CI tests coverage
  • More tests for subscriber and publisher

Documentation

Constants

This section is empty.

Variables

var (
	ErrPublisherClosed      = errors.New("publisher closed")
	ErrSubscriberClosed     = errors.New("subscriber closed")
	ErrSubscriberNotStarted = errors.New("subscriber not started")
	ErrNacked               = errors.New("remote side sent nack for message")
	ErrIOTimeout            = os.ErrDeadlineExceeded
	ErrConnectionNotSet     = errors.New("connection not set")
)

Functions

This section is empty.

Types

type Connection

type Connection interface {
	net.Conn
	// Establish connection with remote side.
	Connect(addr net.Addr) error
}
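To illustrate what a new transport has to provide, here is a minimal sketch of a type that satisfies Connection by embedding net.Conn and dialing in Connect. The tcpConn type and the echoOnce helper are hypothetical and are not the package's tcp4Connection; the interface is re-declared locally so the sketch runs with the standard library only.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// Connection mirrors the interface shown above.
type Connection interface {
	net.Conn
	Connect(addr net.Addr) error
}

// tcpConn is a hypothetical transport. The embedded net.Conn is nil until
// Connect succeeds, so Read/Write must not be called before that.
type tcpConn struct {
	net.Conn
}

// Connect dials addr and stores the resulting connection.
func (c *tcpConn) Connect(addr net.Addr) error {
	conn, err := net.Dial(addr.Network(), addr.String())
	if err != nil {
		return err
	}
	c.Conn = conn
	return nil
}

// Compile-time check that tcpConn satisfies Connection.
var _ Connection = (*tcpConn)(nil)

// echoOnce starts a loopback echo server, connects through tcpConn,
// sends payload, and returns what the server echoed back.
func echoOnce(payload string) (string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	defer ln.Close()

	go func() {
		srv, err := ln.Accept()
		if err != nil {
			return
		}
		defer srv.Close()
		buf := make([]byte, len(payload))
		if _, err := io.ReadFull(srv, buf); err == nil {
			srv.Write(buf)
		}
	}()

	var c Connection = &tcpConn{}
	if err := c.Connect(ln.Addr()); err != nil {
		return "", err
	}
	defer c.Close()

	if _, err := c.Write([]byte(payload)); err != nil {
		return "", err
	}
	buf := make([]byte, len(payload))
	if _, err := io.ReadFull(c, buf); err != nil {
		return "", err
	}
	return string(buf), nil
}

func main() {
	fmt.Println(echoOnce("ping"))
}
```

Embedding net.Conn keeps the wrapper small: every method except Connect is promoted from the dialed connection.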

type InvalidConfigError

type InvalidConfigError struct {
	InvalidField  string
	InvalidReason string
}

func (*InvalidConfigError) Error

func (ic *InvalidConfigError) Error() string

type Listener

type Listener interface {
	// Accept waits for and returns the next connection to the listener.
	Accept() (Connection, error)
	// Close closes the listener.
	// Any blocked Accept operations will be unblocked and return errors.
	Close() error

	// Addr returns the listener's network address.
	Addr() net.Addr
}
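Listener differs from net.Listener only in that Accept returns a Connection. One way to satisfy it, sketched here under assumptions, is a thin adapter over net.Listener. The netListener and acceptedConn types and the receiveOnce helper are hypothetical, and treating Connect as a no-op on an already accepted connection is a design guess rather than documented behavior.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// Connection and Listener mirror the interfaces shown above.
type Connection interface {
	net.Conn
	Connect(addr net.Addr) error
}

type Listener interface {
	Accept() (Connection, error)
	Close() error
	Addr() net.Addr
}

// acceptedConn wraps a server-side connection. It is already established,
// so Connect does nothing in this sketch (an assumption).
type acceptedConn struct {
	net.Conn
}

func (c *acceptedConn) Connect(net.Addr) error { return nil }

// netListener adapts a net.Listener; Close and Addr are promoted.
type netListener struct {
	net.Listener
}

// Accept shadows the embedded Accept and wraps the result as a Connection.
func (l *netListener) Accept() (Connection, error) {
	conn, err := l.Listener.Accept()
	if err != nil {
		return nil, err
	}
	return &acceptedConn{Conn: conn}, nil
}

var _ Listener = (*netListener)(nil)

// receiveOnce listens on loopback, lets a client send payload, and returns
// what the accepted Connection read.
func receiveOnce(payload string) (string, error) {
	inner, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	var l Listener = &netListener{Listener: inner}
	defer l.Close()

	go func() {
		client, err := net.Dial("tcp", l.Addr().String())
		if err != nil {
			return
		}
		defer client.Close()
		client.Write([]byte(payload))
	}()

	conn, err := l.Accept()
	if err != nil {
		return "", err
	}
	defer conn.Close()

	buf := make([]byte, len(payload))
	if _, err := io.ReadFull(conn, buf); err != nil {
		return "", err
	}
	return string(buf), nil
}

func main() {
	fmt.Println(receiveOnce("hello"))
}
```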

type Marshaler

type Marshaler interface {
	MarshalMessage(msg any) ([]byte, error)
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(config PublisherConfig, waitAck bool) (*Publisher, error)

NewPublisher creates a new publisher. ATTENTION! Set the connection immediately after creation.

func (*Publisher) Close

func (p *Publisher) Close() error

Close should flush unsent messages if the publisher is async.

func (*Publisher) Connect

func (p *Publisher) Connect(addr net.Addr) error

Connect to remote side.

func (*Publisher) GetConnection added in v0.1.3

func (p *Publisher) GetConnection() (Connection, error)

GetConnection returns the publisher's connection.

func (*Publisher) Publish

func (p *Publisher) Publish(topic string, messages ...*message.Message) error

Publish publishes provided messages to given topic. Publish can be synchronous or asynchronous - it depends on the implementation.

Most publisher implementations don't support atomic publishing of messages. This means that if publishing one of the messages fails, the next messages will not be published.

Publish must be thread safe.

func (*Publisher) SetConnection added in v0.1.3

func (p *Publisher) SetConnection(c Connection)

type PublisherConfig

type PublisherConfig struct {
	Marshaler   Marshaler
	Unmarshaler Unmarshaler
	Logger      watermill.LoggerAdapter
}

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(config SubscriberConfig) (*Subscriber, error)

NewSubscriber creates a new subscriber. ATTENTION! Set the connection immediately after creation.

func (*Subscriber) Addr

func (s *Subscriber) Addr() string

func (*Subscriber) Close

func (s *Subscriber) Close() error

Close closes all subscriptions with their output channels and flushes offsets etc. when needed.

func (*Subscriber) Connect

func (s *Subscriber) Connect(l Listener) error

Connect establishes the connection. If the connection was set via SetConnection, pass nil as l. If a listener was set, the subscriber waits for the client to reconnect when the connection is lost.

func (*Subscriber) GetConnection added in v0.1.3

func (s *Subscriber) GetConnection() (Connection, error)

GetConnection returns the subscriber's connection.

func (*Subscriber) SetConnection added in v0.1.3

func (s *Subscriber) SetConnection(c Connection)

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

Subscribe returns an output channel with messages from the provided topic. The channel is closed when Close() is called on the subscriber.

To receive the next message, `Ack()` must be called on the received message. If message processing failed and the message should be redelivered, `Nack()` should be called.

When the provided ctx is cancelled, the subscriber closes the subscription and the output channel. The provided ctx is set on all produced messages. When Nack or Ack is called on a message, the context of that message is canceled. The subscriber will wait for reconnects and will not exit the read loop when the connection is lost. Since it is impossible to know whether the remote side will reconnect, this is a mandatory mechanism.

type SubscriberConfig

type SubscriberConfig struct {
	Marshaler   Marshaler
	Unmarshaler Unmarshaler
	Logger      watermill.LoggerAdapter
}

type Unmarshaler

type Unmarshaler interface {
	UnmarshalMessage(in []byte, out any) error
}
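Marshaler and Unmarshaler are small enough that a single codec type can implement both. The following JSON-based sketch is an assumption-laden example: JSONCodec and envelope are hypothetical names, and the documentation above does not say what concrete value the publisher or subscriber passes as msg or out, so the envelope shape is only a placeholder.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Marshaler and Unmarshaler mirror the interfaces shown above.
type Marshaler interface {
	MarshalMessage(msg any) ([]byte, error)
}

type Unmarshaler interface {
	UnmarshalMessage(in []byte, out any) error
}

// JSONCodec is a hypothetical codec implementing both interfaces.
type JSONCodec struct{}

func (JSONCodec) MarshalMessage(msg any) ([]byte, error) {
	return json.Marshal(msg)
}

func (JSONCodec) UnmarshalMessage(in []byte, out any) error {
	return json.Unmarshal(in, out)
}

var (
	_ Marshaler   = JSONCodec{}
	_ Unmarshaler = JSONCodec{}
)

// envelope is a placeholder wire type used only for this round trip.
type envelope struct {
	UUID    string `json:"uuid"`
	Payload []byte `json:"payload"`
}

// roundTrip encodes in and decodes the bytes into a fresh envelope.
func roundTrip(in envelope) (envelope, error) {
	codec := JSONCodec{}
	raw, err := codec.MarshalMessage(in)
	if err != nil {
		return envelope{}, err
	}
	var out envelope
	if err := codec.UnmarshalMessage(raw, &out); err != nil {
		return envelope{}, err
	}
	return out, nil
}

func main() {
	out, err := roundTrip(envelope{UUID: "1", Payload: []byte("hello")})
	fmt.Println(out.UUID, string(out.Payload), err)
}
```

Both PublisherConfig and SubscriberConfig carry a Marshaler and an Unmarshaler, so one codec value could be assigned to all four fields.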
