common

package
v0.0.0-...-53e3d08 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 22, 2019 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
// Constants for topic validation, wildcard handling and internal prefixes.
const (
	// Topic validation fields.
	// NOTE(review): presumably MaxSlashCount bounds the number of "/" in a
	// topic and MaxTopicNameLen bounds the topic length; the validators are
	// not visible here, so confirm whether the limits are inclusive.
	// NOTE(review): MaxPayloadLengthString is the string "Byte", not a
	// number; it looks like a unit name rather than a limit — confirm.
	MaxSlashCount          = 8
	MaxTopicNameLen        = 255
	MaxPayloadLengthString = "Byte"

	// Wildcard topic fields: the level separator "/", the single-level
	// wildcard "+", the multi-level wildcard "#", the "$" prefix, and the
	// "$function/" and "$remote/" topic prefixes.
	TopicSeparator    = "/"
	SingleWildCard    = "+"
	MultipleWildCard  = "#"
	SysCmdPrefix      = "$"
	FuncTopicPrefix   = "$function/"
	RemoteTopicPrefix = "$remote/"

	// Prefixes and fixed names for subscriptions ("sub."), publications
	// ("pub."), sessions ("$session/", "$session/tmp/") and rules
	// ("$rule/msgqos0", "$rule/topic").
	// NOTE(review): where these strings are used (keys, topics, queue
	// names) is not visible here — confirm against callers.
	PrefixSub  = "sub."
	PrefixPub  = "pub."
	PrefixSess = "$session/"
	PrefixTmp  = "$session/tmp/"
	RuleMsgQ0  = "$rule/msgqos0"
	RuleTopic  = "$rule/topic"
)

common

Variables

View Source
// Bucket names, as byte slices. Every name declared here starts with '.'.
var (
	// BucketNameDotSubscription stores the session's subscriptions.
	BucketNameDotSubscription = []byte(".subscription")
	// BucketNameDotRetained stores the session's retained messages.
	BucketNameDotRetained = []byte(".retained")
	// BucketNameDotWill stores the session's will message.
	// NOTE(review): the original comment ended with "?" — confirm this
	// bucket's purpose.
	BucketNameDotWill = []byte(".will")
	// BucketNameDotAuth stores auth data.
	BucketNameDotAuth = []byte(".auth")
	// BucketNameDotMapping stores topic mappings.
	BucketNameDotMapping = []byte(".mapping")
)

A queue's bucket name must not contain '.'; names of non-queue buckets (especially metadata buckets) should start with '.'.

View Source
// Subscription topic strings and subscription type names.
//
// SubTopicSysRemoteFormat is a format string with two %s verbs that expands
// to "$SYS/remote/<a>/<b>"; SubTopicSysFuncPrefix is the "$SYS/function/"
// prefix; SubTypeFunc and SubTypeRemote are the type names "function" and
// "remote".
// NOTE(review): what the two %s arguments are is not visible here — confirm
// against the call sites.
// NOTE(review): these are declared with var but hold plain string values;
// if nothing reassigns them they could be constants — confirm.
var (
	SubTopicSysRemoteFormat = "$SYS/remote/%s/%s"
	SubTopicSysFuncPrefix   = "$SYS/function/"
	SubTypeFunc             = "function"
	SubTypeRemote           = "remote"
)

Subscription

Functions

func ContainsWildcard

func ContainsWildcard(topic string) bool

ContainsWildcard reports whether the topic contains a wildcard ("#" or "+").

func PubTopicValidate

func PubTopicValidate(topic string) bool

PubTopicValidate validates an MQTT publish topic.

func SubTopicValidate

func SubTopicValidate(topic string) bool

SubTopicValidate validates an MQTT subscribe topic.

func TopicIsMatch

func TopicIsMatch(topic string, topicRule string) bool

TopicIsMatch reports whether the given topic matches the given topicRule.

Types

type AckV2

// AckV2 is the acknowledge interface. It is the parameter type accepted by
// PacketIDS.Set.
type AckV2 interface {
	// Ack acknowledges.
	Ack()
	// SID returns a uint64 ID; the Message and Subscribe methods of the
	// same name document it as the sequence ID.
	SID() uint64
}

AckV2 acknowledge interface

type Acknowledge

// Acknowledge is an acknowledgement. Create one with NewAcknowledge; its
// methods are Ack, Count and Wait. All of its fields are unexported.
type Acknowledge struct {
	// contains filtered or unexported fields
}

Acknowledge acknowledgement

func NewAcknowledge

func NewAcknowledge() *Acknowledge

NewAcknowledge creates a new acknowledgement

func (*Acknowledge) Ack

func (ack *Acknowledge) Ack()

Ack acknowledges after message handled

func (*Acknowledge) Count

func (ack *Acknowledge) Count() int32

Count returns the ack count

func (*Acknowledge) Wait

func (ack *Acknowledge) Wait(cancel <-chan struct{}) bool

Wait waits until acknowledged or cancelled

type Flow

// Flow is a function that receives a *Message and returns nothing; it is
// used to flow a message.
type Flow func(*Message)

Flow flows message

type Message

// Message is an MQTT message with client ID.
//
// It embeds Persisted, so the QOS, Topic, Payload and ClientID fields are
// promoted onto Message. The remaining exported fields are TargetQOS,
// TargetTopic, Barrier, Retain, PacketID and SequenceID.
// NOTE(review): presumably TargetQOS/TargetTopic describe the delivery side
// while the embedded QOS/Topic describe the message as published, and
// SequenceID is the value returned by SID — the method bodies are not
// visible here, confirm.
type Message struct {
	Persisted
	TargetQOS   uint32
	TargetTopic string
	Barrier     bool
	Retain      bool
	PacketID    uint32

	SequenceID uint64
	// contains filtered or unexported fields
}

Message MQTT message with client ID

func NewMessage

func NewMessage(qos uint32, topic string, payload []byte, clientID string) *Message

NewMessage creates a message

func UnmarshalMessage

func UnmarshalMessage(k, v []byte) (*Message, error)

UnmarshalMessage creates a message by persisted data

func (*Message) Ack

func (m *Message) Ack()

Ack acknowledges after message handled

func (*Message) CallbackPID

func (m *Message) CallbackPID()

CallbackPID calls the callback with packet id

func (*Message) SID

func (m *Message) SID() uint64

SID returns sequence ID

func (*Message) SetAcknowledge

func (m *Message) SetAcknowledge()

SetAcknowledge sets acknowledge

func (*Message) SetCallbackPID

func (m *Message) SetCallbackPID(pid uint32, callback func(uint32))

SetCallbackPID sets packet id and its callback

func (*Message) SetCallbackSID

func (m *Message) SetCallbackSID(callback func(uint64))

SetCallbackSID sets sequence id and its callback

type MsgAck

// MsgAck is an MQTT message with its first send time. It embeds *Message,
// so the fields and methods of Message are promoted onto MsgAck.
type MsgAck struct {
	*Message
	FST time.Time // first send time
}

MsgAck MQTT message with first send time

func (*MsgAck) WaitTimeout

func (m *MsgAck) WaitTimeout(bf *backoff.Backoff, republish Publish, cancel <-chan struct{})

WaitTimeout waits until finish

type PacketIDS

// PacketIDS generates packet IDs from sequence IDs for messages.
//
// It embeds sync.RWMutex, so the mutex's Lock/Unlock/RLock/RUnlock methods
// are promoted and callable on a PacketIDS value. Create one with
// NewPacketIDS.
// NOTE(review): presumably the embedded mutex guards the unexported index
// used by Ack, Get, Set and Size — the method bodies are not visible here,
// confirm.
type PacketIDS struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

PacketIDS generates packet id by sequence id for message

func NewPacketIDS

func NewPacketIDS() *PacketIDS

NewPacketIDS creates a new PacketIDS

func (*PacketIDS) Ack

func (p *PacketIDS) Ack(id packet.ID) bool

Ack acknowledges by packet id

func (*PacketIDS) Get

func (p *PacketIDS) Get(sid uint64) (pid packet.ID)

Get gets the packet id.

func (*PacketIDS) Set

func (p *PacketIDS) Set(ack AckV2) packet.ID

Set sets the acknowledge with a new packet id generated from its sequence id.

func (*PacketIDS) Size

func (p *PacketIDS) Size() (i int)

Size returns the size of index

type Persisted

// Persisted is a protobuf (proto3) message, as shown by its struct tags.
//
// Wire fields by tag number: 1 QOS (varint), 2 Topic (bytes), 3 Payload
// (bytes), 4 ClientID (bytes). Each is omitted from JSON output when empty.
// The XXX_ fields are tagged json:"-" and therefore excluded from JSON.
// NOTE(review): the XXX_ fields and XXX_ methods look like protoc-gen-go
// generated code; if so, this type should be regenerated from its .proto
// file rather than edited by hand — confirm.
type Persisted struct {
	QOS                  uint32   `protobuf:"varint,1,opt,name=QOS,proto3" json:"QOS,omitempty"`
	Topic                string   `protobuf:"bytes,2,opt,name=Topic,proto3" json:"Topic,omitempty"`
	Payload              []byte   `protobuf:"bytes,3,opt,name=Payload,proto3" json:"Payload,omitempty"`
	ClientID             string   `protobuf:"bytes,4,opt,name=ClientID,proto3" json:"ClientID,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*Persisted) Descriptor

func (*Persisted) Descriptor() ([]byte, []int)

func (*Persisted) GetClientID

func (m *Persisted) GetClientID() string

func (*Persisted) GetPayload

func (m *Persisted) GetPayload() []byte

func (*Persisted) GetQOS

func (m *Persisted) GetQOS() uint32

func (*Persisted) GetTopic

func (m *Persisted) GetTopic() string

func (*Persisted) ProtoMessage

func (*Persisted) ProtoMessage()

func (*Persisted) Reset

func (m *Persisted) Reset()

func (*Persisted) String

func (m *Persisted) String() string

func (*Persisted) XXX_DiscardUnknown

func (m *Persisted) XXX_DiscardUnknown()

func (*Persisted) XXX_Marshal

func (m *Persisted) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Persisted) XXX_Merge

func (dst *Persisted) XXX_Merge(src proto.Message)

func (*Persisted) XXX_Size

func (m *Persisted) XXX_Size() int

func (*Persisted) XXX_Unmarshal

func (m *Persisted) XXX_Unmarshal(b []byte) error

type Publish

// Publish is a function that publishes a message.
// NOTE(review): it takes Message by value, whereas Flow takes *Message, so
// each call copies the struct — confirm this is intended.
type Publish func(Message)

Publish publishes message

type Subscribe

// Subscribe is an MQTT subscribe. Create one with NewSubscribe; its methods
// are Ack, SID and WaitTimeout. All of its fields are unexported.
type Subscribe struct {
	// contains filtered or unexported fields
}

Subscribe MQTT subscribe

func NewSubscribe

func NewSubscribe() *Subscribe

NewSubscribe creates a subscribe

func (*Subscribe) Ack

func (s *Subscribe) Ack()

Ack acknowledges the subscribe.

func (*Subscribe) SID

func (s *Subscribe) SID() uint64

SID sequence id

func (*Subscribe) WaitTimeout

func (s *Subscribe) WaitTimeout(timeout time.Duration, cancel <-chan struct{}) bool

WaitTimeout waits until acknowledged, cancelled or timeout

type Transferred

// Transferred is a protobuf (proto3) message, as shown by its struct tags.
//
// Wire fields by tag number: 1 Persisted (a nested *Persisted message),
// 2 FunctionName, 3 FunctionInvokeID, 4 FunctionInstanceID (all bytes on
// the wire). Each is omitted from JSON output when empty. The XXX_ fields
// are tagged json:"-" and therefore excluded from JSON.
// NOTE(review): presumably this wraps a persisted message with the
// identifiers of the function invocation that handles it — confirm against
// the .proto definition and callers.
type Transferred struct {
	Persisted            *Persisted `protobuf:"bytes,1,opt,name=Persisted,proto3" json:"Persisted,omitempty"`
	FunctionName         string     `protobuf:"bytes,2,opt,name=FunctionName,proto3" json:"FunctionName,omitempty"`
	FunctionInvokeID     string     `protobuf:"bytes,3,opt,name=FunctionInvokeID,proto3" json:"FunctionInvokeID,omitempty"`
	FunctionInstanceID   string     `protobuf:"bytes,4,opt,name=FunctionInstanceID,proto3" json:"FunctionInstanceID,omitempty"`
	XXX_NoUnkeyedLiteral struct{}   `json:"-"`
	XXX_unrecognized     []byte     `json:"-"`
	XXX_sizecache        int32      `json:"-"`
}

func (*Transferred) Descriptor

func (*Transferred) Descriptor() ([]byte, []int)

func (*Transferred) GetFunctionInstanceID

func (m *Transferred) GetFunctionInstanceID() string

func (*Transferred) GetFunctionInvokeID

func (m *Transferred) GetFunctionInvokeID() string

func (*Transferred) GetFunctionName

func (m *Transferred) GetFunctionName() string

func (*Transferred) GetPersisted

func (m *Transferred) GetPersisted() *Persisted

func (*Transferred) ProtoMessage

func (*Transferred) ProtoMessage()

func (*Transferred) Reset

func (m *Transferred) Reset()

func (*Transferred) String

func (m *Transferred) String() string

func (*Transferred) XXX_DiscardUnknown

func (m *Transferred) XXX_DiscardUnknown()

func (*Transferred) XXX_Marshal

func (m *Transferred) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Transferred) XXX_Merge

func (dst *Transferred) XXX_Merge(src proto.Message)

func (*Transferred) XXX_Size

func (m *Transferred) XXX_Size() int

func (*Transferred) XXX_Unmarshal

func (m *Transferred) XXX_Unmarshal(b []byte) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL