saga

package
v0.1.20 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 15, 2022 License: MIT Imports: 15 Imported by: 4

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AddHistoryEvent added in v0.1.17

// AddHistoryEvent carries the extra attributes that accompany an event
// payload in Instance.AddHistoryEvent(ev, ahv).
type AddHistoryEvent struct {
	// TraceUID: presumably the UID of the received message that led to the
	// event (HistoryEvent.TraceUID is described that way) — TODO confirm.
	TraceUID string
	// Origin: presumably stored as HistoryEvent.OriginSource — TODO confirm.
	Origin   string
}

type BaseSaga

// BaseSaga embeds message.ObjectMeta and exposes the AddEventHandler,
// EventHandlers and SetSchema methods.
// NOTE(review): it looks intended to be embedded in concrete Saga
// implementations to supply EventHandlers/SetSchema — confirm.
type BaseSaga struct {
	message.ObjectMeta
	// contains filtered or unexported fields
}

func (*BaseSaga) AddEventHandler

func (b *BaseSaga) AddEventHandler(ev message.Object, handler Executor) *BaseSaga

func (BaseSaga) EventHandlers

func (b BaseSaga) EventHandlers() map[scheme.GroupKind]Executor

func (*BaseSaga) SetSchema

func (b *BaseSaga) SetSchema(scheme scheme.KnownTypesRegistry)

type Delivery

// Delivery holds a message payload together with its delivery options.
// SagaContext.Deliveries returns a slice of pointers to this type.
// NOTE(review): presumably one Delivery is recorded per
// SagaContext.Dispatch call — confirm.
type Delivery struct {
	// Payload is the message object to deliver.
	Payload message.Object
	// Options are the endpoint delivery options attached to Payload.
	Options []endpoint.DeliveryOption
}

type Executor

// Executor is a handler function that receives a SagaContext and returns an
// error. It is the handler type accepted by BaseSaga.AddEventHandler and the
// value type of the map returned by EventHandlers.
type Executor func(execCtx SagaContext) error

type FilterOption

// FilterOption is a functional option that modifies the unexported
// filterOptions value. Options are passed to Store.GetByFilter and are built
// with WithOffsetAndLimit, WithSagaId, WithSagaName and WithStatus.
type FilterOption func(opts *filterOptions)

func WithOffsetAndLimit added in v0.1.18

func WithOffsetAndLimit(offset int, limit int) FilterOption

func WithSagaId

func WithSagaId(sagaId string) FilterOption

func WithSagaName

func WithSagaName(sagaName string) FilterOption

func WithStatus

func WithStatus(status string) FilterOption

type HistoryEvent

// HistoryEvent is a single entry in the list returned by
// Instance.HistoryEvents. The struct tags define its JSON field names.
type HistoryEvent struct {
	UID          string         `json:"uid"`
	CreatedAt    time.Time      `json:"created_at"`
	Payload      message.Object `json:"payload"`
	OriginSource string         `json:"origin"`
	SagaStatus   string         `json:"saga_status"` // saga status at that moment (presumably when the event was recorded — confirm)
	TraceUID     string         `json:"trace_uid"`   // UID of the received message; may be empty
}

type Instance

// Instance is the saga instance interface returned by NewSagaInstance and
// accepted/returned by the Store methods. It exposes the instance identity,
// its Saga and Status, lifecycle operations, recorded history events and
// timestamps.
type Instance interface {
	// UID returns the instance identifier as a string.
	UID() string
	// Saga returns the Saga associated with this instance.
	Saga() Saga
	// Status returns the instance's Status.
	Status() Status

	// Start, Compensate and Recover have the same signatures as the
	// corresponding Saga methods.
	// NOTE(review): presumably they delegate to the wrapped Saga and update
	// the status — confirm against the implementation.
	Start(sagaCtx SagaContext) error
	Compensate(sagaCtx SagaContext) error
	Recover(sagaCtx SagaContext) error
	// Complete takes no arguments and returns nothing; presumably marks the
	// instance as completed (see Status.Completed) — confirm.
	Complete()
	// Fail takes the event involved in the failure; presumably this is the
	// object later reported by Status.FailedOnEvent — confirm.
	Fail(ev message.Object)

	// HistoryEvents returns the recorded history events.
	HistoryEvents() []HistoryEvent
	// AddHistoryEvent takes an event payload and its accompanying
	// AddHistoryEvent attributes.
	AddHistoryEvent(ev message.Object, ahv *AddHistoryEvent)

	// StartedAt and UpdatedAt return pointers, so they may be nil.
	StartedAt() *time.Time
	UpdatedAt() *time.Time
	// ParentID returns the parent identifier as a string (NewSagaInstance
	// takes a parentId argument).
	ParentID() string
}

func NewSagaInstance

func NewSagaInstance(id, parentId string, saga Saga) Instance

type InstancesBatch added in v0.1.18

// InstancesBatch is the result type of Store.GetByFilter.
type InstancesBatch struct {
	// Total: presumably the total number of matching instances, independent
	// of the offset/limit applied to Items — TODO confirm.
	Total int
	// Items are the instances contained in this batch.
	Items []Instance
}

type SQLDriver

// SQLDriver is the string type of the driver argument taken by
// NewSQLSagaStore.
type SQLDriver string
// Driver values defined by this package.
const (
	// MYSQLDriver is the SQLDriver value "mysql".
	MYSQLDriver SQLDriver = "mysql"
	// PGDriver is the SQLDriver value "pg" (presumably Postgres — confirm).
	PGDriver    SQLDriver = "pg"
)

type Saga

// Saga is the interface a saga definition implements: message metadata,
// lifecycle entry points (Start, Compensate, Recover) and the event handlers
// registered in Init.
type Saga interface {
	// Object is embedded because a saga, like any message type in
	// MessageBus, should have metadata.
	message.Object
	// Init assigns contract types to their handlers.
	Init()
	// Start is triggered when a StartSagaCommand is received.
	Start(sagaCtx SagaContext) error
	// Compensate is triggered when a CompensateSagaCommand is received.
	Compensate(sagaCtx SagaContext) error
	// Recover is triggered when a RecoverSagaCommand is received.
	Recover(sagaCtx SagaContext) error
	// EventHandlers returns the executors assigned per type in Init, keyed
	// by scheme.GroupKind.
	EventHandlers() map[scheme.GroupKind]Executor
	// SetSchema sets the schema instance during the saga runtime.
	SetSchema(scheme scheme.KnownTypesRegistry)
}

type SagaContext

// SagaContext is the context value passed to Executors and to the Start,
// Compensate and Recover methods of Saga and Instance. It is created by
// NewSagaCtx from an execution.MessageExecutionCtx and an Instance.
type SagaContext interface {
	// execution.MessageExecutionCtx (embedding is commented out)
	// Message returns a pointer to the received message.
	Message() *message.ReceivedMessage
	// Context returns a context.Context.
	Context() context.Context
	// Valid returns a bool; its meaning is not documented here.
	//
	// Deprecated: Valid is marked deprecated; no replacement is named.
	Valid() bool
	// Dispatch takes a payload and optional delivery options.
	// NOTE(review): presumably it records a Delivery that is sent later
	// rather than sending immediately — confirm.
	Dispatch(payload message.Object, options ...endpoint.DeliveryOption)
	// Deliveries returns the Delivery values held by this context.
	Deliveries() []*Delivery
	// Return takes optional delivery options and may fail with an error.
	// NOTE(review): presumably replies to the sender of Message — confirm.
	Return(options ...endpoint.DeliveryOption) error
	// Logger returns a log.Logger.
	Logger() log.Logger
	// SagaInstance returns the Instance associated with this context.
	SagaInstance() Instance
}

SagaContext is a sealed interface because of the deliver method, which takes all dispatched deliveries and starts sending them out. It is still not decided whether users of this interface should be able to implement it.

func NewSagaCtx

func NewSagaCtx(execCtx execution.MessageExecutionCtx, sagaInstance Instance) SagaContext

type SagaUIDService

// SagaUIDService works with the saga UID carried in message headers.
// NewSagaUIDService returns an implementation.
type SagaUIDService interface {
	// ExtractSagaUID returns the saga UID obtained from headers, or an error.
	ExtractSagaUID(headers message.Headers) (string, error)
	// AddSagaId takes headers and a saga UID.
	// NOTE(review): presumably it stores sagaUID in headers — confirm.
	AddSagaId(headers message.Headers, sagaUID string)
}

SagaUIDService manipulates the saga ID in message headers.

func NewSagaUIDService

func NewSagaUIDService() SagaUIDService

NewSagaUIDService constructs the default implementation of SagaUIDService.

type Status

// Status is the value returned by Instance.Status. It exposes boolean
// predicates for the individual states plus a string form.
type Status interface {
	// InProgress, Failed, Recovering, Compensating and Completed each report
	// a bool for the state named by the method.
	InProgress() bool
	Failed() bool
	// FailedOnEvent returns a message.Object; presumably the event passed to
	// Instance.Fail — TODO confirm.
	FailedOnEvent() message.Object
	Recovering() bool
	Compensating() bool
	Completed() bool
	// String returns the status as a string.
	String() string
}

type Store

// Store is the storage interface for saga Instances. NewSQLSagaStore returns
// an SQL-backed implementation. Every method takes a context.Context first.
type Store interface {
	// Create takes a saga Instance and returns an error on failure.
	Create(ctx context.Context, saga Instance) error
	// GetById returns the Instance for the given saga ID, or an error.
	GetById(ctx context.Context, sagaId string) (Instance, error)
	// GetByFilter returns an InstancesBatch for the given filter options
	// (see WithSagaId, WithSagaName, WithStatus, WithOffsetAndLimit).
	GetByFilter(ctx context.Context, filters ...FilterOption) (*InstancesBatch, error)
	// Update takes a saga Instance and returns an error on failure.
	Update(ctx context.Context, saga Instance) error
	// Delete takes a saga ID and returns an error on failure.
	Delete(ctx context.Context, sagaId string) error
}

func NewSQLSagaStore

func NewSQLSagaStore(db *sagaSql.DB, driver SQLDriver, msgMarshaller message.Marshaller) (Store, error)

NewSQLSagaStore creates an SQL saga store; it supports MySQL and Postgres drivers. The driver parameter is required because of https://github.com/golang/go/issues/3602. This is preferable to adding one more dependency or copy-pasting code.

Directories

Path Synopsis
api

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL