sourcing

package
v0.2.3
Warning

This package is not in the latest version of its module.

Published: Apr 8, 2024 License: MIT Imports: 6 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrApplyFailed = fmt.Errorf("failed to apply event")

ErrApplyFailed is returned when an error occurs while applying an event to an aggregate. It is joined with the underlying error.

var ErrApplyingEvent = errors.New("error applying event")

ErrApplyingEvent is returned when an error occurs while applying an event. It is joined with the underlying error.

var ErrEventVersionNE = errors.New("event version is not equal to the aggregate version")

ErrEventVersionNE is returned by SafeApply when an event version is not equal to the aggregate version.

var ErrFailedToExportState = errors.New("failed to export state")

ErrFailedToExportState is returned when an error occurs while exporting the state. It is joined with the underlying error.

var ErrFailedToLoadSnapshot = errors.New("failed to load snapshot")

ErrFailedToLoadSnapshot is returned when an error occurs while loading a snapshot. It is joined with the underlying error.

var ErrLoadingEvents = errors.New("error loading events")

ErrLoadingEvents is returned when an error occurs while loading events. It is joined with the underlying error.

var ErrNoEvents = errors.New("no events found")

ErrNoEvents is the error returned when no events are found

var ErrNoQueueDefined = errors.New("no queue defined")

ErrNoQueueDefined is returned when no queue is defined. This occurs when you try to store events without a queue attached.

var ErrReplacingVersion = errors.New("error replacing version")

ErrReplacingVersion is returned when an error occurs while replacing the version of an aggregate. It is joined with the underlying error.

var ErrSendingEvent = errors.New("error sending event")

ErrSendingEvent is returned when an error occurs while sending an event to the Queue. It is joined with the underlying error.

var ErrSnapshotFailed = errors.New("snapshot failed")

ErrSnapshotFailed is returned when a snapshot fails to be created. It is joined with the underlying error.

var ErrStoringEvents = errors.New("error storing events")

ErrStoringEvents is returned when an error occurs while storing events. It is joined with the underlying error.

var ErrVersionNotFound = errors.New("version not found")

ErrVersionNotFound is the error returned when a version is not found

Functions

func SafeApply added in v0.0.10

func SafeApply(event gosignal.Event, agg Aggregate, applyFn func(gosignal.Event) error) (err error)

SafeApply applies an event to an aggregate, ensuring that the event is applied to the correct version of the aggregate.

- It returns an error if the event version is not equal to the aggregate version.
- It increments the aggregate version if the event is applied successfully.
- It returns an error if the apply function returns an error, and it can recover from a panic.

func WithEventStore

func WithEventStore(es EventStore) func(*Repository)

func WithQueue

func WithQueue(q gosignal.Queue) func(*Repository)

func WithSnapshotStrategy

func WithSnapshotStrategy(ss SnapshotStrategy) func(*Repository)
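
WithEventStore, WithQueue and WithSnapshotStrategy all return a func(*Repository), which matches the NewRepoOptions type accepted by NewRepository. That is the functional options pattern. A minimal sketch of how such options compose, using hypothetical local types (eventStore, memStore, repository, repoOption) in place of the package's own:

```go
package main

import "fmt"

// eventStore and repository are hypothetical stand-ins for EventStore and Repository.
type eventStore interface{ Name() string }

type memStore struct{}

func (memStore) Name() string { return "memory" }

type repository struct {
	store eventStore
}

// repoOption mirrors the shape of NewRepoOptions: func(*Repository).
type repoOption func(*repository)

func withEventStore(es eventStore) repoOption {
	return func(r *repository) { r.store = es }
}

func newRepository(opts ...repoOption) *repository {
	r := &repository{}
	for _, opt := range opts {
		opt(r) // each option mutates the repository being built
	}
	return r
}

func main() {
	repo := newRepository(withEventStore(memStore{}))
	fmt.Println(repo.store.Name())
}
```

With the real package the equivalent call would presumably look like sourcing.NewRepository(sourcing.WithEventStore(store), sourcing.WithQueue(queue)), where store and queue are values you supply.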

Types

type Aggregate

type Aggregate interface {
	SetID(string)
	SetVersion(uint64)
	GetID() string
	GetVersion() uint64
	Apply(gosignal.Event) error
	ImportState([]byte) error
	ExportState() ([]byte, error)
}

Aggregate is the interface for all aggregates. For ID and Version, the DefaultAggregate struct can be embedded to provide default implementations.

type DefaultAggregate added in v0.0.10

type DefaultAggregate struct {
	DefaultAggregateVersionManager
	ID string // ID of the aggregate
}

DefaultAggregate is a struct that can be embedded in an aggregate to provide default implementations

func (*DefaultAggregate) GetID added in v0.0.10

func (a *DefaultAggregate) GetID() string

GetID returns the id of the aggregate

func (*DefaultAggregate) SetID added in v0.0.10

func (a *DefaultAggregate) SetID(id string)

SetID sets the id of the aggregate
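
Embedding works because Go promotes the methods of an embedded struct to the outer type. The sketch below reproduces the shapes shown above with local copies (versionManager, defaultAggregate) so that it compiles on its own; account and the identified interface are hypothetical and only illustrate the ID and version part of Aggregate.

```go
package main

import "fmt"

// versionManager and defaultAggregate are local copies of the shapes shown above.
type versionManager struct{ Version uint64 }

func (v *versionManager) GetVersion() uint64  { return v.Version }
func (v *versionManager) SetVersion(n uint64) { v.Version = n }

type defaultAggregate struct {
	versionManager
	ID string
}

func (a *defaultAggregate) GetID() string   { return a.ID }
func (a *defaultAggregate) SetID(id string) { a.ID = id }

// account is a hypothetical aggregate; it only adds its own state.
type account struct {
	defaultAggregate
	Balance int
}

// identified is the ID/version subset of the Aggregate interface.
type identified interface {
	SetID(string)
	SetVersion(uint64)
	GetID() string
	GetVersion() uint64
}

func main() {
	// *account satisfies identified purely through promoted methods.
	var agg identified = &account{}
	agg.SetID("acct-1")
	agg.SetVersion(3)
	fmt.Println(agg.GetID(), agg.GetVersion())
}
```

A real aggregate would still need its own Apply, ImportState and ExportState methods to satisfy the full interface.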

type DefaultAggregateUint64 added in v0.0.12

type DefaultAggregateUint64 struct {
	DefaultAggregateVersionManager
	ID uint64 // ID of the aggregate
}

DefaultAggregateUint64 is a struct that can be embedded in an aggregate to provide default implementations for the ID field as a uint64

func (*DefaultAggregateUint64) GetID added in v0.0.12

func (a *DefaultAggregateUint64) GetID() string

GetID returns the id of the aggregate

func (*DefaultAggregateUint64) GetIDUint64 added in v0.0.12

func (a *DefaultAggregateUint64) GetIDUint64() uint64

func (*DefaultAggregateUint64) SetID added in v0.0.12

func (a *DefaultAggregateUint64) SetID(id string)

SetID sets the id of the aggregate

func (*DefaultAggregateUint64) SetIDUint64 added in v0.0.12

func (a *DefaultAggregateUint64) SetIDUint64(id uint64)

type DefaultAggregateVersionManager added in v0.0.12

type DefaultAggregateVersionManager struct {
	Version uint64
}

func (*DefaultAggregateVersionManager) GetVersion added in v0.0.12

func (a *DefaultAggregateVersionManager) GetVersion() uint64

func (*DefaultAggregateVersionManager) SetVersion added in v0.0.12

func (a *DefaultAggregateVersionManager) SetVersion(version uint64)

type EventStore

type EventStore interface {
	// Store stores a list of events for a given aggregate id
	Store(ctx context.Context, events []gosignal.Event) error
	// Load loads all events for a given aggregate id
	Load(ctx context.Context, aggID string, options LoadEventsOptions) ([]gosignal.Event, error)
	// Replace replaces an event with a new version. This mostly exists for legal compliance
	// purposes; otherwise your event store should be append-only
	Replace(ctx context.Context, id string, version uint64, event gosignal.Event) error
}

EventStore is the interface that wraps the basic event store operations. It represents some form of storage for your event sourcing solution.

type LoadEventsOptions

type LoadEventsOptions struct {
	MinVersion *uint64    // the minimum version of the aggregate to load
	MaxVersion *uint64    // the maximum version of the aggregate to load
	EventTypes []string   // the types of events to load, if empty all events are loaded
	FromTime   *time.Time // the time from which to load events
	ToTime     *time.Time // the time to which to load events
}

LoadEventsOptions represents the options that can be passed to the Load method
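
The pointer fields let a store tell "not set" (nil) apart from a zero value. One way a store might apply these filters is sketched below. The event type is a hypothetical stand-in for gosignal.Event, loadOptions copies the field layout above, and treating the version and time bounds as inclusive is an assumption the documentation does not confirm.

```go
package main

import (
	"fmt"
	"time"
)

// event is a hypothetical stand-in for gosignal.Event.
type event struct {
	Version   uint64
	Type      string
	Timestamp time.Time
}

// loadOptions copies the field layout of LoadEventsOptions.
type loadOptions struct {
	MinVersion *uint64
	MaxVersion *uint64
	EventTypes []string
	FromTime   *time.Time
	ToTime     *time.Time
}

// matches reports whether e passes every filter that is set.
// Inclusive bounds are an assumption.
func matches(e event, o loadOptions) bool {
	if o.MinVersion != nil && e.Version < *o.MinVersion {
		return false
	}
	if o.MaxVersion != nil && e.Version > *o.MaxVersion {
		return false
	}
	if o.FromTime != nil && e.Timestamp.Before(*o.FromTime) {
		return false
	}
	if o.ToTime != nil && e.Timestamp.After(*o.ToTime) {
		return false
	}
	if len(o.EventTypes) == 0 {
		return true // an empty list means "all types"
	}
	for _, t := range o.EventTypes {
		if t == e.Type {
			return true
		}
	}
	return false
}

// filter keeps the events that satisfy the options.
func filter(events []event, o loadOptions) []event {
	var out []event
	for _, e := range events {
		if matches(e, o) {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	maxV := uint64(1)
	events := []event{
		{Version: 0, Type: "Opened"},
		{Version: 1, Type: "Deposited"},
		{Version: 2, Type: "Closed"},
	}
	fmt.Println(len(filter(events, loadOptions{MaxVersion: &maxV})))
}
```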

type NewRepoOptions

type NewRepoOptions func(*Repository)

type RepoLoadOptions

type RepoLoadOptions struct {
	// contains filtered or unexported fields
}

RepoLoadOptions holds the configuration options for loading from the repository.

type RepoLoaderConfigurator

type RepoLoaderConfigurator struct {
	// contains filtered or unexported fields
}

RepoLoaderConfigurator is responsible for setting up load options for the repository.

func NewRepoLoaderConfigurator

func NewRepoLoaderConfigurator() *RepoLoaderConfigurator

NewRepoLoaderConfigurator creates a new instance of RepoLoaderConfigurator with default settings.

func (*RepoLoaderConfigurator) Build

Build finalizes the configuration and returns the configured RepoLoadOptions.

func (*RepoLoaderConfigurator) EventTypes

func (c *RepoLoaderConfigurator) EventTypes(types ...string) *RepoLoaderConfigurator

EventTypes sets the event types to load and returns the configurator. NOTE: this will lead to inconsistent state if you're loading against an aggregate. Only use this if you're directly querying events.

func (*RepoLoaderConfigurator) FromTime

FromTime sets the starting time from which to load events and returns the configurator. NOTE: this will lead to inconsistent state if you're loading against an aggregate. Only use this if you're directly querying events.

func (*RepoLoaderConfigurator) MaxVersion

func (c *RepoLoaderConfigurator) MaxVersion(version uint64) *RepoLoaderConfigurator

MaxVersion sets the maximum version of the aggregate to load and returns the configurator. It can be used to load a specific version of the aggregate.

func (*RepoLoaderConfigurator) MinVersion

func (c *RepoLoaderConfigurator) MinVersion(version uint64) *RepoLoaderConfigurator

MinVersion sets the minimum version of the aggregate to load and returns the configurator. NOTE: this will lead to inconsistent state if you're loading against an aggregate. Only use this if you're directly querying events.

func (*RepoLoaderConfigurator) SkipSnapshot

func (c *RepoLoaderConfigurator) SkipSnapshot(skip bool) *RepoLoaderConfigurator

SkipSnapshot sets whether to skip loading the snapshot during the operation and returns the configurator.

func (*RepoLoaderConfigurator) ToTime

ToTime sets the ending time to which to load events and returns the configurator.
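
Since every setter returns the configurator, calls can be chained and finished with Build. The sketch below shows that builder shape with hypothetical local types (loadOptions, configurator). The fields of RepoLoadOptions are unexported and the Build signature is not shown above, so the field names and the return type here are assumptions.

```go
package main

import "fmt"

// loadOptions and configurator are hypothetical stand-ins for
// RepoLoadOptions and RepoLoaderConfigurator.
type loadOptions struct {
	maxVersion   *uint64
	skipSnapshot bool
}

type configurator struct{ opts loadOptions }

func newConfigurator() *configurator { return &configurator{} }

// Each setter returns the receiver so calls can be chained.
func (c *configurator) MaxVersion(v uint64) *configurator {
	c.opts.maxVersion = &v
	return c
}

func (c *configurator) SkipSnapshot(skip bool) *configurator {
	c.opts.skipSnapshot = skip
	return c
}

// Build returns a copy of the accumulated options.
// The return type is an assumption.
func (c *configurator) Build() *loadOptions {
	out := c.opts
	return &out
}

func main() {
	opts := newConfigurator().MaxVersion(10).SkipSnapshot(true).Build()
	fmt.Println(*opts.maxVersion, opts.skipSnapshot)
}
```

With the real package the chain would presumably start from sourcing.NewRepoLoaderConfigurator() and its result would be passed as the opts argument of Load or LoadEvents.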

type Repository

type Repository struct {
	// contains filtered or unexported fields
}

Repository is a struct that interacts with the event store, snapshot store, and aggregate

func NewRepository

func NewRepository(options ...NewRepoOptions) *Repository

NewRepository creates a new repository

func (*Repository) ApplyEvents added in v0.0.10

func (r *Repository) ApplyEvents(agg Aggregate, events []gosignal.Event) error

ApplyEvents iteratively applies events to an aggregate

func (*Repository) Load

func (r *Repository) Load(ctx context.Context, agg Aggregate, opts *RepoLoadOptions) error

Load loads an aggregate from the event store, reconstructing it from its events and snapshot
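
Reconstructing from a snapshot plus events generally means restoring the snapshot state first and then replaying only the events the snapshot does not already cover. The sketch below illustrates that idea with hypothetical local types; it is not the package's Load. It assumes JSON-encoded snapshot data and the convention that an event's version equals the aggregate version it is applied to.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event, snapshot and account are hypothetical stand-ins.
type event struct {
	Version uint64
	Amount  int
}

type snapshot struct {
	Data    []byte
	Version uint64
}

type account struct {
	Version uint64 `json:"-"`
	Balance int    `json:"balance"`
}

func (a *account) ImportState(b []byte) error { return json.Unmarshal(b, a) }

func (a *account) Apply(e event) error {
	a.Balance += e.Amount
	return nil
}

// load restores snap (if any), then replays the events it does not cover.
func load(a *account, snap *snapshot, events []event) error {
	if snap != nil {
		if err := a.ImportState(snap.Data); err != nil {
			return err
		}
		a.Version = snap.Version
	}
	for _, e := range events {
		if e.Version < a.Version {
			continue // already reflected in the snapshot
		}
		if err := a.Apply(e); err != nil {
			return err
		}
		a.Version = e.Version + 1
	}
	return nil
}

func main() {
	events := []event{{0, 10}, {1, 20}, {2, 5}}

	full := &account{}
	_ = load(full, nil, events)

	snap := &snapshot{Data: []byte(`{"balance":30}`), Version: 2}
	fast := &account{}
	_ = load(fast, snap, events)

	// Both paths should reach the same state.
	fmt.Println(full.Balance, full.Version, fast.Balance, fast.Version)
}
```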

func (*Repository) LoadEvents

func (r *Repository) LoadEvents(ctx context.Context, aggregateID string, opts *RepoLoadOptions) ([]gosignal.Event, error)

LoadEvents loads events from the event store

func (*Repository) ReplaceVersion

func (r *Repository) ReplaceVersion(ctx context.Context, aggID string, agg Aggregate, ver uint64, e gosignal.Event) error

ReplaceVersion replaces the version of an aggregate in the event store. Note: this is a dangerous operation and should be used with caution. It exists largely for legal compliance reasons; otherwise your event store should be append-only. NOTE: Pass an empty aggregate in, as this function replays all events on the aggregate and applies the new event to ensure that the aggregate is in the correct state.

func (*Repository) Store

func (r *Repository) Store(ctx context.Context, events []gosignal.Event) error

Store stores events in the event store

type Snapshot

type Snapshot struct {
	Data      []byte
	Timestamp time.Time
	Version   uint64
	ID        string
}

Snapshot is a snapshot of the state of an aggregate

type SnapshotStore

type SnapshotStore interface {
	Load(ctx context.Context, id string) (*Snapshot, error)
	Store(ctx context.Context, aggregateID string, snapshot Snapshot) error
	Delete(ctx context.Context, aggregateID string) error
}

SnapshotStore is the interface that wraps the basic snapshot store operations

type SnapshotStrategy

type SnapshotStrategy interface {
	ShouldSnapshot(snapshot *Snapshot, events []gosignal.Event) bool
	GetStore() SnapshotStore
}

SnapshotStrategy is an adaptable strategy for when to take a snapshot
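
As an illustration of what a strategy could look like, the sketch below requests a snapshot once a fixed number of events has accumulated. The types event, snapshot and everyN are hypothetical, GetStore is omitted, and the sketch assumes that the events slice holds only the events applied since the last snapshot, which the documentation does not specify.

```go
package main

import "fmt"

// event and snapshot are hypothetical stand-ins for gosignal.Event and Snapshot.
type event struct{ Version uint64 }

type snapshot struct{ Version uint64 }

// everyN asks for a snapshot once at least n events are pending.
type everyN struct{ n int }

// ShouldSnapshot assumes events holds only the events applied since the
// last snapshot. The last snapshot itself is not consulted here.
func (s everyN) ShouldSnapshot(last *snapshot, events []event) bool {
	return s.n > 0 && len(events) >= s.n
}

func main() {
	strategy := everyN{n: 3}
	fmt.Println(strategy.ShouldSnapshot(nil, make([]event, 2)))
	fmt.Println(strategy.ShouldSnapshot(&snapshot{Version: 3}, make([]event, 3)))
}
```

A complete implementation would also return its SnapshotStore from GetStore so the repository knows where to persist the snapshot.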
