amqpextra

package module
v1.2.1
Published: Jan 26, 2023 License: MIT Imports: 10 Imported by: 16

README

Extra features for streadway/amqp package.

Dialer.

Provides:

  • Auto reconnect.
  • Context aware.
  • Configured by WithXXX options.
  • Dial multiple servers.
  • Notifies ready/unready/closed states.

Consumer.

Provides:

  • Auto reconnect.
  • Context aware.
  • Configured by WithXXX options.
  • Consumer can process messages in parallel.
  • Consumers in-process auto-scaling and backpressure.
  • Adds message context.
  • Detects queue deletion and reconnects.
  • Notifies ready/unready/closed states.

Publisher.

Provides:

  • Auto reconnect.
  • Context aware.
  • Configured by WithXXX options.
  • Notifies ready/unready/closed states.
  • Publish can wait until the connection is ready.
  • Adds message context.
  • Publish a message struct (define only what you need).
  • Supports flow control.

Consumer middlewares

The consumer can chain middlewares to preprocess a received message.

Here are some built-in middlewares:

  • HasCorrelationID - Nacks the message if it has no correlation ID.
  • HasReplyTo - Nacks the message if it has no reply-to.
  • Logger - Puts a logger into the context.
  • Recover - Recovers the worker from a panic and nacks the message.
  • Expire - Converts the message expiration to a context with a timeout.
  • AckNack - Return middleware.Ack to ack the message.

Documentation

Overview

Package amqpextra provides a Dialer that redials when the connection is lost.

Constants

This section is empty.

Variables

This section is empty.

Functions

func Dial

func Dial(opts ...Option) (*amqp.Connection, error)

Dial returns an established connection or an error. It keeps retrying until a 30-second timeout is reached.
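
The retry-until-timeout behavior described above can be illustrated with a small standard-library sketch. It is not how Dial is implemented: dialWithRetry and flakyDial are hypothetical helpers, and the dial func is a stub that never touches the network.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// dialWithRetry calls dial repeatedly, waiting retryPeriod between
// attempts, until dial succeeds or the timeout expires. It returns the
// number of attempts made.
func dialWithRetry(timeout, retryPeriod time.Duration, dial func() error) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	for attempts := 1; ; attempts++ {
		err := dial()
		if err == nil {
			return attempts, nil
		}
		select {
		case <-ctx.Done():
			return attempts, fmt.Errorf("gave up after %d attempts: %w", attempts, err)
		case <-time.After(retryPeriod):
			// wait is over, try again
		}
	}
}

// flakyDial returns a hypothetical dial func that fails until its nth call.
func flakyDial(n int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls < n {
			return errors.New("connection refused")
		}
		return nil
	}
}

func main() {
	attempts, err := dialWithRetry(30*time.Second, 10*time.Millisecond, flakyDial(3))
	fmt.Println(attempts, err) // 3 <nil>
}
```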

func NewConsumer

func NewConsumer(
	connCh <-chan *Connection,
	opts ...consumer.Option,
) (*consumer.Consumer, error)
Example
package main

import (
	"context"

	"log"

	"github.com/makasim/amqpextra"
	"github.com/makasim/amqpextra/consumer"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	// you can get connCh from dialer.ConnectionCh() method
	var connCh chan *amqpextra.Connection
	h := consumer.HandlerFunc(
		func(ctx context.Context, msg amqp.Delivery) interface{} {
			// process message
			msg.Ack(false)
			return nil
		})

	// create consumer
	c, err := amqpextra.NewConsumer(
		connCh,
		consumer.WithHandler(h),
		consumer.WithQueue("a_queue"),
	)
	if err != nil {
		log.Fatal(err)
	}

	// close consumer
	c.Close()
	<-c.NotifyClosed()

}

func NewPublisher

func NewPublisher(
	connCh <-chan *Connection,
	opts ...publisher.Option,
) (*publisher.Publisher, error)
Example
package main

import (
	"log"

	"github.com/makasim/amqpextra"
	"github.com/makasim/amqpextra/publisher"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	// you can get connCh from dialer.ConnectionCh() method
	var connCh chan *amqpextra.Connection

	// create publisher
	p, err := amqpextra.NewPublisher(connCh)
	if err != nil {
		log.Fatal(err)
	}

	// publish a message
	go p.Publish(publisher.Message{
		Key: "test_queue",
		Publishing: amqp.Publishing{
			Body: []byte(`{"foo": "fooVal"}`),
		},
	})

	// close publisher
	p.Close()
	<-p.NotifyClosed()

}

Types

type AMQPConnection added in v0.15.0

type AMQPConnection interface {
	NotifyClose(chan *amqp.Error) chan *amqp.Error
	Close() error
}

AMQPConnection is an interface for streadway's *amqp.Connection

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection provides access to streadway's *amqp.Connection as well as notification channels. A notification indicates that something has gone wrong with the connection. The client should get a fresh connection from Dialer.

func (*Connection) AMQPConnection added in v0.15.0

func (c *Connection) AMQPConnection() *amqp.Connection

AMQPConnection returns streadway's *amqp.Connection

func (*Connection) NotifyLost added in v0.15.0

func (c *Connection) NotifyLost() chan struct{}

NotifyLost notifies when the current connection is lost and a new one should be requested.

type Dialer

type Dialer struct {
	// contains filtered or unexported fields
}

Dialer is responsible for keeping the connection up. If the connection is lost or closed, it dials a server again and again, waiting for a period between attempts. Dialer keeps the connection up until the Dialer.Close() method is called or the context is canceled.

func NewDialer

func NewDialer(opts ...Option) (*Dialer, error)

NewDialer returns Dialer or a configuration error.

func (*Dialer) Close added in v0.15.0

func (c *Dialer) Close()

Close initiates the Dialer close. Subscribe to Dialer.NotifyClosed() to know when it has finally closed.

func (*Dialer) Connection added in v0.15.0

func (c *Dialer) Connection(ctx context.Context) (*amqp.Connection, error)

Connection returns streadway's *amqp.Connection. The client should subscribe to the Dialer.NotifyReady() and Dialer.NotifyUnready() events in order to know when the connection is lost.

func (*Dialer) ConnectionCh added in v0.15.0

func (c *Dialer) ConnectionCh() <-chan *Connection

ConnectionCh returns the Connection channel. Use the channel to get established connections. The client must subscribe to Connection.NotifyLost(). Once the connection is lost, the client must stop using it and get a new one from the Connection channel. The Connection channel is closed when the Dialer is closed, so don't forget to check for a closed channel.

Example

nolint:gosimple // the purpose of select case is to stress the connCh close case.

package main

import (
	"log"

	"time"

	"github.com/makasim/amqpextra"
)

func main() {
	dialer, err := amqpextra.NewDialer(amqpextra.WithURL("amqp://guest:guest@localhost:5672/%2f"))
	if err != nil {
		log.Fatal(err)
	}

	connCh := dialer.ConnectionCh()
	go func() {
		for {
			select {
			case conn, ok := <-connCh:
				if !ok {
					// connection is permanently closed
					return
				}

				<-conn.NotifyLost()
			}
		}
	}()

	time.Sleep(time.Second)
	dialer.Close()

}

func (*Dialer) Consumer added in v0.15.0

func (c *Dialer) Consumer(opts ...consumer.Option) (*consumer.Consumer, error)

Consumer returns a consumer that supports reconnection.

Example
package main

import (
	"context"

	"github.com/makasim/amqpextra"
	"github.com/makasim/amqpextra/consumer"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	// open connection
	d, _ := amqpextra.NewDialer(amqpextra.WithURL("amqp://guest:guest@localhost:5672/%2f"))

	h := consumer.HandlerFunc(func(ctx context.Context, msg amqp.Delivery) interface{} {
		// process message
		msg.Ack(false)

		return nil
	})

	c, _ := d.Consumer(
		consumer.WithQueue("a_queue"),
		consumer.WithHandler(h),
	)

	// close consumer
	c.Close()

	// close dialer
	d.Close()

}

func (*Dialer) Notify added in v0.16.0

func (c *Dialer) Notify(stateCh chan State) <-chan State

Notify can be used to subscribe to Dialer ready/unready events.

func (*Dialer) NotifyClosed added in v0.15.0

func (c *Dialer) NotifyClosed() <-chan struct{}

NotifyClosed can be used to subscribe to the Dialer closed event. Dialer.ConnectionCh() can no longer be used after this point.

func (*Dialer) Publisher added in v0.15.0

func (c *Dialer) Publisher(opts ...publisher.Option) (*publisher.Publisher, error)

Publisher returns a publisher that supports reconnection.

Example
package main

import (
	"context"
	"time"

	"github.com/makasim/amqpextra"
	"github.com/makasim/amqpextra/publisher"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	// open connection
	d, _ := amqpextra.NewDialer(amqpextra.WithURL("amqp://guest:guest@localhost:5672/%2f"))

	// create publisher
	p, _ := d.Publisher()

	ctx, cancelFunc := context.WithTimeout(context.Background(), time.Millisecond*100)
	defer cancelFunc()

	// publish a message
	p.Publish(publisher.Message{
		Key:     "test_queue",
		Context: ctx,
		Publishing: amqp.Publishing{
			Body: []byte(`{"foo": "fooVal"}`),
		},
	})

	// close publisher
	p.Close()

	// close connection
	d.Close()

}

type Option added in v0.15.0

type Option func(c *Dialer)

Option can be used to configure the Dialer.
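
The Option type follows the functional options pattern. A minimal standard-library sketch of that pattern is given here; the lowercase dialer, option, withURL, withRetryPeriod and newDialer names are hypothetical stand-ins, not the package's code. The 5-second default mirrors the documented WithRetryPeriod default, while rejecting an empty URL list is only an assumption about what a configuration error might look like.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// dialer is a hypothetical stand-in holding two configurable fields.
type dialer struct {
	urls        []string
	retryPeriod time.Duration
}

// option mutates a dialer during construction.
type option func(d *dialer)

// withURL appends one or more server URLs.
func withURL(urls ...string) option {
	return func(d *dialer) { d.urls = append(d.urls, urls...) }
}

// withRetryPeriod overrides the default wait between dial attempts.
func withRetryPeriod(dur time.Duration) option {
	return func(d *dialer) { d.retryPeriod = dur }
}

// newDialer applies defaults first, then options, then validates.
func newDialer(opts ...option) (*dialer, error) {
	d := &dialer{retryPeriod: 5 * time.Second}
	for _, opt := range opts {
		opt(d)
	}
	if len(d.urls) == 0 {
		// assumed validation rule, for illustration only
		return nil, errors.New("at least one url is required")
	}
	return d, nil
}

func main() {
	d, err := newDialer(withURL("amqp://guest:guest@localhost:5672/%2f"))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(len(d.urls), d.retryPeriod) // 1 5s

	_, err = newDialer()
	fmt.Println(err) // at least one url is required
}
```

Applying defaults before the options lets callers pass only what they want to change, which matches the "Configured by WithXXX options" feature listed in the README.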

func WithAMQPDial added in v0.15.0

func WithAMQPDial(dial func(url string, c amqp.Config) (AMQPConnection, error)) Option

WithAMQPDial configures the dial function. The function takes the URL and amqp.Config and returns an AMQPConnection.

func WithConnectionProperties added in v0.15.0

func WithConnectionProperties(props amqp.Table) Option

WithConnectionProperties configures the connection properties set on dial.

func WithContext added in v0.15.0

func WithContext(ctx context.Context) Option

WithContext configures the Dialer context. The context can be used later to stop the Dialer.

func WithLogger added in v0.15.0

func WithLogger(l logger.Logger) Option

WithLogger configures the logger used by the Dialer.

func WithNotify added in v0.16.0

func WithNotify(stateCh chan State) Option

WithNotify helps subscribe to Dialer ready/unready events.

func WithRetryPeriod added in v0.15.0

func WithRetryPeriod(dur time.Duration) Option

WithRetryPeriod configures how long to wait before the next dial attempt. Default: 5sec.

func WithTLS added in v0.17.0

func WithTLS(tlsConfig *tls.Config) Option

WithTLS configures TLS. tlsConfig is the TLS configuration to use.

func WithURL added in v0.15.0

func WithURL(urls ...string) Option

WithURL configures the RabbitMQ servers to dial. Dialer dials the URLs in round-robin order.
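
Round-robin selection over several URLs can be sketched as follows. urlPicker is a hypothetical helper and the host names are made up; how the Dialer actually tracks its position is not shown on this page.

```go
package main

import "fmt"

// urlPicker cycles through a fixed list of URLs.
type urlPicker struct {
	urls []string
	next int
}

// pick returns the next URL and advances the cursor, wrapping around
// at the end of the list.
func (p *urlPicker) pick() string {
	u := p.urls[p.next%len(p.urls)]
	p.next++
	return u
}

func main() {
	p := &urlPicker{urls: []string{"amqp://rabbit-a:5672", "amqp://rabbit-b:5672"}}
	for i := 0; i < 3; i++ {
		fmt.Println(p.pick())
	}
	// Prints rabbit-a, rabbit-b, then rabbit-a again.
}
```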

type Ready added in v0.16.2

type Ready struct{}

type State added in v0.16.2

type State struct {
	Ready   *Ready
	Unready *Unready
}

type Unready added in v0.16.2

type Unready struct {
	Err error
}
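
A rough sketch of how a State value might be interpreted when read from a notification channel. The three types are local copies of the declarations above so that the snippet compiles without the package; that exactly one of Ready and Unready is non-nil in each State is an assumption drawn from the struct shape, and describe is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the types declared above.
type Ready struct{}

type Unready struct {
	Err error
}

type State struct {
	Ready   *Ready
	Unready *Unready
}

// describe reports which field of the State is set.
func describe(s State) string {
	switch {
	case s.Ready != nil:
		return "ready"
	case s.Unready != nil:
		return fmt.Sprintf("unready: %v", s.Unready.Err)
	default:
		return "unknown"
	}
}

func main() {
	// A buffered channel stands in for the one passed to Notify or WithNotify.
	stateCh := make(chan State, 2)
	stateCh <- State{Unready: &Unready{Err: errors.New("connection refused")}}
	stateCh <- State{Ready: &Ready{}}
	close(stateCh)

	for s := range stateCh {
		fmt.Println(describe(s))
	}
	// Prints "unready: connection refused" and then "ready".
}
```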

Directories

Path Synopsis
mock_consumer
Package mock_consumer is a generated GoMock package.
e2e_test
logrus module
Package mock_amqpextra is a generated GoMock package.
mock_publisher
Package mock_publisher is a generated GoMock package.
