eventsourcing

package
v0.5.0 Latest
Warning: This package is not in the latest version of its module.
Published: May 3, 2022 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package eventsourcing provides an implementation of saga.Persister that persists saga instances as a stream of events with optional snapshots.

Constants

const DefaultSnapshotFrequency saga.Revision = 1000

DefaultSnapshotFrequency is the default number of revisions to allow between storing snapshots.
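As a rough illustration of what "revisions between snapshots" could mean in practice, the sketch below checks whether a snapshot is due. The `Revision` type is a local stand-in for saga.Revision, and both the `snapshotDue` helper and the rule that a zero frequency falls back to the default are assumptions for illustration, not the package's confirmed logic.

```go
package main

import "fmt"

// Revision is a local stand-in for saga.Revision (assumed to be an unsigned counter).
type Revision uint64

// DefaultSnapshotFrequency mirrors the documented constant.
const DefaultSnapshotFrequency Revision = 1000

// snapshotDue is a hypothetical helper. It reports whether at least freq
// revisions have passed since the last snapshot. A zero freq falls back to
// DefaultSnapshotFrequency, which is an assumption about how the default applies.
func snapshotDue(current, lastSnapshot, freq Revision) bool {
	if freq == 0 {
		freq = DefaultSnapshotFrequency
	}
	if current < lastSnapshot {
		return false // guard against unsigned underflow
	}
	return current-lastSnapshot >= freq
}

func main() {
	fmt.Println(snapshotDue(999, 0, 0))       // false
	fmt.Println(snapshotDue(1000, 0, 0))      // true
	fmt.Println(snapshotDue(1500, 1000, 250)) // true
}
```

A lower frequency trades extra snapshot writes for shorter event replays when an instance is loaded.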

Variables

This section is empty.

Functions

This section is empty.

Types

type Persister

type Persister struct {
	MessageStore      messagestore.Store
	Snapshots         SnapshotRepository
	SnapshotFrequency saga.Revision
}

Persister is an implementation of saga.Persister that stores saga instances using event-sourcing semantics.

The saga data MUST implement saga.EventedData.
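To make "event-sourcing semantics" concrete, here is a minimal sketch of data whose state is derived purely by applying events in order. The `Event` and `Account` types and the `ApplyEvent` method are hypothetical stand-ins; the actual saga.EventedData interface may have a different shape.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for a domain event message.
type Event struct {
	Kind   string
	Amount int
}

// Account is hypothetical saga data whose state comes only from events.
type Account struct {
	Balance int
}

// ApplyEvent updates the state to reflect a single event.
// Unknown event kinds are ignored in this sketch.
func (a *Account) ApplyEvent(e Event) {
	switch e.Kind {
	case "credited":
		a.Balance += e.Amount
	case "debited":
		a.Balance -= e.Amount
	}
}

// replay rebuilds the data from scratch by applying events in order.
func replay(events []Event) *Account {
	a := &Account{}
	for _, e := range events {
		a.ApplyEvent(e)
	}
	return a
}

func main() {
	a := replay([]Event{{"credited", 100}, {"debited", 30}})
	fmt.Println(a.Balance) // 70
}
```

Because the state is a pure function of the event history, only the events (and optionally a snapshot) need to be stored.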

func (*Persister) BeginUnitOfWork

func (p *Persister) BeginUnitOfWork(
	ctx context.Context,
	sg saga.Saga,
	tx persistence.Tx,
	s ax.Sender,
	id saga.InstanceID,
) (saga.UnitOfWork, error)

BeginUnitOfWork starts a new unit-of-work that modifies a saga instance.

If the saga instance does not exist, it returns a UnitOfWork with an instance at revision zero.
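The loading step might look something like the sketch below: start from a snapshot when one exists, otherwise from an empty instance at revision zero, then replay any later events. All types and the `load` function are hypothetical, and the one-revision-per-event mapping is a simplification rather than a statement about how this package numbers revisions.

```go
package main

import "fmt"

// Instance is a hypothetical stand-in for saga.Instance.
type Instance struct {
	ID       string
	Revision uint64
	Total    int
}

// load is a hypothetical helper. It starts from snap if one is given,
// otherwise from a new instance at revision zero, then applies every event
// recorded after that revision. In this sketch events[i] is the event that
// produced revision i+1.
func load(id string, snap *Instance, events []int) Instance {
	inst := Instance{ID: id}
	if snap != nil {
		inst = *snap
	}
	for rev := inst.Revision; rev < uint64(len(events)); rev++ {
		inst.Total += events[rev]
		inst.Revision = rev + 1
	}
	return inst
}

func main() {
	// No snapshot and no events: a fresh instance at revision zero.
	fresh := load("a", nil, nil)
	fmt.Println(fresh.Revision, fresh.Total) // 0 0

	// Snapshot at revision 2, so only the third event is replayed.
	snap := &Instance{ID: "a", Revision: 2, Total: 3}
	loaded := load("a", snap, []int{1, 2, 3})
	fmt.Println(loaded.Revision, loaded.Total) // 3 6
}
```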

type Recorder

type Recorder struct {
	Events []ax.Envelope
	Next   ax.Sender
}

Recorder is an implementation of ax.Sender that records published events in memory.

func (*Recorder) ExecuteCommand

func (s *Recorder) ExecuteCommand(
	ctx context.Context,
	m ax.Command,
	opts ...ax.ExecuteOption,
) (ax.Envelope, error)

ExecuteCommand sends a command message.

If ctx contains a message envelope, m is sent as a child of the message in that envelope.

func (*Recorder) PublishEvent

func (s *Recorder) PublishEvent(
	ctx context.Context,
	m ax.Event,
	opts ...ax.PublishOption,
) (ax.Envelope, error)

PublishEvent sends an event message.

If ctx contains a message envelope, m is sent as a child of the message in that envelope.
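The recording behaviour can be pictured as a decorator around another sender. The sketch below uses simplified, hypothetical `Sender` and `Envelope` types rather than the real ax API, and it assumes that commands are forwarded without being recorded, since the description only mentions published events.

```go
package main

import "fmt"

// Envelope and Sender are simplified stand-ins for ax.Envelope and ax.Sender.
type Envelope struct{ Message string }

type Sender interface {
	PublishEvent(m string) (Envelope, error)
	ExecuteCommand(m string) (Envelope, error)
}

// nullSender wraps each message in an envelope and does nothing else.
type nullSender struct{}

func (nullSender) PublishEvent(m string) (Envelope, error)   { return Envelope{m}, nil }
func (nullSender) ExecuteCommand(m string) (Envelope, error) { return Envelope{m}, nil }

// recordingSender is a hypothetical decorator that forwards to Next and
// keeps a copy of each successfully published event envelope in memory.
type recordingSender struct {
	Events []Envelope
	Next   Sender
}

func (r *recordingSender) PublishEvent(m string) (Envelope, error) {
	env, err := r.Next.PublishEvent(m)
	if err != nil {
		return Envelope{}, err
	}
	r.Events = append(r.Events, env)
	return env, nil
}

// ExecuteCommand forwards the command without recording it (an assumption).
func (r *recordingSender) ExecuteCommand(m string) (Envelope, error) {
	return r.Next.ExecuteCommand(m)
}

func main() {
	r := &recordingSender{Next: nullSender{}}
	r.PublishEvent("order-placed")
	r.ExecuteCommand("ship-order")
	r.PublishEvent("order-shipped")
	fmt.Println(len(r.Events)) // 2
}
```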

type SnapshotRepository

type SnapshotRepository interface {
	// LoadSagaSnapshot loads the latest available snapshot from the store.
	//
	// It returns an error if a snapshot of this instance is found, but belongs to
	// a different saga, as identified by pk, the saga's persistence key.
	LoadSagaSnapshot(
		ctx context.Context,
		tx persistence.Tx,
		pk string,
		id saga.InstanceID,
	) (i saga.Instance, ok bool, err error)

	// SaveSagaSnapshot saves a snapshot to the store.
	//
	// The implementation may return an error if a snapshot for this instance
	// belongs to a different saga, as identified by pk, the saga's persistence
	// key.
	SaveSagaSnapshot(
		ctx context.Context,
		tx persistence.Tx,
		pk string,
		i saga.Instance,
	) error

	// DeleteSagaSnapshots deletes any snapshots associated with a saga instance.
	//
	// The implementation may return an error if snapshots for this instance
	// belong to a different saga, as identified by pk, the saga's persistence
	// key.
	DeleteSagaSnapshots(
		ctx context.Context,
		tx persistence.Tx,
		pk string,
		id saga.InstanceID,
	) error
}

SnapshotRepository is an interface for loading, saving, and deleting snapshots of event-sourced saga data.
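For intuition, an in-memory repository with the same load/save/delete shape could be sketched as follows. It drops the context and transaction parameters, uses a hypothetical `Instance` type, keeps only the most recent snapshot per instance, and treats a persistence-key mismatch as an error in all three operations. None of this is taken from the package's own implementations.

```go
package main

import (
	"errors"
	"fmt"
)

// Instance is a hypothetical stand-in for saga.Instance.
type Instance struct {
	ID       string
	Revision uint64
}

// entry pairs a snapshot with the persistence key of the saga that owns it.
type entry struct {
	pk   string
	inst Instance
}

var errWrongSaga = errors.New("snapshot belongs to a different saga")

// memorySnapshots is a hypothetical in-memory snapshot store.
type memorySnapshots struct {
	byID map[string]entry
}

// Load returns the stored snapshot, or ok == false if there is none.
func (r *memorySnapshots) Load(pk, id string) (Instance, bool, error) {
	e, ok := r.byID[id]
	if !ok {
		return Instance{}, false, nil
	}
	if e.pk != pk {
		return Instance{}, false, errWrongSaga
	}
	return e.inst, true, nil
}

// Save replaces any existing snapshot for the instance.
func (r *memorySnapshots) Save(pk string, i Instance) error {
	if e, ok := r.byID[i.ID]; ok && e.pk != pk {
		return errWrongSaga
	}
	if r.byID == nil {
		r.byID = map[string]entry{}
	}
	r.byID[i.ID] = entry{pk, i}
	return nil
}

// Delete removes the snapshot for the instance, if any.
func (r *memorySnapshots) Delete(pk, id string) error {
	if e, ok := r.byID[id]; ok && e.pk != pk {
		return errWrongSaga
	}
	delete(r.byID, id)
	return nil
}

func main() {
	r := &memorySnapshots{}
	_ = r.Save("saga-a", Instance{ID: "i1", Revision: 1000})
	got, ok, err := r.Load("saga-a", "i1")
	fmt.Println(got.Revision, ok, err) // 1000 true <nil>
	_, _, err = r.Load("saga-b", "i1")
	fmt.Println(err) // snapshot belongs to a different saga
}
```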
