mongodb

package
v0.39.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2024 License: MIT Imports: 26 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func OutboxInsertHandler added in v0.34.0

func OutboxInsertHandler[K eventsourcing.ID](database, collName string) store.InTxHandler[K]

func TxRunner added in v0.39.0

func TxRunner(client *mongo.Client) store.Tx

Types

type ChangeEvent

// ChangeEvent wraps a single Event under the BSON key "fullDocument".
// NOTE(review): the key name matches the shape of a MongoDB change stream
// notification, so this is presumably what Feed decodes — confirm against the
// implementation.
type ChangeEvent struct {
	// FullDocument is the embedded Event, mapped to "fullDocument" with the omitempty option.
	FullDocument Event `bson:"fullDocument,omitempty"`
}

type EsRepository

type EsRepository[K eventsourcing.ID, PK eventsourcing.IDPt[K]] struct {
	Repository
	// contains filtered or unexported fields
}

func NewStore

func NewStore[K eventsourcing.ID, PK eventsourcing.IDPt[K]](ctx context.Context, client *mongo.Client, database string, opts ...Option[K, PK]) (*EsRepository[K, PK], error)

NewStore creates a new EsRepository from an existing MongoDB client and a database name.

func NewStoreWithURI added in v0.33.0

func NewStoreWithURI[K eventsourcing.ID, PK eventsourcing.IDPt[K]](ctx context.Context, connString, database string, opts ...Option[K, PK]) (*EsRepository[K, PK], error)

NewStoreWithURI creates a new EsRepository from a MongoDB connection string and a database name.

func (*EsRepository[K, PK]) Client added in v0.33.0

func (r *EsRepository[K, PK]) Client() *mongo.Client

func (*EsRepository[K, PK]) Close

func (r *EsRepository[K, PK]) Close(ctx context.Context)

func (*EsRepository[K, PK]) Forget

func (r *EsRepository[K, PK]) Forget(ctx context.Context, request eventsourcing.ForgetRequest[K], forget func(kind eventsourcing.Kind, body []byte) ([]byte, error)) error

func (*EsRepository[K, PK]) GetAggregateEvents

func (r *EsRepository[K, PK]) GetAggregateEvents(ctx context.Context, aggregateID K, snapVersion int) ([]*eventsourcing.Event[K], error)

func (*EsRepository[K, PK]) GetEvents

func (r *EsRepository[K, PK]) GetEvents(ctx context.Context, after, until eventid.EventID, batchSize int, filter store.Filter) ([]*eventsourcing.Event[K], error)

func (*EsRepository[K, PK]) GetEventsByRawIDs added in v0.35.0

func (r *EsRepository[K, PK]) GetEventsByRawIDs(ctx context.Context, ids []string) ([]*eventsourcing.Event[K], error)

func (*EsRepository[K, PK]) GetSnapshot

func (r *EsRepository[K, PK]) GetSnapshot(ctx context.Context, aggregateID K) (eventsourcing.Snapshot[K], error)

func (*EsRepository[K, PK]) MigrateInPlaceCopyReplace added in v0.21.0

func (r *EsRepository[K, PK]) MigrateInPlaceCopyReplace(
	ctx context.Context,
	revision int,
	snapshotThreshold uint32,
	rehydrateFunc func(eventsourcing.Aggregater[K], *eventsourcing.Event[K]) error,
	codec eventsourcing.Codec[K],
	handler eventsourcing.MigrationHandler[K],
	targetAggregateKind eventsourcing.Kind,
	aggregateKind eventsourcing.Kind,
	eventTypeCriteria ...eventsourcing.Kind,
) error

func (*EsRepository[K, PK]) SaveEvent

func (r *EsRepository[K, PK]) SaveEvent(ctx context.Context, eRec *eventsourcing.EventRecord[K]) (eventid.EventID, uint32, error)

func (*EsRepository[K, PK]) SaveSnapshot

func (r *EsRepository[K, PK]) SaveSnapshot(ctx context.Context, snapshot *eventsourcing.Snapshot[K]) error

type Event

// Event is the BSON document shape of an event stored in the database.
// Every field except Migration and Migrated carries the omitempty option.
type Event struct {
	// ID is mapped to the document key "_id".
	ID               string             `bson:"_id,omitempty"`
	// AggregateID is the aggregate identifier in string form.
	AggregateID      string             `bson:"aggregate_id,omitempty"`
	// AggregateIDHash is a hash of the aggregate ID.
	// NOTE(review): declared int32 here but uint32 on Outbox.AggregateIDHash —
	// confirm the difference is intentional.
	AggregateIDHash  int32              `bson:"aggregate_id_hash,omitempty"`
	// AggregateVersion is the aggregate version associated with this event.
	AggregateVersion uint32             `bson:"aggregate_version,omitempty"`
	// AggregateKind is the kind of the aggregate this event belongs to.
	AggregateKind    eventsourcing.Kind `bson:"aggregate_kind,omitempty"`
	// Kind is the kind of the event itself.
	Kind             eventsourcing.Kind `bson:"kind,omitempty"`
	// Body is the raw event payload.
	Body             []byte             `bson:"body,omitempty"`
	// Metadata is a free-form BSON map stored alongside the event.
	Metadata         bson.M             `bson:"metadata,omitempty"`
	// CreatedAt is the event timestamp.
	CreatedAt        time.Time          `bson:"created_at,omitempty"`
	// Migration and Migrated have no omitempty option.
	// NOTE(review): presumably migration bookkeeping used by
	// MigrateInPlaceCopyReplace — confirm against the implementation.
	Migration        int                `bson:"migration"`
	Migrated         bool               `bson:"migrated"`
}

Event is the event data stored in the database

type EventsRepository added in v0.34.0

// EventsRepository is the single-method lookup interface accepted by
// NewOutboxStore (its eventsRepo parameter). *EsRepository declares a method
// with this exact signature.
type EventsRepository[K eventsourcing.ID] interface {
	// GetEventsByRawIDs takes a context and a list of string IDs and returns
	// the matching events, or an error.
	GetEventsByRawIDs(context.Context, []string) ([]*eventsourcing.Event[K], error)
}

type Feed

type Feed[K eventsourcing.ID, PK eventsourcing.IDPt[K]] struct {
	// contains filtered or unexported fields
}

func NewFeed

func NewFeed[K eventsourcing.ID, PK eventsourcing.IDPt[K]](logger *slog.Logger, connString, database string, sinker sink.Sinker[K], opts ...FeedOption[K, PK]) (Feed[K, PK], error)

func (*Feed[K, PK]) Run added in v0.25.0

func (f *Feed[K, PK]) Run(ctx context.Context) error

type FeedOption

// FeedOption is a function that receives a *Feed. NewFeed accepts a variadic
// list of them; WithFeedEventsCollection and WithFilter return values of this type.
type FeedOption[K eventsourcing.ID, PK eventsourcing.IDPt[K]] func(*Feed[K, PK])

func WithFeedEventsCollection

func WithFeedEventsCollection[K eventsourcing.ID, PK eventsourcing.IDPt[K]](eventsCollection string) FeedOption[K, PK]

func WithFilter added in v0.36.0

func WithFilter[K eventsourcing.ID, PK eventsourcing.IDPt[K]](filter *store.Filter) FeedOption[K, PK]

type InTxHandler added in v0.39.0

// InTxHandler is a callback that receives an *InTxHandlerContext and returns an error.
// NOTE(review): WithTxHandler and OutboxInsertHandler are declared in terms of
// store.InTxHandler[K], not this package-local type — confirm how the two relate.
type InTxHandler[K eventsourcing.ID] func(*InTxHandlerContext[K]) error

type InTxHandlerContext added in v0.39.0

type InTxHandlerContext[K eventsourcing.ID] struct {
	// contains filtered or unexported fields
}

func NewInTxHandlerContext added in v0.39.0

func NewInTxHandlerContext[K eventsourcing.ID](ctx context.Context, event *eventsourcing.Event[K]) *InTxHandlerContext[K]

func (*InTxHandlerContext[K]) Context added in v0.39.0

func (c *InTxHandlerContext[K]) Context() context.Context

func (*InTxHandlerContext[K]) Event added in v0.39.0

func (c *InTxHandlerContext[K]) Event() *eventsourcing.Event[K]

type KVStore added in v0.34.0

type KVStore struct {
	Repository
	// contains filtered or unexported fields
}

func NewKVStore added in v0.34.0

func NewKVStore(client *mongo.Client, dbName, collection string) KVStore

func NewKVStoreWithURI added in v0.34.0

func NewKVStoreWithURI(connString, database, collection string) (KVStore, error)

func (KVStore) Get added in v0.34.0

func (m KVStore) Get(ctx context.Context, key string) (string, error)

func (KVStore) Put added in v0.34.0

func (m KVStore) Put(ctx context.Context, key string, value string) error

type Option added in v0.28.0

// Option is a function that receives the *EsRepository being built. NewStore
// and NewStoreWithURI accept a variadic list of them; the With* constructors
// whose result type is Option[K, PK] produce them.
type Option[K eventsourcing.ID, PK eventsourcing.IDPt[K]] func(f *EsRepository[K, PK])

func WithEventsCollection

func WithEventsCollection[K eventsourcing.ID, PK eventsourcing.IDPt[K]](eventsCollection string) Option[K, PK]

func WithMetadata added in v0.36.0

func WithMetadata[K eventsourcing.ID, PK eventsourcing.IDPt[K]](metadata eventsourcing.Metadata) Option[K, PK]

WithMetadata defines the metadata to be saved on every event. Data keys will be converted to lower case.

func WithMetadataHook added in v0.36.0

func WithMetadataHook[K eventsourcing.ID, PK eventsourcing.IDPt[K]](fn store.MetadataHook[K]) Option[K, PK]

WithMetadataHook defines the hook that will return the metadata. This metadata will override any metadata defined at the repository level.

func WithPostSchemaCreation added in v0.38.0

func WithPostSchemaCreation[K eventsourcing.ID, PK eventsourcing.IDPt[K]](post func(Schema) []bson.D) Option[K, PK]

func WithSkipSchemaCreation added in v0.38.0

func WithSkipSchemaCreation[K eventsourcing.ID, PK eventsourcing.IDPt[K]](skip bool) Option[K, PK]

func WithSnapshotsCollection

func WithSnapshotsCollection[K eventsourcing.ID, PK eventsourcing.IDPt[K]](snapshotsCollection string) Option[K, PK]

func WithTxHandler added in v0.34.0

func WithTxHandler[K eventsourcing.ID, PK eventsourcing.IDPt[K]](txHandler store.InTxHandler[K]) Option[K, PK]

type Outbox added in v0.34.0

// Outbox is the BSON document shape of an outbox entry. It has the same
// identifying fields as Event (ID, aggregate ID/hash/kind, kind, metadata) but
// no Body, AggregateVersion, CreatedAt, or migration fields. Every field
// carries the omitempty option.
type Outbox struct {
	// ID is mapped to the document key "_id".
	ID              string             `bson:"_id,omitempty"`
	// AggregateID is the aggregate identifier in string form.
	AggregateID     string             `bson:"aggregate_id,omitempty"`
	// AggregateIDHash is a hash of the aggregate ID.
	// NOTE(review): declared uint32 here but int32 on Event.AggregateIDHash —
	// confirm the difference is intentional.
	AggregateIDHash uint32             `bson:"aggregate_id_hash,omitempty"`
	// AggregateKind is the kind of the aggregate the entry refers to.
	AggregateKind   eventsourcing.Kind `bson:"aggregate_kind,omitempty"`
	// Kind is the kind of the event the entry refers to.
	Kind            eventsourcing.Kind `bson:"kind,omitempty"`
	// Metadata is a free-form BSON map stored alongside the entry.
	Metadata        bson.M             `bson:"metadata,omitempty"`
}

type OutboxRepository added in v0.34.0

type OutboxRepository[K eventsourcing.ID] struct {
	Repository
	// contains filtered or unexported fields
}

func NewOutboxStore added in v0.34.0

func NewOutboxStore[K eventsourcing.ID](client *mongo.Client, database, collectionName string, eventsRepo EventsRepository[K]) *OutboxRepository[K]

func (*OutboxRepository[K]) AfterSink added in v0.34.0

func (r *OutboxRepository[K]) AfterSink(ctx context.Context, evtID eventid.EventID) error

func (*OutboxRepository[K]) PendingEvents added in v0.34.0

func (r *OutboxRepository[K]) PendingEvents(ctx context.Context, batchSize int, filter store.Filter) ([]*eventsourcing.Event[K], error)

type Repository added in v0.33.0

type Repository struct {
	// contains filtered or unexported fields
}

func (Repository) TxRunner added in v0.39.0

func (r Repository) TxRunner() store.Tx

func (Repository) WithTx added in v0.33.0

func (r Repository) WithTx(ctx context.Context, callback func(context.Context) error) (err error)

type Schema added in v0.38.0

// Schema is the value handed to the callback registered through
// WithPostSchemaCreation (a func(Schema) []bson.D).
type Schema struct {
	// CollectionNames is a list of collection names.
	// NOTE(review): presumably the collections managed by the store — confirm
	// which ones are included and in what order.
	CollectionNames []string
}

type Snapshot

// Snapshot is the BSON document shape of a stored aggregate snapshot.
type Snapshot struct {
	// ID is mapped to the document key "_id".
	ID               string             `bson:"_id,omitempty"`
	// AggregateID is the aggregate identifier in string form.
	AggregateID      string             `bson:"aggregate_id,omitempty"`
	// AggregateVersion is the aggregate version associated with the snapshot.
	AggregateVersion uint32             `bson:"aggregate_version,omitempty"`
	// AggregateKind is the kind of the snapshotted aggregate.
	AggregateKind    eventsourcing.Kind `bson:"aggregate_kind,omitempty"`
	// Body is the raw snapshot payload.
	Body             []byte             `bson:"body,omitempty"`
	// CreatedAt is the snapshot timestamp.
	CreatedAt        time.Time          `bson:"created_at,omitempty"`
	// Metadata is a free-form BSON map.
	// NOTE(review): this is the only field without a bson tag, so its key name
	// and empty-value handling come from the encoder's defaults rather than an
	// explicit "metadata,omitempty" as on Event and Outbox — confirm this is intended.
	Metadata         bson.M
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL