pika

package module
v2.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 8, 2023 License: MIT Imports: 11 Imported by: 1

README

pika

A small RabbitMQ utility wrapper for go services

It provides a simpler abstraction to ease setting up services that communicate with RabbitMQ. To keep simplicity it makes a few assumptions:

  • There is only need for Topic Exchanges
  • Message body contains JSON
  • A Consumer only receives a message type

If you need more features consider a mixed used of the library or simply an alternative.

Connection

To connect to RabbitMQ create a new RabbitConnector

func main() {
  conn := pika.NewConnector()
  err := conn.Connect(RABBIT_URL)
  if err != nil {
    // TODO
  }
}

This connection will be reused for each Consumer, Publisher and Notifier (1 channel each).

If you have a more complex use-case you can still reuse the connection asking it for a new channel and setting up bindings yourself.

  channel, err := conn.Channel()

Consumer

To receive messages from RabbitMQ create a Consumer of the underlying event type.

type MsgEvent struct {
  Name string `json:"name"`
  Age  int    `json:"age"`
}

type MsgConsumer struct {}

func (c MsgConsumer) Options() pika.ConsumerOptions {
  // TODO
}

func (c MsgConsumer) HandleMessage(e MsgEvent) error {
  return nil
}

Then start the consumer in you main file.

func main() {
    // ...

    err := pika.StartConsumer[MsgEvent](conn, MsgConsumer{})
    if err != nil {
        // TODO
    }
}
Consumer Options

The connector knows how to setup the connection based on the ConsumerOptions returned by the Consumer. At a minimum it needs an exchange, topic and queue.

func (c MsgConsumer) Options() pika.ConsumerOptions {
  return pika.NewConsumerOptions(
    "", // Exchange
    "", // Topic / Routing-Key
    "", // Queue name (leave empty for random name)
  )
}

To setup the queue to persist if the application crashes.

  return pika.NewConsumerOptions("", "", "").SetDurable()

This allows messages to queue-up and start consuming as soon as it starts

You can also retry messages. It will be done in memory instead of using a dead-letter exchange.

  return pika.NewConsumerOptions("", "", "").WithRetry(1, time.Second)

Publisher

A publisher is a simple abstraction to conform the event type. It only holds a channel.

publisher, err := pika.CreatePublisher[MsgEvent](conn, pika.PublisherOptions{
  Exchange: "",
  Topic: "",
})

// To use
publisher.Publish(MsgEvent{})

Notifier

Sometimes you want to perform operations on regular intervals and them publish the result.

type Notifier[T any] interface {
	Options() NotificationOptions
	Stop() bool
	Notify() (T, error)
}

PubSub

If for testing or other use-cases you don't want to connect to a rabbitMq cluster, You can a PubSub which will handle all communication in memory.

func main() {
    pubsub := pika.NewPubSub()

    publisher, _ := pika.CreatePublisher[MsgEvent](pubsub, PublisherOptions{"exchange", "topic"})
    
    pika.StartConsumer[MsgEvent](pubsub, YourConsumer{})

    // ....
}

As long as YourConsumer listens on the same exchange and topic it will receive every msg sent trough publisher.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AppendToTopic

func AppendToTopic(key, str string) string

Types

type ChannelFactory

type ChannelFactory func() (*amqp.Channel, error)

type Connector

type Connector interface {
	Connect(string) error
	Disconnect() error

	Consume(Consumer, ConsumerOptions) error
	Publish(any, PublishOptions) error

	WithContext(ctx context.Context) Connector
	WithLogger(Logger) Connector
	WithProtocol(Protocol) Connector
	WithConsumers(int) Connector
}

func NewPubSub

func NewPubSub() Connector

func NewRabbitConnector

func NewRabbitConnector() Connector

type Consumer

type Consumer interface {
	HandleMessage(context.Context, Message) error
}

Consumer represents a RabbitMQ consumer for a typed `T` message

type ConsumerOptions

type ConsumerOptions struct {
	Exchange  string
	Topic     string
	QueueName string
	// contains filtered or unexported fields
}

ConsumerOptions represents a queue binding for a Consumer

func NewConsumerOptions

func NewConsumerOptions(exchange, topic, queue string) ConsumerOptions

NewConsumerOptions creates a ConsumerOptions object with default configurations

func (ConsumerOptions) HasRetry

func (co ConsumerOptions) HasRetry() bool

func (ConsumerOptions) SetDurable

func (co ConsumerOptions) SetDurable() ConsumerOptions

SetDurable configures the queue to be persist if the consumer disconnects

func (ConsumerOptions) SetName

func (co ConsumerOptions) SetName(name string) ConsumerOptions

SetName sets the consumer name

func (ConsumerOptions) SetPrefetch added in v2.3.0

func (co ConsumerOptions) SetPrefetch(n int) ConsumerOptions

SetPrefetch configures the prefetch count for the consumer. Default is 0, which means unlimited.

func (ConsumerOptions) WithRetry

func (co ConsumerOptions) WithRetry() ConsumerOptions

WithRetry enables in memory retries of unhandled messages. It will exponationally backoff for a max of 15min

type JsonProtocol

type JsonProtocol struct{}

func (JsonProtocol) ContentType

func (p JsonProtocol) ContentType() string

func (JsonProtocol) Marshal

func (p JsonProtocol) Marshal(v any) ([]byte, error)

func (JsonProtocol) Unmarshal

func (p JsonProtocol) Unmarshal(data []byte, v any) error

type Logger

type Logger interface {
	Debug(...any)
	Info(...any)
	Warn(...any)
	Error(...any)
}

type Message added in v2.2.0

type Message struct {
	// contains filtered or unexported fields
}

func (Message) Bind added in v2.2.0

func (m Message) Bind(v any) error

type Protocol

type Protocol interface {
	ContentType() string
	Marshal(any) ([]byte, error)
	Unmarshal([]byte, any) error
}

type PublishOptions

type PublishOptions struct {
	Exchange string
	Topic    string
}

PublishOptions specifies where to publish messages

type Queue

type Queue[T any] struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue[T any]() *Queue[T]

func (*Queue[T]) Dequeue

func (q *Queue[T]) Dequeue() *T

func (*Queue[T]) DequeueNoWait

func (q *Queue[T]) DequeueNoWait() *T

func (*Queue[T]) Length

func (q *Queue[T]) Length() int

func (*Queue[T]) PeakAt

func (q *Queue[T]) PeakAt(i int) *T

func (*Queue[T]) Peek

func (q *Queue[T]) Peek() *T

func (*Queue[T]) Queue

func (q *Queue[T]) Queue(e *T)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL