eventbus

package
v0.6.1
Warning

This package is not in the latest version of its module.

Published: Dec 21, 2022 License: MIT Imports: 17 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// ErrMsgChanFull underlying queue fails to accept new message due to full buffer.
	ErrMsgChanFull = errors.New("message channel buffer is full")

	// ErrRingBufferClosed underlying ring buffer is closed.
	ErrRingBufferClosed = errors.New("ringbuffer is closed")
)

Functions

func Consume

func Consume(elems []ring.Elem, w ring.Writer) bool

Consume consumes items by writing them to the specified ring.Writer. It is used when creating a StreamListener.

func CreateGossipStreamer

func CreateGossipStreamer() (*EventBus, *GossipStreamer)

CreateGossipStreamer sets up an event bus, subscribes a SimpleStreamer to the gossip topic, and sets up the right preprocessors for the gossip topic.

Types

type Broker

type Broker interface {
	Subscriber
	Publisher
}

Broker is a Publisher and a Subscriber.

type CallbackListener

type CallbackListener struct {
	// contains filtered or unexported fields
}

CallbackListener subscribes using callbacks.

func (*CallbackListener) Close

func (c *CallbackListener) Close()

Close is part of the Listener interface.

func (*CallbackListener) Notify

func (c *CallbackListener) Notify(m message.Message) error

Notify passes a copy of the message as a parameter to the callback.

func (*CallbackListener) SetLogLevel added in v0.4.4

func (c *CallbackListener) SetLogLevel(logrus.Level)

SetLogLevel is an empty implementation.

type ChanListener

type ChanListener struct {
	// contains filtered or unexported fields
}

ChanListener dispatches a message using a channel.

func (*ChanListener) Close

func (c *ChanListener) Close()

Close has no effect.

func (*ChanListener) Notify

func (c *ChanListener) Notify(m message.Message) error

Notify sends a message to the internal dispatcher channel. It forwards the message if the listener is unsafe. Otherwise, it forwards a message clone.

func (*ChanListener) SetLogLevel added in v0.4.4

func (c *ChanListener) SetLogLevel(lv logrus.Level)

SetLogLevel updates log level.

type Collector

type Collector struct {
	// contains filtered or unexported fields
}

Collector is a very simple implementation of the wire.EventCollector interface. If no function is supplied, it uses a channel to publish the collected packets.

func NewSimpleCollector

func NewSimpleCollector(rChan chan message.Message, f func(message.Message) error) *Collector

NewSimpleCollector is a simple wrapper around a callback that redirects collected buffers into a channel.

func (*Collector) Collect

func (m *Collector) Collect(b message.Message) error

Collect redirects a buffer copy to a channel.

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus - box for listeners and callbacks.

func CreateFrameStreamer

func CreateFrameStreamer(topic topics.Topic) (*EventBus, ring.Writer)

CreateFrameStreamer sets up an event bus, subscribes a SimpleStreamer to the gossip topic, and sets up the right preprocessors for the gossip topic.

func New

func New() *EventBus

New returns a new EventBus with no listeners.

func (*EventBus) AddDefaultTopic

func (bus *EventBus) AddDefaultTopic(tpcs ...topics.Topic)

AddDefaultTopic adds topics to the default multiListener.

func (*EventBus) Close added in v0.4.4

func (e *EventBus) Close()

Close closes all topic listeners.

func (*EventBus) Publish

func (bus *EventBus) Publish(topic topics.Topic, m message.Message) (errorList []error)

Publish executes the callbacks defined for a topic. The topic is set explicitly because it might differ from the message Category (e.g. in the Gossip case). Publishing is fire-and-forget: if there is no listener for a topic, the message is lost. FIXME: Publish should fail fast and return a single error. Since the code is largely asynchronous, errors are not expected; if they do happen, they should be reported as soon as possible.
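The behaviour described for Publish (notify every listener of a topic, collect the errors, silently drop messages that have no listener) can be sketched with a toy bus. Everything in this block is an assumption made for illustration: Topic, Message, listener, bus and errListenerFailed are simplified stand-ins, not the package's own types or its actual implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// Topic and Message are simplified stand-ins for topics.Topic and
// message.Message.
type Topic uint8
type Message string

// listener is a hypothetical, reduced form of the Listener interface.
type listener func(Message) error

// errListenerFailed is a made-up error used to simulate a failing listener.
var errListenerFailed = errors.New("listener failed")

// bus is a toy event bus, not the package's EventBus.
type bus struct {
	listeners map[Topic][]listener
}

func newBus() *bus {
	return &bus{listeners: make(map[Topic][]listener)}
}

func (b *bus) subscribe(t Topic, l listener) {
	b.listeners[t] = append(b.listeners[t], l)
}

// publish notifies every listener of the topic and collects their errors.
// With no listener registered, the message is silently dropped.
func (b *bus) publish(t Topic, m Message) (errorList []error) {
	for _, l := range b.listeners[t] {
		if err := l(m); err != nil {
			errorList = append(errorList, err)
		}
	}
	return errorList
}

func main() {
	const block, tx Topic = 1, 2

	b := newBus()
	b.subscribe(block, func(Message) error { return nil })
	b.subscribe(block, func(Message) error { return errListenerFailed })

	// One of the two listeners fails, so one error comes back.
	fmt.Println(len(b.publish(block, "hello")))

	// Nobody listens on tx: no error, the message is simply lost.
	fmt.Println(len(b.publish(tx, "lost")))
}
```

Returning a slice lets one failing listener be reported without hiding the others, which is the trade-off the FIXME above questions.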

func (*EventBus) Subscribe

func (bus *EventBus) Subscribe(topic topics.Topic, listener Listener) uint32

Subscribe subscribes a Listener to a topic.

func (*EventBus) SubscribeDefault

func (bus *EventBus) SubscribeDefault(listener Listener) uint32

SubscribeDefault subscribes a Listener to the default multiListener. This is normally useful for implementing a sub-dispatching mechanism (i.e. bus of busses architecture).
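As a rough illustration of how one Listener can serve several topics, the sketch below models a default multi-listener: topics are registered once, and every default listener is notified for any of them. The multiListener type, its lowercase methods and the forward helper are hypothetical stand-ins; the package's real multiListener is unexported and may work differently.

```go
package main

import "fmt"

// Topic and Message are simplified stand-ins for topics.Topic and
// message.Message.
type Topic uint8
type Message string

// multiListener is a hypothetical model of a default multi-listener: one
// set of listeners shared by every topic registered as a default topic.
type multiListener struct {
	topics    map[Topic]bool
	listeners map[uint32]func(Message)
	nextID    uint32
}

func newMultiListener() *multiListener {
	return &multiListener{
		topics:    make(map[Topic]bool),
		listeners: make(map[uint32]func(Message)),
	}
}

func (ml *multiListener) addDefaultTopic(tpcs ...Topic) {
	for _, t := range tpcs {
		ml.topics[t] = true
	}
}

// subscribeDefault registers l and returns an ID identifying it.
func (ml *multiListener) subscribeDefault(l func(Message)) uint32 {
	ml.nextID++
	ml.listeners[ml.nextID] = l
	return ml.nextID
}

// forward delivers m to every default listener when t is a default topic
// and returns the number of deliveries made.
func (ml *multiListener) forward(t Topic, m Message) int {
	if !ml.topics[t] {
		return 0
	}
	for _, l := range ml.listeners {
		l(m)
	}
	return len(ml.listeners)
}

func main() {
	const block, tx, other Topic = 1, 2, 3

	ml := newMultiListener()
	ml.addDefaultTopic(block, tx)

	var seen []Message
	ml.subscribeDefault(func(m Message) { seen = append(seen, m) })

	ml.forward(block, "b1")
	ml.forward(tx, "t1")
	ml.forward(other, "ignored") // not a default topic

	fmt.Println(seen) // [b1 t1]
}
```

In a bus-of-busses arrangement, the default listener's callback would itself publish to a child bus.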

func (*EventBus) Unsubscribe

func (bus *EventBus) Unsubscribe(topic topics.Topic, id uint32)

Unsubscribe removes the listener identified by id from a topic.

type GossipStreamer

type GossipStreamer struct {
	*SimpleStreamer
}

GossipStreamer is a SimpleStreamer which removes the checksum and the topic when reading. It is supposed to be used when testing data that needs to be streamed over the network.

func NewGossipStreamer

func NewGossipStreamer() *GossipStreamer

NewGossipStreamer creates a new GossipStreamer instance.

func (*GossipStreamer) Read

func (ms *GossipStreamer) Read() ([]byte, error)

Read the stream.

func (*GossipStreamer) SeenTopics

func (ms *GossipStreamer) SeenTopics() []topics.Topic

SeenTopics returns a slice of all the topics the SimpleStreamer has found in its stream so far.

type Listener

type Listener interface {
	// Notify a listener of a new message.
	Notify(message.Message) error

	// Update Listeners log level. From verbose to silent.
	SetLogLevel(logrus.Level)

	// Close the listener.
	Close()
}

Listener publishes a byte array that subscribers of the EventBus can use.

func NewCallbackListener

func NewCallbackListener(callback func(message.Message)) Listener

NewCallbackListener creates a callback-based dispatcher.

func NewChanListener

func NewChanListener(msgChan chan<- message.Message) Listener

NewChanListener creates a channel-based dispatcher. Although the message is passed by value, this is not enough to enforce thread-safety when the listener tries to read/change slices or arrays carried by the message.
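The caveat about passing by value follows from how Go copies structs: a slice field is copied as a header that still points at the same backing array. The sketch below demonstrates this with a hypothetical msg type and clone method; neither is part of this package, and message.Message may expose cloning differently.

```go
package main

import "fmt"

// msg is a hypothetical message type carrying a slice payload.
type msg struct {
	payload []byte
}

// clone returns a deep copy, so the receiver owns its own backing array.
func (m msg) clone() msg {
	return msg{payload: append([]byte(nil), m.payload...)}
}

func main() {
	original := msg{payload: []byte("abc")}

	byValue := original      // copies the slice header only
	safe := original.clone() // copies the bytes as well

	// Writing through the by-value copy also changes the original.
	byValue.payload[0] = 'X'

	fmt.Println(string(original.payload)) // Xbc
	fmt.Println(string(safe.payload))     // abc
}
```

A listener that only receives clones can mutate its message freely, at the cost of one allocation per delivery.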

func NewSafeCallbackListener added in v0.4.0

func NewSafeCallbackListener(callback func(message.Message)) Listener

NewSafeCallbackListener creates a callback-based dispatcher.

func NewSafeChanListener added in v0.4.0

func NewSafeChanListener(msgChan chan<- message.Message) Listener

NewSafeChanListener creates a channel-based dispatcher which is thread-safe.

func NewStreamListener

func NewStreamListener(w ring.Writer) Listener

NewStreamListener creates a new StreamListener.

func NewStreamListenerWithParams added in v0.4.4

func NewStreamListenerWithParams(w ring.Writer, bufLen int, mapper func(topic topics.Topic) byte) Listener

NewStreamListenerWithParams instantiates and configures a StreamListener.

type Multicaster

type Multicaster interface {
	AddDefaultTopic(topics.Topic)
	SubscribeDefault(Listener) uint32
}

Multicaster allows for a single Listener to listen to multiple topics.

type Publisher

type Publisher interface {
	Publish(topics.Topic, message.Message) []error
}

Publisher publishes serialized messages on a specific topic.

type RouterStreamer added in v0.4.0

type RouterStreamer struct {
	// contains filtered or unexported fields
}

RouterStreamer reroutes a gossiped message to a list of EventBus instances.

func NewRouterStreamer added in v0.4.0

func NewRouterStreamer() *RouterStreamer

NewRouterStreamer instantiates a RouterStreamer with an empty list.

func (*RouterStreamer) Add added in v0.4.0

func (r *RouterStreamer) Add(p *EventBus)

Add adds a destination EventBus.

func (*RouterStreamer) Close added in v0.4.0

func (r *RouterStreamer) Close() error

Close implements io.WriteCloser.

func (*RouterStreamer) Read added in v0.4.0

func (r *RouterStreamer) Read() ([]byte, error)

func (*RouterStreamer) Write added in v0.4.0

func (r *RouterStreamer) Write(p []byte) (n int, err error)

Write implements io.WriteCloser.
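The idea of rerouting one written message to a list of destinations can be sketched as a fan-out writer. The router and sink types below are hypothetical stand-ins for RouterStreamer and *EventBus; in particular, how the real type decodes a gossiped message before publishing it is not shown here.

```go
package main

import (
	"fmt"
	"io"
)

// sink is a hypothetical stand-in for an *EventBus destination.
type sink struct {
	received [][]byte
}

// deliver stores a private copy of p, since callers may reuse the buffer.
func (s *sink) deliver(p []byte) {
	s.received = append(s.received, append([]byte(nil), p...))
}

// router is a toy model of a fan-out streamer.
type router struct {
	dests []*sink
}

func (r *router) add(s *sink) {
	r.dests = append(r.dests, s)
}

// Write forwards p to every registered destination.
func (r *router) Write(p []byte) (n int, err error) {
	for _, d := range r.dests {
		d.deliver(p)
	}
	return len(p), nil
}

// Close has nothing to release in this sketch.
func (r *router) Close() error {
	return nil
}

// Compile-time check that router satisfies io.WriteCloser.
var _ io.WriteCloser = (*router)(nil)

func main() {
	a, b := &sink{}, &sink{}
	r := &router{}
	r.add(a)
	r.add(b)

	n, err := r.Write([]byte("gossip"))
	fmt.Println(n, err)                           // 6 <nil>
	fmt.Println(len(a.received), len(b.received)) // 1 1
}
```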

type SimpleStreamer

type SimpleStreamer struct {
	*bufio.Reader
	*bufio.Writer
	// contains filtered or unexported fields
}

SimpleStreamer is a test helper which can capture information that gets gossiped by the node. It can read from the gossip stream, and stores the topics that it has seen.

func NewSimpleStreamer

func NewSimpleStreamer() *SimpleStreamer

NewSimpleStreamer returns an initialized SimpleStreamer.

func (*SimpleStreamer) Close

func (ms *SimpleStreamer) Close() error

Close implements io.WriteCloser.

func (*SimpleStreamer) Read

func (ms *SimpleStreamer) Read() ([]byte, error)

func (*SimpleStreamer) Write

func (ms *SimpleStreamer) Write(data []byte, _ *message.Metadata, priority byte) (n int, err error)

Write receives the packets from the ringbuffer and writes them to the internal pipe immediately.

type StreamListener

type StreamListener struct {
	// contains filtered or unexported fields
}

StreamListener uses a ring buffer to dispatch messages. It is inherently thread-safe.

func (*StreamListener) Close

func (s *StreamListener) Close()

Close the internal ringbuffer.

func (*StreamListener) Notify

func (s *StreamListener) Notify(m message.Message) error

Notify puts a message in the Listener's ringbuffer. It uses a goroutine so as not to block while the item is put in the ringbuffer.
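The non-blocking behaviour described for Notify can be illustrated by performing the put on its own goroutine. In this sketch a buffered channel stands in for the ring buffer, and streamListener, notify and drain are hypothetical names; the package's actual ring buffer and its ordering guarantees are not modelled.

```go
package main

import (
	"fmt"
	"sync"
)

// streamListener is a hypothetical model in which a buffered channel plays
// the role of the ring buffer.
type streamListener struct {
	buf chan string
	wg  sync.WaitGroup
}

func newStreamListener(size int) *streamListener {
	return &streamListener{buf: make(chan string, size)}
}

// notify returns immediately; the put happens on its own goroutine, so a
// full buffer delays only that goroutine and never the caller.
func (s *streamListener) notify(m string) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		s.buf <- m
	}()
}

// drain reads n messages and then waits for the producer goroutines to end.
func (s *streamListener) drain(n int) []string {
	out := make([]string, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, <-s.buf)
	}
	s.wg.Wait()
	return out
}

func main() {
	s := newStreamListener(1)

	// Three notifications against a buffer of one: none of them blocks.
	for _, m := range []string{"a", "b", "c"} {
		s.notify(m)
	}

	// Delivery order is not guaranteed in this sketch, so only count.
	fmt.Println(len(s.drain(3))) // 3
}
```

The price of this design in the sketch is that messages may arrive out of order, because each put races with the others.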

func (*StreamListener) SetLogLevel added in v0.4.4

func (s *StreamListener) SetLogLevel(logrus.Level)

SetLogLevel is an empty implementation.

type StupidStreamer added in v0.3.0

type StupidStreamer struct {
	*bufio.Reader
	*bufio.Writer
}

StupidStreamer is a streamer meant for using when testing internal forwarding of binary packets through the ring buffer. It does *not* add magic, frames or other wraps on the forwarded packet.

func NewStupidStreamer added in v0.3.0

func NewStupidStreamer() *StupidStreamer

NewStupidStreamer returns an initialized StupidStreamer.

func (*StupidStreamer) Close added in v0.3.0

func (sss *StupidStreamer) Close() error

Close implements io.WriteCloser.

func (*StupidStreamer) Read added in v0.3.0

func (sss *StupidStreamer) Read() ([]byte, error)

func (*StupidStreamer) Write added in v0.3.0

func (sss *StupidStreamer) Write(p []byte) (n int, err error)

Write receives the packets from the ringbuffer and writes them to the internal pipe immediately.

type Subscriber

type Subscriber interface {
	Subscribe(topic topics.Topic, listener Listener) uint32
	Unsubscribe(topics.Topic, uint32)
}

Subscriber subscribes a Listener to event notifications on a specific topic.