amqp_safe

package module
v1.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 12, 2023 License: MIT Imports: 10 Imported by: 0

README

amqp-safe

Golang AMQP with reconnect, clustering and delivery guarantee.

import (
    amqp "github.com/luckytea/safe-amqp"
)

// Start connection and open channel, async
c := amqp.NewConnector(amqp.Config{
    Hosts: []string{"amqp://admin:[email protected]"},
}).Start()

// Callback on channel ready
c.OnReady(func() {
    if err := c.ExchangeDeclare("test-exchange", amqp.ExchangeDirect, true, false, false, false, nil); err != nil {
        log.Panic(err)
    }
   
    if _, err := c.QueueDeclare("test-queue", true, false, false, false, nil); err != nil {
        log.Panic(err)
    }

    if err := c.QueueBind("test-queue", "", "test-exchange", false, nil); err != nil {
        log.Panic(err)
    }

    err := c.Publish("test-exchange", "", amqp.Publishing{
        Body: []byte("hey"),
    })
    if err != nil {
        log.Panic(err)
    }

    c.Consume("test-queue", "", func(bytes []byte) amqp.Result {
        log.Println("event:", string(bytes))
        return amqp.ResultOK
    })
})

Documentation

Index

Constants

View Source
const (
	Transient  uint8 = amqp.Transient
	Persistent uint8 = amqp.Persistent
)

DeliveryMode. Transient means higher throughput but messages will not be restored on broker restart. The delivery mode of publishings is unrelated to the durability of the queues they reside on. Transient messages will not be restored to durable queues, persistent messages will be restored to durable queues and lost on non-durable queues during server restart.

This remains typed as uint8 to match Publishing.DeliveryMode. Other delivery modes specific to custom queue implementations are not enumerated here.

View Source
const (
	ExchangeDirect  = "direct"
	ExchangeFanout  = "fanout"
	ExchangeTopic   = "topic"
	ExchangeHeaders = "headers"
)

Variables

View Source
var (
	ErrServerNAck   = errors.New("not ack by server")
	ErrServerReturn = errors.New("returned by server")
	ErrNoConnection = errors.New("not connected")
	ErrNoChannel    = errors.New("no channel")
)

Functions

This section is empty.

Types

type Acker

type Acker struct {
	// contains filtered or unexported fields
}

func (*Acker) Ack

func (a *Acker) Ack() error

func (*Acker) Nack

func (a *Acker) Nack(requeue bool) error

type Config

type Config struct {
	DialTimeout    time.Duration
	HeartbeatEvery time.Duration
	RetryEvery     time.Duration
	Logger         Logger
	Hosts          []string
}

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

func NewConnector

func NewConnector(cfg Config) *Connector

func (*Connector) Close

func (c *Connector) Close() (err error)

func (*Connector) Consume

func (c *Connector) Consume(queue, consumer string, cb func([]byte) Result)

func (*Connector) ConsumeAckLater

func (c *Connector) ConsumeAckLater(queue, consumer string, cb func([]byte, *Acker))

TODO: remove duplicated code

func (*Connector) ExchangeBind

func (c *Connector) ExchangeBind(destination, key, source string, noWait bool, args Table) error

func (*Connector) ExchangeDeclare

func (c *Connector) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error

func (*Connector) ExchangeDeclarePassive

func (c *Connector) ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error

func (*Connector) ExchangeDelete

func (c *Connector) ExchangeDelete(name string, ifUnused, noWait bool) error

func (*Connector) ExchangeUnbind

func (c *Connector) ExchangeUnbind(destination, key, source string, noWait bool, args Table) error

func (*Connector) Flow

func (c *Connector) Flow(active bool) error

func (*Connector) OnChannel

func (c *Connector) OnChannel(f func())

func (*Connector) OnChannelFail

func (c *Connector) OnChannelFail(f func() bool)

func (*Connector) OnConnect

func (c *Connector) OnConnect(f func())

func (*Connector) OnConnectionFail

func (c *Connector) OnConnectionFail(f func() bool)

func (*Connector) OnReady

func (c *Connector) OnReady(f func())

func (*Connector) Publish

func (c *Connector) Publish(exchange, key string, publishing Publishing) error

func (*Connector) PublishWithContext added in v1.2.2

func (c *Connector) PublishWithContext(ctx context.Context, exchange, key string, publishing Publishing) error

func (*Connector) Qos

func (c *Connector) Qos(prefetchCount, prefetchSize int, global bool) error

func (*Connector) QueueBind

func (c *Connector) QueueBind(name, key, exchange string, noWait bool, args Table) error

func (*Connector) QueueDeclare

func (c *Connector) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)

func (*Connector) QueueDeclarePassive

func (c *Connector) QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)

func (*Connector) QueueDelete

func (c *Connector) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)

func (*Connector) QueueInspect

func (c *Connector) QueueInspect(name string) (Queue, error)

func (*Connector) QueuePurge

func (c *Connector) QueuePurge(name string, noWait bool) (int, error)

func (*Connector) QueueUnbind

func (c *Connector) QueueUnbind(name, key, exchange string, args Table) error

func (*Connector) Recover

func (c *Connector) Recover(requeue bool) error

func (*Connector) Start

func (c *Connector) Start() *Connector

type Delivery

type Delivery = amqp.Delivery

type Logger

type Logger interface {
	Println(v ...interface{})
}

type Publishing

type Publishing = amqp.Publishing

type Queue

type Queue = amqp.Queue

type Result

type Result int
const ResultError Result = 2
const ResultOK Result = 1
const ResultReject Result = 3

type Table

type Table = amqp.Table

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL