rab

package module
v0.0.0-...-4ebd16b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 17, 2022 License: Apache-2.0 Imports: 14 Imported by: 0

README

goclub/rabbitmq

保姆式 golang rabbitmq

特色

  1. 接口友好: 基于 github.com/streadway/amqp 进行封装,提供 rab.ConsumeDelivery等工具 防止逻辑错误导致消费者永远在消费同一个消息
  2. 自动重连:避免连接断开导致业务不可用
  3. 事务发件箱(本地消息表):只需简单配置即可实现支持多进程并发的事务发件箱
  4. 死信队列人工干预:几行代码即可实现将死信保存到队列
  5. 实战示例: 干活满满的实战示例,绝不是只有 publish 和 consume 的 demo 代码片段.

示例

  1. 实战

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MessageID

func MessageID() (messageID string)

Types

type Consume

type Consume struct {
	Queue       QueueName // 队列名称
	ConsumerTag string    // 消费者标签,区分多个消费者
	NoLocal     bool      // 设置为true则表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者;
	NoAck       bool      // 设置是否自动确认。建议设成false,即不自动确认
	Exclusive   bool      // 排它
	NoWait      bool
	Args        map[string]interface{}
}

func (Consume) Flat

func (v Consume) Flat() (queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args map[string]interface{})

type ConsumeDelivery

type ConsumeDelivery struct {
	Delivery          amqp.Delivery
	RequeueMiddleware func(d *amqp.Delivery) (requeue bool)
	Handle            func(ctx context.Context, d *amqp.Delivery) DeliveryResult
}

func (ConsumeDelivery) Do

func (h ConsumeDelivery) Do(ctx context.Context) (err error)

type DeadLetterSaveToSQLOption

type DeadLetterSaveToSQLOption struct {
	DB                *sql.DB
	QueueName         QueueName
	OnConsumerError   func(err error, d *amqp.Delivery)
	RequeueMiddleware func(d *amqp.Delivery) (requeue bool)
	Timezone          *time.Location `default:"time.FixedZone("CST", 8*3600) china"`
}

type DeliveryResult

type DeliveryResult struct {
	// contains filtered or unexported fields
}

DeliveryResult create with rab.Ack() or rab.Reject()

func Ack

func Ack() DeliveryResult

func Reject

func Reject(err error, requeue bool) DeliveryResult

func (DeliveryResult) Error

func (result DeliveryResult) Error() string

给 DeliveryResult 增加 Error 接口是为了避出现类似 rab.Ack() 或者 rab.Reject() 前面没有 return 的错误

type ExchangeBind

type ExchangeBind struct {
	DestinationExchange string
	SourceExchange      string
	RoutingKey          string
	NoWait              bool
	Args                map[string]interface{}
}

func (ExchangeBind) Flat

func (v ExchangeBind) Flat() (destination, key, source string, noWait bool, args map[string]interface{})

type ExchangeDeclare

type ExchangeDeclare struct {
	Name       ExchangeName           // 交换器的名称
	Kind       string                 // 交换器类型 amqp.ExchangeDirect amqp.ExchangeFanout amqp.ExchangeTopic amqp.ExchangeHeaders
	Durable    bool                   // 设置是否持久化。durable设置为true表示持久化,反之是非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
	AutoDelete bool                   // 设置是否自动删除。autoDelete设置为true则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。注意不能错误地把这个参数理解为:“当与此交换器连接的客户端都断开时,RabbitMQ会自动删除本交换器
	Internal   bool                   // 设置是否是内置的。如果设置为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
	NoWait     bool                   // 当noWait为true时,无需等待服务器的确认即可进行声明。如果没有特殊的缘由和应用场景,并不建议使用这个方法。
	Args       map[string]interface{} // 其他一些结构化参数,比如alternate-ex-change
}

func (ExchangeDeclare) Flat

func (v ExchangeDeclare) Flat() (name, kind string, durable, autoDelete, internal, noWait bool, args map[string]interface{})

type ExchangeDeclarePassive

type ExchangeDeclarePassive struct {
	ExchangeDeclare
}

type ExchangeDelete

type ExchangeDelete struct {
	Name     string // 交换器名称
	IfUnused bool   // ifUnused用来设置是否在交换器没有被使用的情况下删除。如果isUnused设置为true,则只有在此交换器没有被使用的情况下才会被删除;如果设置false,则无论如何这个交换器都要被删除。
	NoWait   bool   // 当noWait为true时,不要等待服务器确认exchange删除已完成。删除通道失败可能会关闭通道。
}

func (ExchangeDelete) Flat

func (v ExchangeDelete) Flat() (name string, ifUnused, noWait bool)

type ExchangeName

type ExchangeName string

func (ExchangeName) String

func (name ExchangeName) String() string

type ExchangeUnbind

type ExchangeUnbind struct {
	ExchangeBind
}

type HandleNotifyReturn

type HandleNotifyReturn struct {
	// 发生 NotifyReturn 时触发
	Return func(r *amqp.Return)
	// 当 Return panic时触发 Panic
	Panic func(panicRecover interface{})
}

type Option

type Option struct {
	// OnReconnect
	// goclub/rabbitmq 封装后的 Connection 和 Channel 都有重连机制,当发生重连时会触发 OnReconnect
	// OnReconnect 为 nil 时默认执行 log.Print(message, string(debug.Stack()))
	OnReconnect func(reconnectID string, message string, err error)
	// NotifyReturn 用于订阅发送时退回的消息 需在 Publish 时配合 Mandatory 使用
	HandleNotifyReturn HandleNotifyReturn
	Outbox             OutboxOption
}

type OutboxInsertOption

type OutboxInsertOption struct {
	Publish Publish
}

type OutboxOption

type OutboxOption struct {
	MaxPublishTimes     uint16                       `default:"10"`
	NextPublishTime     func(n uint16) time.Duration `default:"3s"`
	ConsumeLoopInterval time.Duration                `default:"1s"`
	Timezone            *time.Location               `default:"time.FixedZone("CST", 8*3600) china"`
	Logger              *log.Logger                  `default:"log.Default()"`
}

type ProxyChannel

type ProxyChannel struct {
	*amqp.Channel
	// contains filtered or unexported fields
}

func (*ProxyChannel) Close

func (ch *ProxyChannel) Close() error

利用继承代理 Close获取用户主动调用的close

func (*ProxyChannel) Consume

func (channel *ProxyChannel) Consume(consume Consume) (<-chan amqp.Delivery, error)

func (*ProxyChannel) DeadLetterSaveToSQL

func (channel *ProxyChannel) DeadLetterSaveToSQL(ctx context.Context, opt DeadLetterSaveToSQLOption) (err error)

DeadLetterSaveToSQL https://www.rabbitmq.com/dlx.html

func (*ProxyChannel) ExchangeDeclare

func (channel *ProxyChannel) ExchangeDeclare(declare ExchangeDeclare) (err error)

func (*ProxyChannel) IsClosed

func (ch *ProxyChannel) IsClosed() bool

func (*ProxyChannel) Publish

func (channel *ProxyChannel) Publish(publish Publish) (err error)

Publish 自动添加 MessageId 和 Timestamp

func (*ProxyChannel) QueueBind

func (channel *ProxyChannel) QueueBind(queueBind QueueBind) (err error)

func (*ProxyChannel) QueueDeclare

func (channel *ProxyChannel) QueueDeclare(queueDeclare QueueDeclare) (queue amqp.Queue, err error)

func (*ProxyChannel) SQLOutboxInsert

func (c *ProxyChannel) SQLOutboxInsert(ctx context.Context, db *sql.DB, tx *sql.Tx, opt OutboxInsertOption) (outbox SQLOutbox, err error)

func (*ProxyChannel) SQLOutboxQuery

func (c *ProxyChannel) SQLOutboxQuery(ctx context.Context, db *sql.DB, req ViewOutboxRequest) (list []ViewOutbox, total uint64, err error)

func (*ProxyChannel) SQLOutboxSend

func (c *ProxyChannel) SQLOutboxSend(ctx context.Context, db *sql.DB, outboxIDList []uint64) (err error)

func (*ProxyChannel) SQLOutboxStartWork

func (c *ProxyChannel) SQLOutboxStartWork(ctx context.Context, db *sql.DB, onConsumeError func(err error)) (err error)

type ProxyConnection

type ProxyConnection struct {
	*amqp.Connection
	// contains filtered or unexported fields
}

参考 https://github.com/isayme/go-amqp-reconnect/blob/master/rabbitmq/rabbitmq.go

func Dial

func Dial(url string, opt Option) (conn *ProxyConnection, err error)

func DialConfig

func DialConfig(url string, config amqp.Config, opt Option) (conn *ProxyConnection, err error)

func (*ProxyConnection) Channel

func (conn *ProxyConnection) Channel() (channel *ProxyChannel, channelClose func() error, err error)

type Publish

type Publish struct {
	Exchange   ExchangeName // 交换机名
	RoutingKey RoutingKey
	Mandatory  bool // mandatory参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。
	Msg        amqp.Publishing
	// Deprecated: RabbitMQ 3.0版本开始去掉了对immediate参数的支持, immediate参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。
	Immediate bool
}

func (Publish) Flat

func (v Publish) Flat() (exchange, key string, mandatory, immediate bool, msg amqp.Publishing)

type QueueBind

type QueueBind struct {
	Queue      QueueName    // 队列名称
	RoutingKey RoutingKey   // 用来绑定队列和交换器的路由键
	Exchange   ExchangeName // 交换器名称
	NoWait     bool         // 当noWait为true时,不要等待服务器确认queue绑定完成
	Args       map[string]interface{}
}

func (QueueBind) Flat

func (v QueueBind) Flat() (name, key, exchange string, noWait bool, args map[string]interface{})

type QueueDeclare

type QueueDeclare struct {
	Name       QueueName              //  队列名称
	Durable    bool                   // 设置是否持久化。为true则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。
	AutoDelete bool                   // 设置是否自动删除。为true则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。不能把这个参数错误地理解为:“当连接到此队列的所有客户端断开时,这个队列自动删除”,因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
	Exclusive  bool                   // 是否排它,排它队适用一个客户端同时发送和读取消息的应用场景
	NoWait     bool                   // 当noWait为true时,无需等待服务器的确认即可进行声明。如果没有特殊的缘由和应用场景,并不建议使用这个方法。
	Args       map[string]interface{} // 设置队列的其他一些参数,如x-me s s age-ttl、x-expire s、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-deadletter-routing-key、x-max-priority等。

}

func (QueueDeclare) Flat

func (v QueueDeclare) Flat() (name string, durable, autoDelete, exclusive, noWait bool, args map[string]interface{})

type QueueDelete

type QueueDelete struct {
	Name     string // 队列名称
	IfUnused bool   // ifUnused用来设置是否在交换器没有被使用的情况下删除。如果isUnused设置为true,则只有在此队列没有被使用的情况下才会被删除;如果设置false,则无论如何这个队列都要被删除。
	IfEmpty  bool   // 队列为空的情况下才删除
	NoWait   bool   // 当noWait为true时,不要等待服务器确认queue删除完成如果队列 无法删除,将引发通道异常,通道将被关闭。
}

func (QueueDelete) Flat

func (v QueueDelete) Flat() (name string, ifUnused, ifEmpty, noWait bool)

type QueueName

type QueueName string

func (QueueName) String

func (name QueueName) String() string

type QueueUnbind

type QueueUnbind struct {
	Queue      QueueName    // 队列名称
	RoutingKey RoutingKey   // 用来解绑队列和交换器的路由键
	Exchange   ExchangeName // 交换器名称
	Args       map[string]interface{}
}

func (QueueUnbind) Flat

func (v QueueUnbind) Flat() (name, key, exchange string, args map[string]interface{})

type RoutingKey

type RoutingKey string

func (RoutingKey) String

func (key RoutingKey) String() string

type SQLOutbox

type SQLOutbox struct {
	OutboxID int64
	// contains filtered or unexported fields
}

func (SQLOutbox) Delete

func (l SQLOutbox) Delete(ctx context.Context) (err error)

type ViewOutbox

type ViewOutbox struct {
	ID          uint64    `json:"id"`
	Exchange    string    `json:"exchange"`
	RoutingKey  string    `json:"routingKey"`
	MessageID   string    `json:"messageID"`
	PublishJson string    `json:"publishJson"`
	Status      uint8     `json:"status"`
	CreateTime  time.Time `json:"createTime"`
}

type ViewOutboxRequest

type ViewOutboxRequest struct {
	Exchange    string `json:"exchange"`
	RoutingKey  string `json:"routingKey"`
	MessageID   string `json:"messageID"`
	Status      uint8  `json:"status"`
	OrderByDesc bool   `json:"orderByDesc"`
	Page        uint64 `json:"page" default:"1"`
	PerPage     uint64 `json:"perPage" default:"10"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL