mysql

package
v0.2.2 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 29, 2020 License: MIT Imports: 14 Imported by: 1

README

MySQL Event Store

This event store only works with aggregates. We extract the aggregate information from each event and use it to create a unique constraint.

In the future we may introduce "strategies" that would allow for custom stream tables.

Projections

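package main

import (
    "context"
    "log"
    // Also import this package (mysql) and the module's projection and messages packages.
)

func main() {
    ctx := context.Background()
    // es is a *mysql.EventStore obtained earlier from mysql.New.
    pm := mysql.NewProjectionManager(es)
    sp, err := pm.Create(ctx, "users_dto", []projection.ProjectorOpt{})
    if err != nil {
        log.Fatal(err)
    }
    sp.FromStream("users").WhenAny(func(ctx context.Context, msg messages.Message) error {
        // Handle every message from the "users" stream here.
        return nil
    })
    if err := sp.Run(ctx); err != nil {
        log.Fatal(err)
    }
}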

Documentation


Constants

const (
	// DefaultBatchSize ...
	DefaultBatchSize uint64 = 1000
)

Variables

This section is empty.

Functions

func NewProjectionManager

func NewProjectionManager(es *EventStore) projection.Manager

NewProjectionManager will get a projection manager that uses the MySQL backend to store projection states.

Types

type ErrorStreamIterator

type ErrorStreamIterator struct {
	// contains filtered or unexported fields
}

ErrorStreamIterator is returned when an error occurred while getting stream data, for example because the stream does not exist.

func (*ErrorStreamIterator) Close

func (it *ErrorStreamIterator) Close()

Close is a no-op.

func (*ErrorStreamIterator) Current

func (it *ErrorStreamIterator) Current() *messages.Event

Current always returns nil.

func (*ErrorStreamIterator) Error

func (it *ErrorStreamIterator) Error() string

Error returns the result of the inner error's Error method.

func (*ErrorStreamIterator) Next

func (it *ErrorStreamIterator) Next(ctx context.Context) error

Next returns the error provided.

func (*ErrorStreamIterator) Rewind

func (it *ErrorStreamIterator) Rewind()

Rewind is a no-op.

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

EventStore will use a MySQL database to manage streams.

func New

func New(ctx context.Context, dsn string, batchSize uint64, payloadBuilder messages.PayloadBuilder) (*EventStore, error)

New returns a new MySQL event store. It is best to pass a context with a deadline so that the call does not hang.

func (*EventStore) AppendTo

func (s *EventStore) AppendTo(ctx context.Context, streamName string, events []*messages.Event) error

AppendTo will append events to the stream.

func (*EventStore) Create

func (s *EventStore) Create(ctx context.Context, stream *eventstore.Stream) error

Create will create the stream with the name and metadata provided.

MySQL cannot combine schema changes with row inserts in a single transaction (DDL statements cause an implicit commit), so a failure partway through Create could leave the database in an inconsistent state.

func (*EventStore) DB

func (s *EventStore) DB() *sql.DB

DB will return the database being used.

func (*EventStore) Delete

func (s *EventStore) Delete(ctx context.Context, streamName string) error

Delete will remove the stream.

func (*EventStore) FetchStreamMetadata

func (s *EventStore) FetchStreamMetadata(ctx context.Context, streamName string) (eventstore.StreamMetadata, error)

FetchStreamMetadata gets the metadata about a stream.

func (*EventStore) FetchStreamNames

func (s *EventStore) FetchStreamNames(ctx context.Context, filter string, matcher eventstore.MetadataMatcher, limit, offset uint64) ([]string, error)

FetchStreamNames gets stream names that match the filter.

func (*EventStore) FetchStreamNamesRegex

func (s *EventStore) FetchStreamNamesRegex(ctx context.Context, filter string, matcher eventstore.MetadataMatcher, limit, offset uint64) ([]string, error)

FetchStreamNamesRegex gets stream names that match the regex filter.

func (*EventStore) GetProjectionManager

func (s *EventStore) GetProjectionManager() projection.Manager

GetProjectionManager ...

func (*EventStore) Load

func (s *EventStore) Load(ctx context.Context, streamName string, from, count uint64, matcher eventstore.MetadataMatcher) eventstore.StreamIterator

Load events from the given stream name.

func (*EventStore) LoadReverse

func (s *EventStore) LoadReverse(ctx context.Context, streamName string, from, count uint64, matcher eventstore.MetadataMatcher) eventstore.StreamIterator

LoadReverse loads events from the given stream name in reverse order.

func (*EventStore) Ping

func (s *EventStore) Ping(ctx context.Context) error

Ping checks that the connection to the database is still alive.

func (*EventStore) UpdateStreamMetadata

func (s *EventStore) UpdateStreamMetadata(ctx context.Context, streamName string, newMetadata eventstore.StreamMetadata) error

UpdateStreamMetadata sets the metadata for the given stream name.

type ProjectionManager

type ProjectionManager struct {
	// contains filtered or unexported fields
}

ProjectionManager ...

func (*ProjectionManager) Create

Create ...

func (*ProjectionManager) Delete

func (m *ProjectionManager) Delete(ctx context.Context, projectionName string) error

Delete ...

func (*ProjectionManager) FetchPojectionStatus

func (m *ProjectionManager) FetchPojectionStatus(ctx context.Context, projectionName string) (projection.Status, error)

FetchPojectionStatus ...

func (*ProjectionManager) FetchPojectionStreamPositions

func (m *ProjectionManager) FetchPojectionStreamPositions(ctx context.Context, projectionName string) (projection.StreamPositions, error)

FetchPojectionStreamPositions ...

func (*ProjectionManager) FetchProjectionNames

func (m *ProjectionManager) FetchProjectionNames(ctx context.Context, filter string, start, limit uint64) ([]string, error)

FetchProjectionNames ...

func (*ProjectionManager) Reset

func (m *ProjectionManager) Reset(ctx context.Context, projectionName string) error

Reset ...

func (*ProjectionManager) Stop

func (m *ProjectionManager) Stop(ctx context.Context, projectionName string) error

Stop ...

type StreamIterator

type StreamIterator struct {
	// contains filtered or unexported fields
}

StreamIterator iterates over events from a MySQL database.

func (*StreamIterator) Close

func (it *StreamIterator) Close()

Close will clean up resources. Do not attempt to use the iterator after closing it.

func (*StreamIterator) Current

func (it *StreamIterator) Current() *messages.Event

Current will return the item we currently have.

func (*StreamIterator) Next

func (it *StreamIterator) Next(ctx context.Context) error

Next will get the next result and return an error if there is one. Once Next has been called and returned no error, you can read the result from Current().

func (*StreamIterator) Rewind

func (it *StreamIterator) Rewind()

Rewind will set the position of the stream back to the default position and allow you to iterate over the stream again.

type StreamProjection

type StreamProjection struct {
	// contains filtered or unexported fields
}

StreamProjection ...

func (*StreamProjection) FromStream

func (p *StreamProjection) FromStream(streamName string) projection.Projector

FromStream will limit the Projector to events from a single stream.

func (*StreamProjection) FromStreams

func (p *StreamProjection) FromStreams(streamNames []string) projection.Projector

FromStreams will limit the Projector to events from many streams.

func (*StreamProjection) Run

func (p *StreamProjection) Run(ctx context.Context) error

Run will start the processing of the events.

func (*StreamProjection) Stop

func (p *StreamProjection) Stop(ctx context.Context) error

Stop will stop the processing of the events.

func (*StreamProjection) When

When registers a callback that will be called when an event with the given event name is received.

func (*StreamProjection) WhenAny

WhenAny registers a callback that will be called when any event is received.
