Documentation ¶
Index ¶
- Constants
- Variables
- type BackendQueue
- type Channel
- func (c *Channel) AddClient(clientID int64, client Consumer)
- func (c *Channel) Close() error
- func (c *Channel) Delete() error
- func (c *Channel) Depth() int64
- func (c *Channel) Empty() error
- func (c *Channel) Exiting() bool
- func (c *Channel) FinishMessage(clientID int64, id MessageID) error
- func (c *Channel) IsPaused() bool
- func (c *Channel) Pause() error
- func (c *Channel) PutMessage(m *Message) error
- func (c *Channel) RemoveClient(clientID int64)
- func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error
- func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error
- func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error
- func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout time.Duration) error
- func (c *Channel) UnPause() error
- type ChannelStats
- type Channels
- type ChannelsByName
- type ClientStats
- type Consumer
- type Message
- type MessageID
- type NSQD
- func (n *NSQD) DeleteExistingTopic(topicName string) error
- func (n *NSQD) Exit()
- func (n *NSQD) GetError() error
- func (n *NSQD) GetExistingTopic(topicName string) (*Topic, error)
- func (n *NSQD) GetHealth() string
- func (n *NSQD) GetStartTime() time.Time
- func (n *NSQD) GetStats() []TopicStats
- func (n *NSQD) GetTopic(topicName string) *Topic
- func (n *NSQD) IsAuthEnabled() bool
- func (n *NSQD) IsHealthy() bool
- func (n *NSQD) LoadMetadata()
- func (n *NSQD) Main()
- func (n *NSQD) Notify(v interface{})
- func (n *NSQD) PersistMetadata() error
- func (n *NSQD) RealHTTPAddr() *net.TCPAddr
- func (n *NSQD) RealHTTPSAddr() *net.TCPAddr
- func (n *NSQD) RealTCPAddr() *net.TCPAddr
- func (n *NSQD) SetHealth(err error)
- type Options
- type Topic
- func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile
- func (t *Topic) Close() error
- func (t *Topic) Delete() error
- func (t *Topic) DeleteExistingChannel(channelName string) error
- func (t *Topic) Depth() int64
- func (t *Topic) Empty() error
- func (t *Topic) Exiting() bool
- func (t *Topic) GetChannel(channelName string) *Channel
- func (t *Topic) GetExistingChannel(channelName string) (*Channel, error)
- func (t *Topic) IsPaused() bool
- func (t *Topic) Pause() error
- func (t *Topic) PutMessage(m *Message) error
- func (t *Topic) PutMessages(msgs []*Message) error
- func (t *Topic) UnPause() error
- type TopicStats
- type Topics
- type TopicsByName
- type Uint64Slice
Constants ¶
const ( TLSNotRequired = iota TLSRequiredExceptHTTP TLSRequired )
const (
MsgIDLength = 16
)
Variables ¶
var ErrIDBackwards = errors.New("ID went backward")
var ErrSequenceExpired = errors.New("sequence expired")
var ErrTimeBackwards = errors.New("time has gone backwards")
Functions ¶
This section is empty.
Types ¶
type BackendQueue ¶
type BackendQueue interface { Put([]byte) error ReadChan() chan []byte // this is expected to be an *unbuffered* channel Close() error Delete() error Depth() int64 Empty() error }
BackendQueue represents the behavior for the secondary message storage system
type Channel ¶
Channel represents the concrete type for an NSQ channel (and also implements the Queue interface)
There can be multiple channels per topic, each with their own unique set of subscribers (clients).
Channels maintain all client and message metadata, orchestrating in-flight messages, timeouts, requeuing, etc.
func NewChannel ¶
func NewChannel(topicName string, channelName string, ctx *context, deleteCallback func(*Channel)) *Channel
NewChannel creates a new instance of the Channel type and returns a pointer 创建一个新的Channel
func (*Channel) Exiting ¶
Exiting returns a boolean indicating if this channel is closed/exiting 返回该Channel是否已关闭/正在退出
func (*Channel) FinishMessage ¶
FinishMessage successfully discards an in-flight message 消费者发送FIN,表明消息已经被接收并正确处理。 FinishMessage分别调用popInFlightMessage和removeFromInFlightPQ将消息从inFlightMessages和inFlightPQ中删除。最后,统计该消息的投递情况。
func (*Channel) PutMessage ¶
PutMessage writes a Message to the queue 将消息写入队列
func (*Channel) RemoveClient ¶
RemoveClient removes a client from the Channel's client list 从这个Channel删除客户端
func (*Channel) RequeueMessage ¶
RequeueMessage requeues a message based on the `timeout` argument (a `time.Duration`), i.e.:
`timeout` == 0 - requeue a message immediately
`timeout` > 0 - asynchronously wait for the specified timeout and requeue a message (aka "deferred requeue")
客户端发送REQ,表明消息投递失败,需要再次被投递。 Channel在RequeueMessage函数对消息投递失败进行处理。该函数将消息从inFlightMessages和inFlightPQ中删除,随后进行重新投递。 发送REQ时有一个附加参数timeout,该值为0时表示立即重新投递,大于0时表示等待timeout时间之后投递。
func (*Channel) StartDeferredTimeout ¶
如果timeout大于0,则调用StartDeferredTimeout进行延迟投递。首先计算延迟投递的时间点, 然后调用pushDeferredMessage将消息加入deferredMessage字典,最后将消息放入deferredPQ队列。 延迟投递的消息会被专门的worker扫描并在延迟投递的时间点后进行投递。需要注意的是,立即重新投递的消息不会进入deferredPQ队列。
func (*Channel) StartInFlightTimeout ¶
填充消息的消费者ID、投送时间、优先级,然后调用pushInFlightMessage函数将消息放入inFlightMessages字典中。最后调用addToInFlightPQ将消息放入inFlightPQ队列中。 至此,消息投递流程完成,接下来需要等待消费者对投送结果的反馈。消费者通过发送FIN、REQ、TOUCH来回复对消息的处理结果。
func (*Channel) TouchMessage ¶
TouchMessage resets the timeout for an in-flight message 消费者发送TOUCH,表明该消息的超时值需要被重置。 从inFlightPQ中取出消息,设置新的超时值后重新放入队列,新的超时值由当前时间、客户端通过IDENTIFY设置的超时值、配置中允许的最大超时值MaxMsgTimeout共同决定。
type ChannelStats ¶
type ChannelStats struct { ChannelName string `json:"channel_name"` Depth int64 `json:"depth"` BackendDepth int64 `json:"backend_depth"` InFlightCount int `json:"in_flight_count"` DeferredCount int `json:"deferred_count"` MessageCount uint64 `json:"message_count"` RequeueCount uint64 `json:"requeue_count"` TimeoutCount uint64 `json:"timeout_count"` Clients []ClientStats `json:"clients"` Paused bool `json:"paused"` E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"` }
func NewChannelStats ¶
func NewChannelStats(c *Channel, clients []ClientStats) ChannelStats
type ChannelsByName ¶
type ChannelsByName struct {
Channels
}
func (ChannelsByName) Less ¶
func (c ChannelsByName) Less(i, j int) bool
type ClientStats ¶
type ClientStats struct { // TODO: deprecated, remove in 1.0 Name string `json:"name"` ClientID string `json:"client_id"` Hostname string `json:"hostname"` Version string `json:"version"` RemoteAddress string `json:"remote_address"` State int32 `json:"state"` ReadyCount int64 `json:"ready_count"` InFlightCount int64 `json:"in_flight_count"` MessageCount uint64 `json:"message_count"` FinishCount uint64 `json:"finish_count"` RequeueCount uint64 `json:"requeue_count"` ConnectTime int64 `json:"connect_ts"` SampleRate int32 `json:"sample_rate"` Deflate bool `json:"deflate"` Snappy bool `json:"snappy"` UserAgent string `json:"user_agent"` Authed bool `json:"authed,omitempty"` AuthIdentity string `json:"auth_identity,omitempty"` AuthIdentityURL string `json:"auth_identity_url,omitempty"` TLS bool `json:"tls"` CipherSuite string `json:"tls_cipher_suite"` TLSVersion string `json:"tls_version"` TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"` TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"` }
客户端状态信息
type Consumer ¶
type Consumer interface { UnPause() Pause() Close() error TimedOutMessage() Stats() ClientStats Empty() }
Consumer接口,定义消费者行为
type Message ¶
type Message struct { ID MessageID // 16字节的MessageID Body []byte Timestamp int64 // nanosecond timestamp(创建的时间) Attempts uint16 // (uint16) 2-byte attempts // contains filtered or unexported fields }
定义Message结构
type MessageID ¶
type MessageID [MsgIDLength]byte
type NSQD ¶
func (*NSQD) DeleteExistingTopic ¶
DeleteExistingTopic removes a topic only if it exists 通过名字删除Topic对象
func (*NSQD) GetExistingTopic ¶
GetExistingTopic gets a topic only if it exists 通过名字获取Topic对象
func (*NSQD) GetTopic ¶
GetTopic performs a thread safe operation to return a pointer to a Topic object (potentially new) 获取指定的topic的实例,如果不存在就创建一个
func (*NSQD) IsAuthEnabled ¶
type Options ¶
type Options struct { // basic options ID int64 `flag:"worker-id" cfg:"id"` // 进程的唯一码(默认是主机名的哈希值%1024) Verbose bool `flag:"verbose"` // 详细的日志输出 TCPAddress string `flag:"tcp-address"` HTTPAddress string `flag:"http-address"` HTTPSAddress string `flag:"https-address"` BroadcastAddress string `flag:"broadcast-address"` // 通过 lookupd 注册的地址(默认名是 OS) NSQLookupdTCPAddresses []string `flag:"lookupd-tcp-address" cfg:"nsqlookupd_tcp_addresses"` // lookupd的地址 AuthHTTPAddresses []string `flag:"auth-http-address" cfg:"auth_http_addresses"` // 认证服务地址 // diskqueue options DataPath string `flag:"data-path"` // 持久化数据的路径 MemQueueSize int64 `flag:"mem-queue-size"` // Message Channel的最大缓冲 MaxBytesPerFile int64 `flag:"max-bytes-per-file"` // 每个文件最大的字节数 SyncEvery int64 `flag:"sync-every"` // 磁盘队列 fsync 的消息数 SyncTimeout time.Duration `flag:"sync-timeout"` // 每个磁盘队列 fsync 平均耗时 QueueScanInterval time.Duration // workTicker定时器时间() QueueScanRefreshInterval time.Duration // refreshTicker定时器时间(更新Channel列表,并重新分配worker) QueueScanSelectionCount int // 每次扫描最多选择的Channel数量 QueueScanWorkerPoolMax int // queueScanWorker的goroutines的最大数量 QueueScanDirtyPercent float64 // 消息投递的比例 // msg and command options MsgTimeout time.Duration `flag:"msg-timeout" arg:"1ms"` // 自动重新队列消息前需要等待的时间 MaxMsgTimeout time.Duration `flag:"max-msg-timeout"` // 消息超时的最大时间间隔 MaxMsgSize int64 `flag:"max-msg-size" deprecated:"max-message-size" cfg:"max_msg_size"` // 消息的最大长度 MaxBodySize int64 `flag:"max-body-size"` // 消息体的最大长度 MaxReqTimeout time.Duration `flag:"max-req-timeout"` // 消息重新排队的超时时间 ClientTimeout time.Duration // client overridable configuration options MaxHeartbeatInterval time.Duration `flag:"max-heartbeat-interval"` // 心跳超时 MaxRdyCount int64 `flag:"max-rdy-count"` // 允许客户端一次最多接收的消息数量 MaxOutputBufferSize int64 `flag:"max-output-buffer-size"` // tcp writer对象的缓存 MaxOutputBufferTimeout time.Duration `flag:"max-output-buffer-timeout"` // 在 flushing 到客户端前,最长的配置时间间隔。 // statsd integration StatsdAddress string 
`flag:"statsd-address"` // 统计进程的 UDP <addr>:<port> StatsdPrefix string `flag:"statsd-prefix"` // 发送给统计keys 的前缀(%s for host replacement) StatsdInterval time.Duration `flag:"statsd-interval" arg:"1s"` // 从推送到统计的时间间隔 StatsdMemStats bool `flag:"statsd-mem-stats"` // 切换发送内存和 GC 统计数据 // e2e message latency E2EProcessingLatencyWindowTime time.Duration `flag:"e2e-processing-latency-window-time"` // 算这段时间里,点对点时间延迟 E2EProcessingLatencyPercentiles []float64 `flag:"e2e-processing-latency-percentile" cfg:"e2e_processing_latency_percentiles"` // 消息处理时间的百分比(通过逗号可以多次指定,默认为 none) // TLS config TLSCert string `flag:"tls-cert"` // 证书文件路径 TLSKey string `flag:"tls-key"` // 私钥路径文件 TLSClientAuthPolicy string `flag:"tls-client-auth-policy"` // 客户端证书授权策略 ('require' or 'require-verify') TLSRootCAFile string `flag:"tls-root-ca-file"` // 私钥证书授权 PEM 路径 TLSRequired int `flag:"tls-required"` // 客户端连接需求 TLS TLSMinVersion uint16 `flag:"tls-min-version"` // ??? // compression DeflateEnabled bool `flag:"deflate"` // 运行协商压缩特性(客户端压缩) MaxDeflateLevel int `flag:"max-deflate-level"` // 最大的压缩比率等级(> values == > nsqd CPU usage) SnappyEnabled bool `flag:"snappy"` // 打开快速选项 (客户端压缩) Logger logger }
参数信息
func NewOptions ¶
func NewOptions() *Options
type Topic ¶
func (*Topic) AggregateChannelE2eProcessingLatency ¶
func (*Topic) DeleteExistingChannel ¶
DeleteExistingChannel removes a channel from the topic only if it exists 删除名为channelName的Channel
func (*Topic) Exiting ¶
Exiting returns a boolean indicating if this topic is closed/exiting 返回该Topic是否已关闭/正在退出(即exitFlag是否为1)
func (*Topic) GetChannel ¶
GetChannel performs a thread safe operation to return a pointer to a Channel object (potentially new) for the given Topic
func (*Topic) GetExistingChannel ¶
获取名为channelName的Channel对象
func (*Topic) PutMessage ¶
PutMessage writes a Message to the queue 发送一条消息到队列
func (*Topic) PutMessages ¶
PutMessages writes multiple Messages to the queue 发送多条消息
type TopicStats ¶
type TopicStats struct { TopicName string `json:"topic_name"` Channels []ChannelStats `json:"channels"` Depth int64 `json:"depth"` BackendDepth int64 `json:"backend_depth"` MessageCount uint64 `json:"message_count"` Paused bool `json:"paused"` E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"` }
func NewTopicStats ¶
func NewTopicStats(t *Topic, channels []ChannelStats) TopicStats
type TopicsByName ¶
type TopicsByName struct {
Topics
}
func (TopicsByName) Less ¶
func (t TopicsByName) Less(i, j int) bool
type Uint64Slice ¶
type Uint64Slice []uint64
func (Uint64Slice) Len ¶
func (s Uint64Slice) Len() int
func (Uint64Slice) Less ¶
func (s Uint64Slice) Less(i, j int) bool
func (Uint64Slice) Swap ¶
func (s Uint64Slice) Swap(i, j int)