rabbitmq

v0.0.12
Published: Jan 31, 2024 License: MIT Imports: 5 Imported by: 2

README

RabbitMQ - a client for microservices

Overview

This library aims to simplify building a RabbitMQ client package inside a Go service. Hopefully it will make your life easier.

Installation

To use this library, import the package github.com/gbeletti/rabbitmq and run go mod tidy to download it, or fetch it explicitly with:

go get github.com/gbeletti/rabbitmq

Usage

You can see a package example here. The main.go starts the service and the package queuerabbit uses this lib.

Connecting to rabbit

To get started, import the package github.com/gbeletti/rabbitmq, call the function rabbitmq.NewRabbitMQ(), and connect it to the RabbitMQ server.

import (
    "context"
    "log"
    "os"

    "github.com/gbeletti/rabbitmq"
)

var rabbit rabbitmq.RabbitMQ
rabbit = rabbitmq.NewRabbitMQ()

// Cancel this context on service shutdown so the reconnect goroutine stops.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

configConn := rabbitmq.ConfigConnection{
    URI:           "amqp://guest:guest@localhost:5672?heartbeat=30&connection_timeout=120",
    PrefetchCount: 1,
}

var setup rabbitmq.Setup = func() {
    // creates and consumes from queues
}

rabbitmq.KeepConnectionAndSetup(ctx, rabbit, configConn, setup)

KeepConnectionAndSetup starts a goroutine that keeps the connection open until the context is canceled. Cancel the context when the service shuts down; otherwise the goroutine keeps trying to reopen the connection.
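The reconnect behaviour described above can be pictured with a small loop. This is a minimal sketch and not the library's implementation: connector, fakeConn, keepConnected, and the retry delay are hypothetical stand-ins so that it runs without a broker (the real Connector returns a chan *amqp.Error).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// connector is a hypothetical, simplified stand-in for the library's
// Connector interface (the real one returns a chan *amqp.Error).
type connector interface {
	Connect() (closed chan error, err error)
}

// keepConnected connects, runs setup after every successful connection, and
// reconnects whenever the connection reports that it closed. It returns when
// ctx is canceled.
func keepConnected(ctx context.Context, c connector, setup func(), retry time.Duration) {
	for {
		closed, err := c.Connect()
		if err == nil {
			setup()
			select {
			case <-ctx.Done():
				return
			case <-closed: // connection lost: wait, then reconnect
			}
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(retry):
		}
	}
}

// fakeConn simulates a broker whose first connection drops immediately.
type fakeConn struct{ calls int }

func (f *fakeConn) Connect() (chan error, error) {
	f.calls++
	closed := make(chan error, 1)
	if f.calls == 1 {
		closed <- errors.New("connection reset")
	}
	return closed, nil
}

// runDemo returns how many times setup ran before the context was canceled.
func runDemo() int {
	ctx, cancel := context.WithCancel(context.Background())
	setups := make(chan struct{}, 8)
	done := make(chan struct{})
	go func() {
		defer close(done)
		keepConnected(ctx, &fakeConn{}, func() { setups <- struct{}{} }, time.Millisecond)
	}()
	<-setups // first connection
	<-setups // reconnection after the simulated drop
	cancel() // service shutdown: the loop must exit
	<-done
	return 2 + len(setups)
}

func main() {
	fmt.Println("setup ran", runDemo(), "times")
}
```

The point of the sketch is that setup runs again after each reconnection, which is why queue declarations and consumers belong inside the setup function rather than next to the first connect call.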

Shutting down gracefully

When the service is going down you must call the Close function to close all the connections gracefully.

// Requires the "time" package in the import list.
ctx, cancelTimeout := context.WithTimeout(context.Background(), 30*time.Second)
defer cancelTimeout()
done := rabbit.Close(ctx)
<-done

Close stops receiving new messages, then waits until the messages already received from the queue have been processed and pending publishes to the exchange have completed. If that takes longer than the context timeout, it gives up.
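A shutdown helper can combine both steps: cancel the service context, then wait for Close. The sketch below is illustrative only. The closer interface mirrors the documented Closer signature, while shutdown and fakeCloser are hypothetical names, and the fake assumes (as the text above suggests) that the done channel is also closed when the timeout expires.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// closer mirrors the Closer interface documented for this package.
type closer interface {
	Close(ctx context.Context) (done chan struct{})
}

// shutdown cancels the service context first (so reconnecting and consuming
// stop), then waits for Close. It reports whether closing finished before
// the timeout expired.
func shutdown(cancelService context.CancelFunc, c closer, timeout time.Duration) bool {
	cancelService()
	ctx, cancelTimeout := context.WithTimeout(context.Background(), timeout)
	defer cancelTimeout()
	<-c.Close(ctx)
	return ctx.Err() == nil
}

// fakeCloser is a hypothetical stand-in that needs `work` to drain in-flight
// messages and gives up early if ctx expires.
type fakeCloser struct{ work time.Duration }

func (f fakeCloser) Close(ctx context.Context) chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		select {
		case <-time.After(f.work):
		case <-ctx.Done():
		}
	}()
	return done
}

// demoFast drains instantly, well inside the timeout.
func demoFast() bool { return shutdown(func() {}, fakeCloser{0}, time.Second) }

// demoSlow needs far longer than the timeout allows.
func demoSlow() bool { return shutdown(func() {}, fakeCloser{time.Hour}, 10*time.Millisecond) }

func main() {
	fmt.Println("fast close finished in time:", demoFast())
	fmt.Println("slow close finished in time:", demoSlow())
}
```

Returning a boolean lets the caller log a warning when messages may have been abandoned at shutdown.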

Creating queues

Just create the configuration struct and call the CreateQueue function.

func createQueues(rabbit rabbitmq.QueueCreator) {
    config := rabbitmq.ConfigQueue{
        Name:       "test",
        Durable:    true,
        AutoDelete: false,
        Exclusive:  false,
        NoWait:     false,
        Args:       nil,
    }
    _, err := rabbit.CreateQueue(config)
    if err != nil {
        log.Printf("error creating queue: %s\n", err)
    }
}

Consuming from exchange

First, create the configuration:

config := rabbitmq.ConfigConsume{
    QueueName:         "test",
    Consumer:          "test",
    AutoAck:           false,
    Exclusive:         false,
    NoLocal:           false,
    NoWait:            false,
    Args:              nil,
    ExecuteConcurrent: true,
}

The ExecuteConcurrent option controls whether each received message is handled in its own goroutine (true) or not (false).
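To make the difference concrete, here is a minimal sketch of the two dispatch modes. It is an assumption about what such an option does in general, not the library's code; dispatch and process are hypothetical names, and plain strings stand in for deliveries.

```go
package main

import (
	"fmt"
	"sync"
)

// dispatch is a hypothetical sketch of an ExecuteConcurrent-style switch:
// run the handler inline, or in one goroutine per message.
func dispatch(msgs <-chan string, concurrent bool, handle func(string)) {
	var wg sync.WaitGroup
	for m := range msgs {
		if !concurrent {
			handle(m) // the next message waits until this one is done
			continue
		}
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			handle(m)
		}(m)
	}
	wg.Wait() // let in-flight handlers finish before returning
}

// process feeds three messages through dispatch and returns them in the
// order the handler saw them.
func process(concurrent bool) []string {
	msgs := make(chan string, 3)
	for _, m := range []string{"a", "b", "c"} {
		msgs <- m
	}
	close(msgs)

	var mu sync.Mutex // the handler may run on several goroutines
	var seen []string
	dispatch(msgs, concurrent, func(m string) {
		mu.Lock()
		defer mu.Unlock()
		seen = append(seen, m)
	})
	return seen
}

func main() {
	fmt.Println(process(false)) // sequential: order is preserved
	fmt.Println(process(true))  // concurrent: same messages, order not guaranteed
}
```

With concurrent handling, the handler must be safe to call from several goroutines at once, and message order is no longer guaranteed.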

Then create the function to be executed upon getting a new message.

// amqp is the package github.com/rabbitmq/amqp091-go, imported under the name amqp.
func receiveMessage(d *amqp.Delivery) {
    defer func() {
        if err := d.Ack(false); err != nil {
            log.Printf("error acking message: %s\n", err)
        }
    }()
    log.Printf("received message: %s\n", d.Body)
}

Every message sent to the test queue will execute the receiveMessage function.

Finally, run the Consume function in a goroutine:

go func() {
    if err := rabbit.Consume(ctx, config, receiveMessage); err != nil {
        log.Printf("error consuming from queue: %s\n", err)
    }
}()

The context must be cancelable: once it is canceled, Consume stops consuming messages from the queue. You can share the same context used for the connection.

Reference

This library uses rabbitmq/amqp091-go. To better understand the options for queues and exchanges, see its documentation.

Documentation

Overview

Package rabbitmq provides the functions needed to connect to RabbitMQ, create queues and exchanges, and consume and publish messages. Check the README for more details.

Constants

This section is empty.

Variables

This section is empty.

Functions

func KeepConnectionAndSetup added in v0.0.3

func KeepConnectionAndSetup(ctx context.Context, conn Connector, config ConfigConnection, setupRabbit RabbitSetup)

KeepConnectionAndSetup starts a goroutine to keep the connection open. Every time the connection is opened, it calls the setupRabbit function. It is important to pass a cancelable context so the goroutine can stop when the context is done; otherwise it runs until the program ends.

func NotifyOpenConnection added in v0.0.3

func NotifyOpenConnection(notify chan struct{})

NotifyOpenConnection registers a channel to be notified when the connection is open

func NotifySetupDone added in v0.0.3

func NotifySetupDone(notify chan struct{})

NotifySetupDone registers a channel to be notified when the setup is done by the KeepConnectionAndSetup function
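A caller that must not publish before the setup has run can wait on such a channel with a deadline. The sketch below uses only the standard library. waitReady is a hypothetical helper, and the goroutine that sends on the channel merely stands in for the library's notification, whose exact delivery (send or close, buffered or not) is not stated here.

```go
package main

import (
	"fmt"
	"time"
)

// waitReady blocks until notify fires or the timeout elapses, and reports
// whether the notification arrived in time. A receive succeeds both when a
// value is sent and when the channel is closed.
func waitReady(notify <-chan struct{}, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-notify:
		return true
	case <-timer.C:
		return false
	}
}

// demoReady simulates a setup that completes right away.
func demoReady() bool {
	notify := make(chan struct{}, 1)
	go func() { notify <- struct{}{} }() // stands in for "setup done"
	return waitReady(notify, time.Second)
}

// demoTimeout simulates a setup that never completes.
func demoTimeout() bool {
	return waitReady(make(chan struct{}), 10*time.Millisecond)
}

func main() {
	fmt.Println("ready:", demoReady())
	fmt.Println("ready:", demoTimeout())
}
```

In a real service the channel would be the one passed to NotifySetupDone before calling KeepConnectionAndSetup.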

Types

type Closer

type Closer interface {
	Close(ctx context.Context) (done chan struct{})
}

Closer is an interface for closing a RabbitMQ connection

type ConfigBindQueue

type ConfigBindQueue struct {
	QueueName  string
	Exchange   string
	RoutingKey string
	NoWait     bool
	Args       amqp.Table
}

ConfigBindQueue is the configuration for the bind to queue

type ConfigConnection

type ConfigConnection struct {
	URI           string
	PrefetchCount int
}

ConfigConnection is the configuration for the connection

type ConfigConsume

type ConfigConsume struct {
	QueueName         string
	Consumer          string
	AutoAck           bool
	Exclusive         bool
	NoLocal           bool
	NoWait            bool
	Args              amqp.Table
	ExecuteConcurrent bool
}

ConfigConsume is the configuration for the consumer

func NewConfigConsume added in v0.0.5

func NewConfigConsume(queueName, consumer string) ConfigConsume

NewConfigConsume helper function to create a new ConfigConsume with some default values

type ConfigExchange added in v0.0.5

type ConfigExchange struct {
	Name       string
	Type       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqp.Table
}

ConfigExchange is the configuration for the exchange

type ConfigPublish

type ConfigPublish struct {
	Exchange        string
	RoutingKey      string
	Mandatory       bool
	Immediate       bool
	Headers         amqp.Table
	ContentType     string
	ContentEncoding string
	Priority        uint8
	CorrelationID   string
	MessageID       string
}

ConfigPublish is the configuration for the publisher

func NewConfigPublish added in v0.0.5

func NewConfigPublish(exchange, routingKey string) ConfigPublish

NewConfigPublish helper function to create a new ConfigPublish with some default values

type ConfigQueue

type ConfigQueue struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
}

ConfigQueue is the configuration for the queue

type Connector

type Connector interface {
	Connect(config ConfigConnection) (notify chan *amqp.Error, err error)
}

Connector is an interface for connecting to a RabbitMQ server

type Consumer

type Consumer interface {
	Consume(ctx context.Context, config ConfigConsume, f func(*amqp.Delivery)) (err error)
}

Consumer is the interface for consuming messages from a queue

type ExchangeCreator added in v0.0.5

type ExchangeCreator interface {
	CreateExchange(config ConfigExchange) (err error)
}

ExchangeCreator is the interface for creating exchanges

type Publisher

type Publisher interface {
	Publish(ctx context.Context, body []byte, config ConfigPublish) (err error)
}

Publisher is the interface for publishing messages to an exchange
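Because Publisher is an interface, publishing code can be written and tested without a broker. The following is a sketch under stated assumptions: ConfigPublish is a trimmed local copy of the documented struct, Publisher mirrors the documented signature, and publishJSON, recorder, and the exchange and routing key names are hypothetical.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// ConfigPublish is a trimmed local copy of the documented struct (only the
// fields used here), so the sketch compiles without the library.
type ConfigPublish struct {
	Exchange    string
	RoutingKey  string
	ContentType string
}

// Publisher mirrors the documented interface.
type Publisher interface {
	Publish(ctx context.Context, body []byte, config ConfigPublish) (err error)
}

// publishJSON is a hypothetical helper: it encodes v as JSON and publishes
// it with a matching content type.
func publishJSON(ctx context.Context, p Publisher, exchange, key string, v any) error {
	body, err := json.Marshal(v)
	if err != nil {
		return fmt.Errorf("encoding message: %w", err)
	}
	cfg := ConfigPublish{Exchange: exchange, RoutingKey: key, ContentType: "application/json"}
	return p.Publish(ctx, body, cfg)
}

// recorder is a fake Publisher that remembers the last message it was given.
type recorder struct {
	body []byte
	cfg  ConfigPublish
}

func (r *recorder) Publish(_ context.Context, body []byte, cfg ConfigPublish) error {
	r.body, r.cfg = body, cfg
	return nil
}

// demo publishes one message to the fake and returns what it recorded.
func demo() (body, contentType string, err error) {
	r := &recorder{}
	err = publishJSON(context.Background(), r, "events", "user.created", map[string]int{"id": 7})
	return string(r.body), r.cfg.ContentType, err
}

func main() {
	body, contentType, err := demo()
	fmt.Println(body, contentType, err)
}
```

With the real package, the value returned by NewRabbitMQ would take the place of the recorder, and NewConfigPublish(exchange, routingKey) could provide the starting configuration.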

type QueueBinder added in v0.0.5

type QueueBinder interface {
	BindQueueExchange(config ConfigBindQueue) (err error)
	UnbindQueueExchange(config ConfigBindQueue) (err error)
}

QueueBinder is the interface for binding and unbinding queues

type QueueCreator

type QueueCreator interface {
	CreateQueue(config ConfigQueue) (queue amqp.Queue, err error)
}

QueueCreator is the interface for creating queues

type RabbitMQ

RabbitMQ combines all the interfaces of the package

func NewRabbitMQ

func NewRabbitMQ() RabbitMQ

NewRabbitMQ creates the object to manage the operations to rabbitMQ

type RabbitSetup added in v0.0.3

type RabbitSetup interface {
	Setup()
}

RabbitSetup is the interface for setting up the RabbitMQ queues and exchanges after the connection is made

type Setup added in v0.0.3

type Setup func()

Setup is a type that implements the RabbitSetup interface

func (Setup) Setup added in v0.0.3

func (s Setup) Setup()

Setup executes the setup function
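The Setup type follows the same function-adapter idea as http.HandlerFunc in the standard library: a plain func() gains a method and thereby satisfies the interface. The sketch below re-declares the documented types locally so it compiles on its own. The method body is assumed from the description "Setup executes the setup function", and topology and runSetup are hypothetical.

```go
package main

import "fmt"

// Local re-declarations of the documented types.
type RabbitSetup interface{ Setup() }

type Setup func()

// Setup calls the underlying function (assumed behaviour).
func (s Setup) Setup() { s() }

// topology is a hypothetical struct-based alternative that carries state.
type topology struct{ declared []string }

func (t *topology) Setup() { t.declared = append(t.declared, "test") }

// runSetup accepts anything that satisfies RabbitSetup.
func runSetup(s RabbitSetup) { s.Setup() }

// demo runs both styles once and returns what each one did.
func demo() (funcCalls int, queues []string) {
	runSetup(Setup(func() { funcCalls++ })) // closure wrapped by the adapter
	t := &topology{}
	runSetup(t) // struct implementing the interface directly
	return funcCalls, t.declared
}

func main() {
	calls, queues := demo()
	fmt.Println(calls, queues)
}
```

A closure is convenient for small services, while a struct is easier to use when the setup needs configuration or has to be unit tested.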
