projection

package
v0.39.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2024 License: MIT Imports: 23 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func PartitionedCompetingWorkers added in v0.29.0

func PartitionedCompetingWorkers[K eventsourcing.ID](
	ctx context.Context,
	logger *slog.Logger,
	lockerFactory LockerFactory,
	subscriberFactory SubscriberFactory[K],
	topic string, partitions uint32,
	esRepo EventsRepository[K],
	projection Projection[K],
	splits int,
	resumeStore store.KVStore,
) ([]*worker.RunWorker, error)

PartitionedCompetingWorkers creates workers that will run depending if a lock was acquired or not.

If a locker is provided it is possible to balance workers between the several server instances using a worker.Balancer

func PartitionedWorkers added in v0.29.0

func PartitionedWorkers[K eventsourcing.ID](
	ctx context.Context,
	logger *slog.Logger,
	lockerFactory LockerFactory,
	subscriberFactory SubscriberFactory[K],
	topic string, partitions uint32,
	esRepo EventsRepository[K],
	projection Projection[K],
	splits int,
	splitIds []int,
	resumeStore store.KVStore,
) ([]*worker.RunWorker, error)

PartitionedWorkers creates workers that will always run because a balancer locker is not provided. This assumes that the balancing will be done by the message broker.

func Project added in v0.32.0

func Project[K eventsourcing.ID](
	logger *slog.Logger,
	lockerFactory LockerFactory,
	esRepo EventsRepository[K],
	subscriber Consumer[K],
	projection Projection[K],
	resumeStore store.KVStore,
	catchupSplits int,
) *worker.RunWorker

NewProjector creates a subscriber to an event stream and process all events.

It will check if it needs to do a catch up. If so, it will try acquire a lock and run a projection catchup. If it is unable to acquire the lock because it is held by another process, it will wait for its release. In the end it will fire up the subscribers. All this will happen in a separate go routine allowing the service to completely start up.

After a successfully projection creation, subsequent start up will no longer execute the catch up function. This can be used to migrate projections, where a completely new projection will be populated.

The catch up function should replay all events from all event stores needed for this

func StartGrpcServer added in v0.31.0

func StartGrpcServer[K eventsourcing.ID](ctx context.Context, address string, repo EventsRepository[K]) error

Types

type Cancel added in v0.31.0

type Cancel func()

type CatchUpCallback added in v0.29.0

type CatchUpCallback func(context.Context, eventid.EventID) (eventid.EventID, error)

type CatchUpOptions added in v0.34.0

type CatchUpOptions struct {
	StartOffset          time.Duration
	CatchUpWindow        time.Duration
	UpdateResumeInterval time.Duration

	AggregateKinds []eventsourcing.Kind
	// Metadata filters on top of metadata. Every key of the map is ANDed with every OR of the values
	// eg: [{"geo": "EU"}, {"geo": "USA"}, {"membership": "prime"}] equals to:  geo IN ("EU", "USA") AND membership = "prime"
	Metadata store.MetadataFilter
}

type Checkpoints added in v0.39.0

type Checkpoints[K eventsourcing.ID] struct {
	// contains filtered or unexported fields
}

func NewCheckpoints added in v0.39.0

func NewCheckpoints[K eventsourcing.ID](store store.KVStore, txRunner store.Tx) *Checkpoints[K]

func (*Checkpoints[K]) Handle added in v0.39.1

func (p *Checkpoints[K]) Handle(ctx context.Context, msg Message[K], callback func(*sink.Message[K]) error) error

func (*Checkpoints[K]) Reject added in v0.39.0

func (p *Checkpoints[K]) Reject(ctx context.Context, msg Message[K]) (bool, error)

Reject verifies if projection should accept the event within the message.

When the message if of type MessageKindSwitch, it signals that the catchup has been done and that from now on all events will come from the event bus

func (*Checkpoints[K]) Save added in v0.39.0

func (p *Checkpoints[K]) Save(ctx context.Context, msg Message[K]) error

Save saves the checkpoint for the projection.

The context must contain a database transaction.

type Consumer added in v0.15.0

type Consumer[K eventsourcing.ID] interface {
	Topic() string
	// returns the subscriber Positions. The first Position should be 1
	StartConsumer(ctx context.Context, startTime *time.Time, projectionName string, handle ConsumerHandler[K], options ...ConsumerOption[K]) error
}

type ConsumerHandler added in v0.34.0

type ConsumerHandler[K eventsourcing.ID] func(ctx context.Context, e *sink.Message[K], partition uint32, seq uint64) error

type ConsumerOption

type ConsumerOption[K eventsourcing.ID] func(*ConsumerOptions[K])

func WithAckWait added in v0.25.0

func WithAckWait[K eventsourcing.ID](ackWait time.Duration) ConsumerOption[K]

func WithFilter

func WithFilter[K eventsourcing.ID](filter func(e *sink.Message[K]) bool) ConsumerOption[K]

type ConsumerOptions

type ConsumerOptions[K eventsourcing.ID] struct {
	Filter  func(e *sink.Message[K]) bool
	AckWait time.Duration
}

type ConsumerTopic added in v0.34.0

type ConsumerTopic struct {
	Topic      string
	Partitions []uint32
}

type Event added in v0.30.0

type Event[K eventsourcing.ID] struct {
	ID               eventid.EventID
	AggregateID      K
	AggregateVersion uint32
	AggregateKind    eventsourcing.Kind
	Kind             eventsourcing.Kind
	Body             encoding.Base64
	Metadata         eventsourcing.Metadata
	CreatedAt        time.Time
}

func FromEvent added in v0.30.0

func FromEvent[K eventsourcing.ID](e *eventsourcing.Event[K]) *Event[K]

func FromMessage added in v0.30.0

func FromMessage[K eventsourcing.ID](m *sink.Message[K]) *Event[K]

type EventsRepository added in v0.32.0

type EventsRepository[K eventsourcing.ID] interface {
	GetEvents(ctx context.Context, afterEventID eventid.EventID, untilEventID eventid.EventID, batchSize int, filter store.Filter) ([]*eventsourcing.Event[K], error)
}

type GetEventsReply added in v0.39.0

type GetEventsReply[K eventsourcing.ID] struct {
	Events []*eventsourcing.Event[K]
	Error  error
}

type GrpcRepository added in v0.31.0

type GrpcRepository[K eventsourcing.ID, PK eventsourcing.IDPt[K]] struct {
	// contains filtered or unexported fields
}

func NewGrpcRepository added in v0.31.0

func NewGrpcRepository[K eventsourcing.ID, PK eventsourcing.IDPt[K]](address string) GrpcRepository[K, PK]

func (GrpcRepository[K, PK]) GetEvents added in v0.31.0

func (c GrpcRepository[K, PK]) GetEvents(ctx context.Context, after, until eventid.EventID, limit int, filter store.Filter) ([]*eventsourcing.Event[K], error)

type GrpcServer added in v0.31.0

type GrpcServer[K eventsourcing.ID] struct {
	// contains filtered or unexported fields
}

func (*GrpcServer[K]) GetEvents added in v0.31.0

func (s *GrpcServer[K]) GetEvents(ctx context.Context, r *pb.GetEventsRequest) (*pb.GetEventsReply, error)

type HandleEventsReply added in v0.39.0

type HandleEventsReply[K eventsourcing.ID] struct {
	AfterEventID eventid.EventID
	Count        int64
	Error        error
}

type LockerFactory added in v0.15.0

type LockerFactory func(lockName string) dist.Locker

type Message added in v0.39.0

type Message[K eventsourcing.ID] struct {
	Meta    Meta
	Message *sink.Message[K]
}

type MessageHandlerFunc added in v0.30.0

type MessageHandlerFunc[K eventsourcing.ID] func(ctx context.Context, e *sink.Message[K]) error

type MessageKind added in v0.39.0

type MessageKind string
var (
	MessageKindCatchup MessageKind = "catchup"
	MessageKindSwitch  MessageKind = "switch"
	MessageKindLive    MessageKind = "live"
)

type Meta added in v0.31.0

type Meta struct {
	Name      string
	Kind      MessageKind
	Partition uint32
	Sequence  uint64
}

type Option added in v0.31.0

type Option[K eventsourcing.ID] func(*Player[K])

func WithBatchSize added in v0.31.0

func WithBatchSize[K eventsourcing.ID](batchSize int) Option[K]

func WithCustomFilter added in v0.31.0

func WithCustomFilter[K eventsourcing.ID](fn func(events *eventsourcing.Event[K]) bool) Option[K]

type Player added in v0.31.0

type Player[K eventsourcing.ID] struct {
	// contains filtered or unexported fields
}

func NewPlayer added in v0.32.0

func NewPlayer[K eventsourcing.ID](repository EventsRepository[K], options ...Option[K]) Player[K]

NewPlayer instantiates a new Player.

func (Player[K]) Replay added in v0.31.0

func (p Player[K]) Replay(ctx context.Context, handler MessageHandlerFunc[K], afterEventID, untilEventID eventid.EventID, filters ...store.FilterOption) (eventid.EventID, int64, error)

type Projection added in v0.32.0

type Projection[K eventsourcing.ID] interface {
	Name() string
	CatchUpOptions() CatchUpOptions
	Handle(ctx context.Context, e Message[K]) error
}

type ResumeKey added in v0.25.0

type ResumeKey struct {
	// contains filtered or unexported fields
}

ResumeKey is used to retrieve the last event id to replay messages directly from the event store.

func NewResumeKey added in v0.34.0

func NewResumeKey(projectionName, topic string, partition uint32) (_ ResumeKey, er error)

func (ResumeKey) Partition added in v0.34.0

func (r ResumeKey) Partition() uint32

func (ResumeKey) Projection added in v0.29.0

func (r ResumeKey) Projection() string

func (ResumeKey) String added in v0.25.0

func (r ResumeKey) String() string

func (ResumeKey) Topic added in v0.25.0

func (r ResumeKey) Topic() string

type Start added in v0.31.0

type Start int
const (
	END Start = iota
	BEGINNING
	SEQUENCE
)

type StartOption added in v0.31.0

type StartOption struct {
	// contains filtered or unexported fields
}

func StartAt added in v0.31.0

func StartAt(sequence uint64) StartOption

func StartBeginning added in v0.31.0

func StartBeginning() StartOption

func StartEnd added in v0.31.0

func StartEnd() StartOption

func (StartOption) AfterSeq added in v0.31.0

func (so StartOption) AfterSeq() uint64

func (StartOption) StartFrom added in v0.31.0

func (so StartOption) StartFrom() Start

type SubscriberFactory added in v0.29.0

type SubscriberFactory[K eventsourcing.ID] func(context.Context, ResumeKey) Consumer[K]

type Token added in v0.29.0

type Token struct {
	Done    bool            `json:"done"`
	EventID eventid.EventID `json:"eventID,omitempty"`
}

func ParseToken added in v0.29.0

func ParseToken(s string) (_ Token, e error)

func (Token) IsEmpty added in v0.29.0

func (t Token) IsEmpty() bool

func (Token) String added in v0.29.0

func (t Token) String() string

type WaitLockerFactory added in v0.29.0

type WaitLockerFactory func(lockName string) dist.WaitLocker

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL