Versions in this module Expand all Collapse all v0 v0.2.0 Sep 7, 2020 Changes in this version + const MQTTEventStream + const MaxEventSize + var ClientStatusChangeEvent_Status_name = map[int32]string + var ClientStatusChangeEvent_Status_value = map[string]int32 + var Event_Type_name = map[int32]string + var Event_Type_value = map[string]int32 + var File_streamIO_mqtt_broker_event_proto protoreflect.FileDescriptor + func PPrintln(obj interface{}) + type Broker struct + func New(options Options) *Broker + func (broker *Broker) Start() error + type ClientStatusChangeEvent struct + SessionID int64 + Status ClientStatusChangeEvent_Status + func (*ClientStatusChangeEvent) Descriptor() ([]byte, []int) + func (*ClientStatusChangeEvent) ProtoMessage() + func (x *ClientStatusChangeEvent) GetSessionID() int64 + func (x *ClientStatusChangeEvent) GetStatus() ClientStatusChangeEvent_Status + func (x *ClientStatusChangeEvent) ProtoReflect() protoreflect.Message + func (x *ClientStatusChangeEvent) Reset() + func (x *ClientStatusChangeEvent) String() string + type ClientStatusChangeEvent_Status int32 + const ClientStatusChangeEvent_Offline + const ClientStatusChangeEvent_Online + func (ClientStatusChangeEvent_Status) Descriptor() protoreflect.EnumDescriptor + func (ClientStatusChangeEvent_Status) EnumDescriptor() ([]byte, []int) + func (ClientStatusChangeEvent_Status) Type() protoreflect.EnumType + func (x ClientStatusChangeEvent_Status) Enum() *ClientStatusChangeEvent_Status + func (x ClientStatusChangeEvent_Status) Number() protoreflect.EnumNumber + func (x ClientStatusChangeEvent_Status) String() string + type Event struct + Data []byte + Type Event_Type + func (*Event) Descriptor() ([]byte, []int) + func (*Event) ProtoMessage() + func (x *Event) GetData() []byte + func (x *Event) GetType() Event_Type + func (x *Event) ProtoReflect() protoreflect.Message + func (x *Event) Reset() + func (x *Event) String() string + type EventReader struct + func (eReader *EventReader) Close() error + type EventWithOffset struct + type Event_Type int32 + const Event_ClientStatusChangeEvent + const Event_RetainMessageEvent + const Event_SubscribeEvent + const Event_UnSubscribeEvent + func (Event_Type) Descriptor() protoreflect.EnumDescriptor + func (Event_Type) EnumDescriptor() ([]byte, []int) + func (Event_Type) Type() protoreflect.EnumType + func (x Event_Type) Enum() *Event_Type + func (x Event_Type) Number() protoreflect.EnumNumber + func (x Event_Type) String() string + type Node struct + CoW *copyOnWrite + type Offset interface + type Options struct + BindPort int + BindTLSPort int + BrokerId int64 + CheckpointEventSize int64 + DefaultKeepalive uint16 + HOST string + LogFile string + LogLevel logrus.Level + MetaServerAddr string + MinKeepalive uint16 + ReadOffsetCommitInterval time.Duration + SnapshotPath string + func DefaultOptions() Options + func (options Options) WithBindPort(val int) Options + func (options Options) WithBindTLSPort(val int) Options + func (options Options) WithBrokerId(val int64) Options + func (options Options) WithCheckpointEventSize(val int64) Options + func (options Options) WithDefaultKeepalive(val uint16) Options + func (options Options) WithHOST(val string) Options + func (options Options) WithLogFile(val string) Options + func (options Options) WithLogLevel(val logrus.Level) Options + func (options Options) WithMetaServerAddr(val string) Options + func (options Options) WithSnapshotPath(val string) Options + type RetainMessageEvent struct + Data []byte + func (*RetainMessageEvent) Descriptor() ([]byte, []int) + func (*RetainMessageEvent) ProtoMessage() + func (x *RetainMessageEvent) GetData() []byte + func (x *RetainMessageEvent) ProtoReflect() protoreflect.Message + func (x *RetainMessageEvent) Reset() + func (x *RetainMessageEvent) String() string + type Snapshot struct + func NewSnapshot(path string) *Snapshot + func (s *Snapshot) WriteSnapshot(header SnapshotHeader, topicTree *TopicTree, metaTree *btree.BTree) error + type SnapshotHeader struct + Offset int64 + TS time.Time + type SubscribeEvent struct + Qos0StreamInfo *store.StreamInfoItem + Qos1StreamInfo *store.StreamInfoItem + SessionId int64 + Topic map[string]int32 + func (*SubscribeEvent) Descriptor() ([]byte, []int) + func (*SubscribeEvent) ProtoMessage() + func (x *SubscribeEvent) GetQos0StreamInfo() *store.StreamInfoItem + func (x *SubscribeEvent) GetQos1StreamInfo() *store.StreamInfoItem + func (x *SubscribeEvent) GetSessionId() int64 + func (x *SubscribeEvent) GetTopic() map[string]int32 + func (x *SubscribeEvent) ProtoReflect() protoreflect.Message + func (x *SubscribeEvent) Reset() + func (x *SubscribeEvent) String() string + type Subscriber interface + ID func() int64 + Online func() bool + Qos func() int32 + Topic func() string + type TopicTree struct + func NewTopicTree() *TopicTree + func (tree *TopicTree) Clone() *TopicTree + func (tree *TopicTree) Delete(subscriber Subscriber) + func (tree *TopicTree) Insert(sub Subscriber) + func (tree *TopicTree) Match(topic string) []map[int64]Subscriber + func (tree *TopicTree) RangeRetainMessage(f func(packet *packets.PublishPacket) bool) + func (tree *TopicTree) UpdateRetainPacket(packet *packets.PublishPacket) + func (tree *TopicTree) Walk(f func(path string, subscribers map[int64]Subscriber) bool) + type UnSubscribeEvent struct + SessionId int64 + Topic []string + func (*UnSubscribeEvent) Descriptor() ([]byte, []int) + func (*UnSubscribeEvent) ProtoMessage() + func (x *UnSubscribeEvent) GetSessionId() int64 + func (x *UnSubscribeEvent) GetTopic() []string + func (x *UnSubscribeEvent) ProtoReflect() protoreflect.Message + func (x *UnSubscribeEvent) Reset() + func (x *UnSubscribeEvent) String() string