Documentation ¶
Index ¶
- func MessageID() (messageID string)
- type Consume
- type ConsumeDelivery
- type DeadLetterSaveToSQLOption
- type DeliveryResult
- type ExchangeBind
- type ExchangeDeclare
- type ExchangeDeclarePassive
- type ExchangeDelete
- type ExchangeName
- type ExchangeUnbind
- type HandleNotifyReturn
- type Option
- type OutboxInsertOption
- type OutboxOption
- type ProxyChannel
- func (ch *ProxyChannel) Close() error
- func (channel *ProxyChannel) Consume(consume Consume) (<-chan amqp.Delivery, error)
- func (channel *ProxyChannel) DeadLetterSaveToSQL(ctx context.Context, opt DeadLetterSaveToSQLOption) (err error)
- func (channel *ProxyChannel) ExchangeDeclare(declare ExchangeDeclare) (err error)
- func (ch *ProxyChannel) IsClosed() bool
- func (channel *ProxyChannel) Publish(publish Publish) (err error)
- func (channel *ProxyChannel) QueueBind(queueBind QueueBind) (err error)
- func (channel *ProxyChannel) QueueDeclare(queueDeclare QueueDeclare) (queue amqp.Queue, err error)
- func (c *ProxyChannel) SQLOutboxInsert(ctx context.Context, db *sql.DB, tx *sql.Tx, opt OutboxInsertOption) (outbox SQLOutbox, err error)
- func (c *ProxyChannel) SQLOutboxQuery(ctx context.Context, db *sql.DB, req ViewOutboxRequest) (list []ViewOutbox, total uint64, err error)
- func (c *ProxyChannel) SQLOutboxSend(ctx context.Context, db *sql.DB, outboxIDList []uint64) (err error)
- func (c *ProxyChannel) SQLOutboxStartWork(ctx context.Context, db *sql.DB, onConsumeError func(err error)) (err error)
- type ProxyConnection
- type Publish
- type QueueBind
- type QueueDeclare
- type QueueDelete
- type QueueName
- type QueueUnbind
- type RoutingKey
- type SQLOutbox
- type ViewOutbox
- type ViewOutboxRequest
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Consume ¶
type ConsumeDelivery ¶
type DeliveryResult ¶
type DeliveryResult struct {
// contains filtered or unexported fields
}
DeliveryResult create with rab.Ack() or rab.Reject()
func Ack ¶
func Ack() DeliveryResult
func Reject ¶
func Reject(err error, requeue bool) DeliveryResult
func (DeliveryResult) Error ¶
func (result DeliveryResult) Error() string
给 DeliveryResult 增加 Error 接口是为了避出现类似 rab.Ack() 或者 rab.Reject() 前面没有 return 的错误
type ExchangeBind ¶
type ExchangeDeclare ¶
type ExchangeDeclare struct { Name ExchangeName // 交换器的名称 Kind string // 交换器类型 amqp.ExchangeDirect amqp.ExchangeFanout amqp.ExchangeTopic amqp.ExchangeHeaders Durable bool // 设置是否持久化。durable设置为true表示持久化,反之是非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。 AutoDelete bool // 设置是否自动删除。autoDelete设置为true则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。注意不能错误地把这个参数理解为:“当与此交换器连接的客户端都断开时,RabbitMQ会自动删除本交换器 Internal bool // 设置是否是内置的。如果设置为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。 NoWait bool // 当noWait为true时,无需等待服务器的确认即可进行声明。如果没有特殊的缘由和应用场景,并不建议使用这个方法。 Args map[string]interface{} // 其他一些结构化参数,比如alternate-ex-change }
type ExchangeDeclarePassive ¶
type ExchangeDeclarePassive struct {
ExchangeDeclare
}
type ExchangeDelete ¶
type ExchangeDelete struct { Name string // 交换器名称 IfUnused bool // ifUnused用来设置是否在交换器没有被使用的情况下删除。如果isUnused设置为true,则只有在此交换器没有被使用的情况下才会被删除;如果设置false,则无论如何这个交换器都要被删除。 NoWait bool // 当noWait为true时,不要等待服务器确认exchange删除已完成。删除通道失败可能会关闭通道。 }
func (ExchangeDelete) Flat ¶
func (v ExchangeDelete) Flat() (name string, ifUnused, noWait bool)
type ExchangeName ¶
type ExchangeName string
func (ExchangeName) String ¶
func (name ExchangeName) String() string
type ExchangeUnbind ¶
type ExchangeUnbind struct {
ExchangeBind
}
type HandleNotifyReturn ¶
type Option ¶
type Option struct { // OnReconnect // goclub/rabbitmq 封装后的 Connection 和 Channel 都有重连机制,当发生重连时会触发 OnReconnect // OnReconnect 为 nil 时默认执行 log.Print(message, string(debug.Stack())) OnReconnect func(reconnectID string, message string, err error) // NotifyReturn 用于订阅发送时退回的消息 需在 Publish 时配合 Mandatory 使用 HandleNotifyReturn HandleNotifyReturn Outbox OutboxOption }
type OutboxInsertOption ¶
type OutboxInsertOption struct {
Publish Publish
}
type OutboxOption ¶
type ProxyChannel ¶
func (*ProxyChannel) Consume ¶
func (channel *ProxyChannel) Consume(consume Consume) (<-chan amqp.Delivery, error)
func (*ProxyChannel) DeadLetterSaveToSQL ¶
func (channel *ProxyChannel) DeadLetterSaveToSQL(ctx context.Context, opt DeadLetterSaveToSQLOption) (err error)
DeadLetterSaveToSQL https://www.rabbitmq.com/dlx.html
func (*ProxyChannel) ExchangeDeclare ¶
func (channel *ProxyChannel) ExchangeDeclare(declare ExchangeDeclare) (err error)
func (*ProxyChannel) IsClosed ¶
func (ch *ProxyChannel) IsClosed() bool
func (*ProxyChannel) Publish ¶
func (channel *ProxyChannel) Publish(publish Publish) (err error)
Publish 自动添加 MessageId 和 Timestamp
func (*ProxyChannel) QueueBind ¶
func (channel *ProxyChannel) QueueBind(queueBind QueueBind) (err error)
func (*ProxyChannel) QueueDeclare ¶
func (channel *ProxyChannel) QueueDeclare(queueDeclare QueueDeclare) (queue amqp.Queue, err error)
func (*ProxyChannel) SQLOutboxInsert ¶
func (c *ProxyChannel) SQLOutboxInsert(ctx context.Context, db *sql.DB, tx *sql.Tx, opt OutboxInsertOption) (outbox SQLOutbox, err error)
func (*ProxyChannel) SQLOutboxQuery ¶
func (c *ProxyChannel) SQLOutboxQuery(ctx context.Context, db *sql.DB, req ViewOutboxRequest) (list []ViewOutbox, total uint64, err error)
func (*ProxyChannel) SQLOutboxSend ¶
func (*ProxyChannel) SQLOutboxStartWork ¶
type ProxyConnection ¶
type ProxyConnection struct { *amqp.Connection // contains filtered or unexported fields }
参考 https://github.com/isayme/go-amqp-reconnect/blob/master/rabbitmq/rabbitmq.go
func DialConfig ¶
func (*ProxyConnection) Channel ¶
func (conn *ProxyConnection) Channel() (channel *ProxyChannel, channelClose func() error, err error)
type Publish ¶
type Publish struct { Exchange ExchangeName // 交换机名 RoutingKey RoutingKey Mandatory bool // mandatory参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。 Msg amqp.Publishing // Deprecated: RabbitMQ 3.0版本开始去掉了对immediate参数的支持, immediate参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。 Immediate bool }
type QueueBind ¶
type QueueBind struct { Queue QueueName // 队列名称 RoutingKey RoutingKey // 用来绑定队列和交换器的路由键 Exchange ExchangeName // 交换器名称 NoWait bool // 当noWait为true时,不要等待服务器确认queue绑定完成 Args map[string]interface{} }
type QueueDeclare ¶
type QueueDeclare struct { Name QueueName // 队列名称 Durable bool // 设置是否持久化。为true则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。 AutoDelete bool // 设置是否自动删除。为true则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。不能把这个参数错误地理解为:“当连接到此队列的所有客户端断开时,这个队列自动删除”,因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。 Exclusive bool // 是否排它,排它队适用一个客户端同时发送和读取消息的应用场景 NoWait bool // 当noWait为true时,无需等待服务器的确认即可进行声明。如果没有特殊的缘由和应用场景,并不建议使用这个方法。 Args map[string]interface{} // 设置队列的其他一些参数,如x-me s s age-ttl、x-expire s、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-deadletter-routing-key、x-max-priority等。 }
type QueueDelete ¶
type QueueDelete struct { Name string // 队列名称 IfUnused bool // ifUnused用来设置是否在交换器没有被使用的情况下删除。如果isUnused设置为true,则只有在此队列没有被使用的情况下才会被删除;如果设置false,则无论如何这个队列都要被删除。 IfEmpty bool // 队列为空的情况下才删除 NoWait bool // 当noWait为true时,不要等待服务器确认queue删除完成如果队列 无法删除,将引发通道异常,通道将被关闭。 }
func (QueueDelete) Flat ¶
func (v QueueDelete) Flat() (name string, ifUnused, ifEmpty, noWait bool)
type QueueUnbind ¶
type QueueUnbind struct { Queue QueueName // 队列名称 RoutingKey RoutingKey // 用来解绑队列和交换器的路由键 Exchange ExchangeName // 交换器名称 Args map[string]interface{} }
func (QueueUnbind) Flat ¶
func (v QueueUnbind) Flat() (name, key, exchange string, args map[string]interface{})
type RoutingKey ¶
type RoutingKey string
func (RoutingKey) String ¶
func (key RoutingKey) String() string
type ViewOutbox ¶
type ViewOutboxRequest ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.