Documentation ¶
Index ¶
- Constants
- func NoticesSubject(env, boundedContext string, args ...string) string
- func PublishStreamSingleton(ctx context.Context, p Publisher, r eventsource.StreamReader, ...) error
- func StreamSubject(env, boundedContext string) string
- func SubscribeNotices(ctx context.Context, nc *nats.Conn, env, boundedContext string, ...) error
- func SubscribeStream(ctx context.Context, nc *nats.Conn, cp Checkpointer, ...) (<-chan struct{}, error)
- type Checkpointer
- type Handler
- type HandlerFunc
- type MemoryCP
- type MessageHandler
- type MessageHandlerOption
- type Processor
- type Publisher
- type PublisherFunc
- type Repository
- type RepositoryFunc
- type Supervisor
- type Unmarshaler
Constants ¶
const ( // DefaultCommitInterval the minimum amount of time that must pass between offset commits DefaultCommitInterval = time.Second * 3 // DefaultPublishInterval the amount of time between checking the repository for updates DefaultPublishInterval = time.Minute )
const ( // ClusterID specifies the cluster-id of the nats streaming cluster that hosts events ClusterID = "events" // DefaultTimeout specifies how much time to wait for the eventual consistency DefaultTimeout = time.Second * 5 )
Variables ¶
This section is empty.
Functions ¶
func NoticesSubject ¶
NoticesSubject returns the subject for a specific bounded context
func PublishStreamSingleton ¶
func PublishStreamSingleton(ctx context.Context, p Publisher, r eventsource.StreamReader, cp *checkpoint.CP, env, bc string, nc *nats.Conn) error
PublishStreamSingleton is similar to PublishStream except that there may be only one running in the environment
func StreamSubject ¶
StreamSubject returns the streaming subject for a specific bounded context
func SubscribeNotices ¶
func SubscribeNotices(ctx context.Context, nc *nats.Conn, env, boundedContext string, fn func(id string)) error
SubscribeNotices listens for notices for a specific bounded context. Notices are published when the caller of a command would like a consistent read after write. The notice provides eventually consistent services an opportunity to update the read model immediately.
func SubscribeStream ¶
func SubscribeStream(ctx context.Context, nc *nats.Conn, cp Checkpointer, env, boundedContext string, h Handler) (<-chan struct{}, error)
SubscribeStream subscribes to a nats stream for the specified bounded context
Types ¶
type Checkpointer ¶
type Checkpointer interface { // Load retrieves the specified nats streaming offset Load(ctx context.Context, key string) (uint64, error) // Save persists the specified nats streaming offset Save(ctx context.Context, key string, offset uint64) error }
Checkpointer persists and retrieves nats streaming offsets
type Handler ¶
type Handler interface { // Handle receives the inbound message Receive(offset uint64, data []byte) }
Handler provides the business end of the MessageHandler struct
type HandlerFunc ¶
HandlerFunc provides a func wrapper for Handler
func WithLogging ¶
func WithLogging(h Handler, logger interface { Info(string, ...log.Field) }) HandlerFunc
func (HandlerFunc) Receive ¶
func (fn HandlerFunc) Receive(offset uint64, data []byte)
Receive implements the Handler interface
type MemoryCP ¶
MemoryCP provides an in-memory non-thread-safe implementation of a checkpoint
type MessageHandler ¶
type MessageHandler struct {
// contains filtered or unexported fields
}
MessageHandler encapsulates a nats streaming processor that performs buffered processing
func NewMessageHandler ¶
func NewMessageHandler(ctx context.Context, p Processor, u Unmarshaler, cp Checkpointer, cpKey string, opts ...MessageHandlerOption) *MessageHandler
NewMessageHandler constructs a new MessageHandler with the arguments provided
func (*MessageHandler) Close ¶
func (m *MessageHandler) Close() error
Close releases resources associated with the MessageHandler
func (*MessageHandler) Done ¶
func (m *MessageHandler) Done() <-chan struct{}
Done returns a chan that signals when all the resources used by MessageHandler have been released
func (*MessageHandler) Receive ¶
func (m *MessageHandler) Receive(offset uint64, data []byte)
Receive handles the specified stream record
type MessageHandlerOption ¶
type MessageHandlerOption func(m *MessageHandler)
MessageHandlerOption allows options to be specified for NewMessageHandler
func WithBufferSize ¶
func WithBufferSize(in int) MessageHandlerOption
WithBufferSize specifies the max number of messages to be buffered before calling the Processor
func WithInterval ¶
func WithInterval(d time.Duration) MessageHandlerOption
WithInterval specifies the maximum amount of time that can elapse until we try to flush any received events
type Processor ¶
type Processor func(ctx context.Context, events ...eventsource.Event) error
Processor performs operations on a stream of events, usually in conjunction with SubscribeStream
func WithSendNotices ¶
WithSendNotices publishes an event to the notices subject if the processor executes successfully
type Publisher ¶
type Publisher interface {
Publish(record eventsource.StreamRecord) error
}
Publisher publishes the record to an event bus
type PublisherFunc ¶
type PublisherFunc func(record eventsource.StreamRecord) error
PublisherFunc provides a func wrapper to Publisher
func PublishStan ¶
func PublishStan(st stan.Conn, subject string) PublisherFunc
PublishStan publishes events to the nats stream identified by the specified subject
func WithLogPublish ¶
func WithLogPublish(publisher Publisher, logger interface { Info(string, ...log.Field) }) PublisherFunc
WithLogPublish logs when events are published
func WithPublishEvents ¶
func WithPublishEvents(fn Publisher, nc *nats.Conn, env, boundedContext string) PublisherFunc
WithPublishEvents publishes received events to nats
func (PublisherFunc) Publish ¶
func (fn PublisherFunc) Publish(record eventsource.StreamRecord) error
Publish implements the Publisher interface
type Repository ¶
type Repository interface { // Apply executes the command against the current version of the aggregate and returns the updated version // of the aggregate (or the current version if no updates were made) Apply(ctx context.Context, cmd eventsource.Command) (int, error) }
Repository provides an abstraction over *eventsource.Repository over the mutator function
func WithConsistentRead ¶
func WithConsistentRead(repo Repository, nc *nats.Conn, env, boundedContext string) Repository
WithConsistentRead provides a faux consistent read. Should wrap WithNotifier to ensure that the NoticesSubject.{ID} is subscribed to prior to the command being executed.
func WithTrace ¶
func WithTrace(repo Repository) Repository
WithTrace logs published events to the tracer
type RepositoryFunc ¶
RepositoryFunc provides a func helper around Repository
func (RepositoryFunc) Apply ¶
func (fn RepositoryFunc) Apply(ctx context.Context, cmd eventsource.Command) (int, error)
Apply implements Repository's Apply method
type Supervisor ¶
type Supervisor interface { Check() Close() error Done() <-chan struct{} }
Supervisor reads events from a StreamReader and supervises their delivery to a handler
func PublishStream ¶
func PublishStream(ctx context.Context, h Publisher, r eventsource.StreamReader, cp Checkpointer, env, bc string) Supervisor
PublishStream reads from a stream and publishes
func WithReceiveNotifications ¶
func WithReceiveNotifications(s Supervisor, nc *nats.Conn, env, boundedContext string) Supervisor
WithReceiveNotifications listens to nats for notices on the StreamSubject and prods the supervisor
func WithTraceReceiveNotices ¶
func WithTraceReceiveNotices(s Supervisor, segment tracer.Segment) Supervisor
WithTraceReceiveNotices returns a Supervisor that pings when Check is invoked
type Unmarshaler ¶
type Unmarshaler func([]byte) (eventsource.Event, error)
Unmarshaler accepts a []byte encoded event and returns an event