Documentation ¶
Index ¶
- Constants
- type LateJoinerMessagesRequest
- type LateJoinerMessagesResponse
- type Manager
- func (m *Manager) HandleReceive(container *types.Container)
- func (m *Manager) Publish(topicName string, topicType string, expiry time.Duration, payload []byte) error
- func (m *Manager) Start()
- func (m *Manager) Stop()
- func (m *Manager) Subscribe(topicName string, topicType string, onReceive func(*Message)) error
- func (m *Manager) Unsubscribe(topicName string) error
- type Message
- type MessageIdentifier
- type MessageType
- type Publication
- type Publisher
- type Subscriber
- type Subscription
Constants ¶
View Source
const MessageExpiry = time.Millisecond * 500
View Source
const MessageTimeout = time.Millisecond * 100
TODO: revisit this value; should it be configurable?
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type LateJoinerMessagesRequest ¶
type LateJoinerMessagesRequest struct {
	// messages the late joiner already knows about
	KnownMessages []MessageIdentifier `json:"known_messages"`
}
type LateJoinerMessagesResponse ¶
type LateJoinerMessagesResponse struct {
	// messages held by the endpoint that the late joiner does not already have
	HeldMessages []Message `json:"held_messages"`
}
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
func NewManager ¶
func (*Manager) HandleReceive ¶
func (*Manager) Unsubscribe ¶
type Message ¶
type Message struct {
	// trust the sent timestamp as it's the source of truth for the state in this message
	Timestamp time.Time `json:"timestamp"`

	// discard the message once it is older than this
	Expiry time.Duration `json:"expiry"`

	// original source
	EndpointID   ksuid.KSUID `json:"endpoint_id"`
	EndpointName string      `json:"endpoint_name"`

	// incremented by sending endpoint
	SequenceNumber int64 `json:"sequence_number"`

	// topic
	TopicName string `json:"topic_name"`
	TopicType string `json:"topic_type"`

	// to identify and route the payload
	MessageType MessageType `json:"message_type"`

	// the actual user payload / control message content
	Payload []byte `json:"payload"`
}
type MessageIdentifier ¶
type MessageType ¶
type MessageType int
const (
	// a normal, directly delivered message
	StandardMessageType MessageType = 1

	// a message that came via the forwarded / late joiner path
	ForwardedMessageType MessageType = 2

	// a late joiner sends this when a new endpoint is discovered
	LateJoinerMessagesRequestType MessageType = 3

	// the new endpoint sends this in response
	LateJoinerMessagesResponseType MessageType = 4
)
iota is deliberately not used, so that each value is stated explicitly.
type Publication ¶
type Publication struct {
// contains filtered or unexported fields
}
func NewPublication ¶
func NewPublication( endpointID ksuid.KSUID, endpointName string, topicName string, topicType string, transportManager *transport.Manager, subscriber **Subscriber, ) *Publication
func (*Publication) Publish ¶
func (p *Publication) Publish( expiry time.Duration, payload []byte, ) error
func (*Publication) Start ¶
func (p *Publication) Start()
func (*Publication) Stop ¶
func (p *Publication) Stop()
func (*Publication) TopicType ¶
func (p *Publication) TopicType() string
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func NewPublisher ¶
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
func NewSubscriber ¶
func (*Subscriber) HandleReceive ¶
func (s *Subscriber) HandleReceive(container *types.Container)
func (*Subscriber) Start ¶
func (s *Subscriber) Start()
func (*Subscriber) Stop ¶
func (s *Subscriber) Stop()
func (*Subscriber) Subscribe ¶
func (s *Subscriber) Subscribe( topicName string, topicType string, onReceive func(*Message), ) error
func (*Subscriber) Unsubscribe ¶
func (s *Subscriber) Unsubscribe( topicName string, ) error
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
func NewSubscription ¶
func (*Subscription) HandleReceive ¶
func (s *Subscription) HandleReceive(message *Message)
func (*Subscription) OnReceive ¶
func (s *Subscription) OnReceive(onReceive func(*Message))
func (*Subscription) Start ¶
func (s *Subscription) Start()
func (*Subscription) Stop ¶
func (s *Subscription) Stop()
Click to show internal directories.
Click to hide internal directories.