messagestore

package
v0.5.0
Published: May 3, 2022 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package messagestore provides MySQL-specific implementations of the interfaces in Ax's top-level "messagestore" package.

Constants

const (
	// DefaultFetchLimit is the number of messages to fetch in each select query on
	// a message stream.
	DefaultFetchLimit = 100

	// DefaultPollInterval is the default time to wait between polls in
	// MessageStream.Next().
	DefaultPollInterval = 500 * time.Millisecond
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Fetcher

type Fetcher interface {
	// FetchRows fetches the n rows beginning at the given offset.
	FetchRows(ctx context.Context, offset, n uint64) (*sql.Rows, error)
}

Fetcher is an interface for fetching rows from the message store.
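
The offset-and-count contract of FetchRows can be illustrated without a database. The sketch below is not the package's implementation: sliceFetcher and fetchRows are hypothetical names, plain strings stand in for *sql.Rows, and returning an empty page for an offset past the end is an assumption rather than documented behavior.

```go
package main

import "fmt"

// sliceFetcher is a hypothetical in-memory stand-in for a Fetcher. It returns
// plain strings instead of *sql.Rows so the example runs without a database.
type sliceFetcher struct {
	rows []string
}

// fetchRows returns up to n rows beginning at offset. An offset at or beyond
// the end of the data yields an empty result (an assumption of this sketch).
func (f sliceFetcher) fetchRows(offset, n uint64) []string {
	total := uint64(len(f.rows))
	if offset >= total {
		return nil
	}
	end := offset + n
	if end > total || end < offset { // clamp to the data, guarding against overflow
		end = total
	}
	return f.rows[offset:end]
}

func main() {
	f := sliceFetcher{rows: []string{"m0", "m1", "m2", "m3", "m4"}}
	fmt.Println(f.fetchRows(0, 2)) // a full first page
	fmt.Println(f.fetchRows(4, 2)) // a partial final page
	fmt.Println(f.fetchRows(9, 2)) // an offset past the end
}
```

A caller paging through the data would advance its offset by the number of rows actually returned, not by n, so that a partial page does not skip messages.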

type GlobalFetcher

type GlobalFetcher struct {
	DB *sql.DB
}

GlobalFetcher is a fetcher that fetches rows for the entire store.

func (*GlobalFetcher) FetchRows

func (f *GlobalFetcher) FetchRows(ctx context.Context, offset, n uint64) (*sql.Rows, error)

FetchRows fetches the n rows beginning at the given offset.

type Store

type Store struct{}

Store is a MySQL-backed implementation of Ax's messagestore.GloballyOrderedStore interface.

func (Store) AppendMessages

func (Store) AppendMessages(
	ctx context.Context,
	ptx persistence.Tx,
	stream string,
	offset uint64,
	envs []ax.Envelope,
) error

AppendMessages appends one or more messages to a named stream.

offset is a zero-based index into the stream. An error is returned if offset is not the next unused offset in the stream.
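
The offset rule behaves like an optimistic concurrency check: a writer must state where it believes the stream ends. The following is a minimal in-memory sketch of that rule only. memStore, appendMessages, and errOffsetConflict are hypothetical names, and the real method also takes a context, a persistence.Tx, and ax.Envelope values.

```go
package main

import (
	"errors"
	"fmt"
)

// errOffsetConflict is a hypothetical error value; the error actually returned
// by the package is not specified in this documentation.
var errOffsetConflict = errors.New("offset is not the next unused offset")

// memStore is a hypothetical in-memory store keyed by stream name.
type memStore struct {
	streams map[string][]string
}

// appendMessages appends msgs to the named stream, but only if offset equals
// the current length of the stream, that is, the next unused zero-based offset.
func (s *memStore) appendMessages(stream string, offset uint64, msgs ...string) error {
	if offset != uint64(len(s.streams[stream])) {
		return errOffsetConflict
	}
	s.streams[stream] = append(s.streams[stream], msgs...)
	return nil
}

func main() {
	s := &memStore{streams: map[string][]string{}}
	fmt.Println(s.appendMessages("orders", 0, "a", "b")) // new stream, offset 0 is next
	fmt.Println(s.appendMessages("orders", 1, "c"))      // stale offset, rejected
	fmt.Println(s.appendMessages("orders", 2, "c"))      // correct next offset
}
```

Under this scheme, two writers that both read the same stream length cannot both succeed, so one of them has to re-read the stream and retry.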

func (Store) OpenGlobal

func (Store) OpenGlobal(
	ctx context.Context,
	ds persistence.DataStore,
	offset uint64,
) (messagestore.Stream, error)

OpenGlobal opens the entire store for reading as a single stream.

The offset may be beyond the end of the stream.

func (Store) OpenStream

func (Store) OpenStream(
	ctx context.Context,
	ds persistence.DataStore,
	stream string,
	offset uint64,
) (messagestore.Stream, bool, error)

OpenStream opens a stream of messages for reading from a specific offset.

The offset may be past the end of the stream. The boolean result is false if the stream does not exist.
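
The distinction between a missing stream and an offset past the end can be sketched as follows. This is an illustration under stated assumptions, not the package's code: cursor, openStream, and tryNext are hypothetical, and a map of string slices stands in for the MySQL tables.

```go
package main

import "fmt"

// cursor is a hypothetical read position within one stream.
type cursor struct {
	msgs []string
	next uint64
}

// tryNext returns the next message, or false when the cursor is at or past
// the end of the stream.
func (c *cursor) tryNext() (string, bool) {
	if c.next >= uint64(len(c.msgs)) {
		return "", false
	}
	m := c.msgs[c.next]
	c.next++
	return m, true
}

// openStream reports false only when the named stream does not exist. An
// offset past the end is accepted and simply yields no messages yet.
func openStream(streams map[string][]string, name string, offset uint64) (*cursor, bool) {
	msgs, ok := streams[name]
	if !ok {
		return nil, false
	}
	return &cursor{msgs: msgs, next: offset}, true
}

func main() {
	streams := map[string][]string{"orders": {"a", "b"}}

	_, ok := openStream(streams, "missing", 0)
	fmt.Println("missing stream opened:", ok)

	c, ok := openStream(streams, "orders", 10)
	_, more := c.tryNext()
	fmt.Println("opened past the end:", ok, "message available:", more)
}
```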

type Stream

type Stream struct {
	Fetcher      Fetcher
	NextOffset   uint64
	Limit        uint64
	PollInterval time.Duration
	// contains filtered or unexported fields
}

Stream is a MySQL-backed implementation of Ax's messagestore.Stream interface.

func (*Stream) Close

func (s *Stream) Close() error

Close closes the stream.

func (*Stream) Get

func (s *Stream) Get(ctx context.Context) (ax.Envelope, error)

Get returns the message at the current offset in the stream.

func (*Stream) Next

func (s *Stream) Next(ctx context.Context) error

Next advances the stream to the next message.

It blocks until a message is available, or ctx is canceled.

func (*Stream) Offset

func (s *Stream) Offset() (uint64, error)

Offset returns the offset of the message returned by Get().

func (*Stream) TryNext

func (s *Stream) TryNext(ctx context.Context) (bool, error)

TryNext advances the stream to the next message.

It returns false if there are no more messages in the stream.

type StreamFetcher

type StreamFetcher struct {
	DB       *sql.DB
	StreamID int64
}

StreamFetcher is a fetcher that fetches rows for a specific stream.

func (*StreamFetcher) FetchRows

func (f *StreamFetcher) FetchRows(ctx context.Context, offset, n uint64) (*sql.Rows, error)

FetchRows fetches the n rows beginning at the given offset.
