eventsourced

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2021 License: Apache-2.0 Imports: 17 Imported by: 2

Documentation

Overview

Package eventsourced implements the Cloudstate eventsourcing state model.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CommandID

type CommandID int64

func (CommandID) Value

func (id CommandID) Value() int64

type Context

type Context struct {
	EntityID EntityID
	// EventSourcedEntity describes the instance hold by the EntityInstance.
	EventSourcedEntity *Entity
	// Instance is an instance of the registered entity.
	Instance EntityHandler
	// contains filtered or unexported fields
}

func (*Context) Effect

func (c *Context) Effect(effect *protocol.SideEffect)

Effect adds a side effect to be emitted. An effect is something whose result has no impact on the result of the current command - if it fails, the current command still succeeds. The result of the effect is therefore ignored. Effects are only performed after the successful completion of any state actions requested by the command handler.

Effects may be declared as synchronous or asynchronous. Asynchronous commands run in a "fire and forget" fashion. The code flow of the caller (the command handler of the entity which emitted the async command) continues while the command is being asynchronously processed. Meanwhile, synchronous commands run in "blocking" mode, ie. the commands are processed in order, one at a time. The final result of the command handler, either a reply or a forward, is not sent until all synchronous commands are completed.

func (*Context) Emit

func (c *Context) Emit(event interface{})

Emit is called by a command handler.

func (*Context) Forward

func (c *Context) Forward(forward *protocol.Forward)

Forward sets a protocol.Forward to where a command is forwarded to.

An entity may, rather than sending a reply to a command, forward it to another entity. This is done by sending a forward message back to the proxy, instructing the proxy which call on which entity should be invoked, and passing the message to invoke it with.

The command won’t be forwarded until any state actions requested by the command handler have successfully completed. It is the responsibility of the forwarded action to return a reply that matches the type of the original command handler. Forwards can be chained arbitrarily long.

func (*Context) StreamCtx

func (c *Context) StreamCtx() context.Context

StreamCtx returns the context.Context for the contexts' current running stream.

type Entity

type Entity struct {
	// ServiceName is the fully qualified name of the service that implements this
	// entities interface.
	// Setting it is mandatory.
	ServiceName ServiceName
	// PersistenceID is used to namespace events in the journal, useful for
	// when you share the same database between multiple entities. It defaults to
	// the simple name for the entity type.
	// It’s good practice to select one explicitly, this means your database
	// isn’t depend on type names in your code.
	// Setting it is mandatory.
	PersistenceID string
	// SnapshotEvery controls how often snapshots are taken,
	// so that the entity doesn't need to be recovered from the whole journal
	// each time it’s loaded. If left unset, it defaults to 100.
	// Setting it to a negative number will result in snapshots never being taken.
	SnapshotEvery int64
	// EntityFunc is a factory method which generates a new Entity.
	EntityFunc func(id EntityID) EntityHandler

	PassivationStrategy protocol.EntityPassivationStrategy
}

Entity describes an event sourced entity. It is used to be registered as an event sourced entity on a CloudState instance.

func (*Entity) Options added in v0.3.0

func (e *Entity) Options(options ...Option)

type EntityHandler

type EntityHandler interface {
	// HandleCommand is the code that handles a command. It
	// may validate the command using the current state, and
	// may emit events as part of its processing. A command
	// handler must not update the state of the entity directly,
	// only indirectly by emitting events. If a command handler
	// does update the state, then when the entity is passivated
	// (removed from memory), those updates will be lost.
	HandleCommand(ctx *Context, name string, cmd proto.Message) (reply proto.Message, err error)
	// HandleEvent is the only piece of code that is allowed
	// to update the state of the entity. It receives events,
	// and, according to the event, updates the state.
	HandleEvent(ctx *Context, event interface{}) error
}

tag::entity-type[] An EntityHandler implements methods to handle commands and events.

type EntityID

type EntityID string

type Option added in v0.3.0

type Option func(s *Entity)

func WithPassivationStrategyTimeout added in v0.3.0

func WithPassivationStrategyTimeout(duration time.Duration) Option

type Server

type Server struct {
	entity.UnimplementedEventSourcedServer
	// contains filtered or unexported fields
}

Server is the implementation of the Server server API for the EventSourced service.

func NewServer

func NewServer() *Server

NewServer returns a new event sourced server.

func (*Server) Handle

func (s *Server) Handle(stream entity.EventSourced_HandleServer) error

Handle handles the stream. One stream will be established per active entity. Once established, the first message sent will be Init, which contains the entity ID, and, if the entity has previously persisted a snapshot, it will contain that snapshot. It will then send zero to many event messages, one for each event previously persisted. The entity is expected to apply these to its state in a deterministic fashion. Once all the events are sent, one to many commands are sent, with new commands being sent as new requests for the entity come in. The entity is expected to reply to each command with exactly one reply message. The entity should reply in order, and any events that the entity requests to be persisted the entity should handle itself, applying them to its own state, as if they had arrived as events when the event stream was being replayed on load.

ClientError handling is done so that any error returned, triggers the stream to be closed. If an error is a client failure, a ClientAction_Failure is sent with a command id set if provided by the error. If an error is a protocol failure or any other error, a EventSourcedStreamOut_Failure is sent. A protocol failure might provide a command id to be included. TODO: rephrase this to the new atomic failure pattern.

func (*Server) Register

func (s *Server) Register(entity *Entity) error

Register registers an Entity a an event sourced entity for CloudState.

type ServiceName

type ServiceName string

func (ServiceName) String

func (sn ServiceName) String() string

type Snapshooter

type Snapshooter interface {
	// Snapshot is a recording of the entire current state of an entity,
	// persisted periodically (eg, every 100 events), as an optimization.
	// With snapshots, when the entity is reloaded from the journal, the
	// entire journal doesn't need to be replayed, just the changes since
	// the last snapshot.
	Snapshot(ctx *Context) (snapshot interface{}, err error)
	// HandleSnapshot is used to apply snapshots provided by the Cloudstate
	// proxy.
	HandleSnapshot(ctx *Context, snapshot interface{}) error
}

tag::snapshooter[] A Snapshooter enables eventsourced snapshots to be taken and as well handling snapshots provided.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL