metamorphosis

package module
v0.1.0-beta.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 30, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

README

Metamorphosis

Inspired by vmware-go-kcl-v2 and kafka-go this is an opinionated library for interacting with kinesis from Go.

Config

Required Fields

GroupID The group the client is a member of. The group id is used as a partition key in Dynamo for related workers. WorkerID WorkerID is the identifier used for shard reservations. StreamARN StreamARN is the kinesis data stream arn used for operations. KinesisClient KinesisClient is used to connect to the AWS kinesis apis. DynamoClient DynamoClient is used to connect to Dynamo and save stream/shard positions.

Optional Fields

ReservationTableName ReservationTableName is the name of the dynamo table that reservation state is stored in. ShardID ShardID will allow a client to explicitly mark the shard it will read from. ReservationTimeout ReservationTimeout indicates when a reservation will be considered expired. RenewTime If set, RenewTime tells the client to start a goroutine to automatically renew reservations. ManagerLoopWaitTime When using the default client, the ManagerLoopWaitTime indicates how long between control loop runs to wait. RecordProcessor When using the default client, the RecordProcessor allows users to process a record and return an error if there is an issue. Logger Logger allows users to pass in a custom slog.Logger ShardCacheDuration ShardCacheDuration determines how often to check for new shards on a dynamic kinesis stream. MaxActorCount Max number of actors (goroutines) that a manger is allowed to instantiate to process shards. WorkerPrefix WorkerPrefix is used in the manager to name the actor(s) WorkerID SleepAfterProcessing if present, SleepAfterProcessing tells the manager to sleep for the duration before getting a new record. Seed Seed is used when multiple clients are trying to reserve shards, Seed is the position to start trying to reserve from.

Options

Basic Client

import (
  "github.com/binarymatt/metamorphosis"
)

func main(){
  client := metamorphosis.NewClient()
}

Default Client

Also called the manager, this client handles retrieval and committing of records.

cfg := metamorphosis.NewConfig()

metamorphosis.New(context.Background(), cfg)

Documentation

Index

Constants

View Source
const (
	GroupIDKey        = "groupID"
	ShardIDKey        = "shardID"
	WorkerIDKey       = "workerID"
	ExpiresAtKey      = "expiresAt"
	LatestSequenceKey = "latestSequence"
)

Variables

View Source
var (
	ErrNotFound          = errors.New("reservation missing")
	ErrShardReserved     = errors.New("shard is already reserved")
	ErrAllShardsReserved = errors.New("all shards are reserved")
	Now                  = time.Now
)
View Source
var (
	ErrMissingReservation = errors.New("missing reservation")
	ErrStreamError        = errors.New("stream error")
)
View Source
var (
	ErrInvalidConfiguration = errors.New("invalid metamorphosis config")
)

Functions

This section is empty.

Types

type API

type API interface {
	CommitRecord(ctx context.Context, record *metamorphosisv1.Record) error
	FetchRecord(ctx context.Context) (*metamorphosisv1.Record, error)
	FetchRecords(ctx context.Context, max int32) ([]*metamorphosisv1.Record, error)
	Init(ctx context.Context) error
	PutRecords(ctx context.Context, request *PutRecordsRequest) error
	CurrentReservation() *Reservation
	ListReservations(ctx context.Context) ([]Reservation, error)
	IsReserved(reservations []Reservation, shard ktypes.Shard) bool
}

type Actor

type Actor struct {
	SleepAfterProcessing time.Duration
	// contains filtered or unexported fields
}

func (*Actor) Work

func (a *Actor) Work(ctx context.Context) error

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(config *Config) *Client

func (*Client) CommitRecord

func (m *Client) CommitRecord(ctx context.Context, record *metamorphosisv1.Record) error

func (*Client) CurrentReservation

func (m *Client) CurrentReservation() *Reservation

func (*Client) FetchRecord

func (m *Client) FetchRecord(ctx context.Context) (*metamorphosisv1.Record, error)

func (*Client) FetchRecords

func (m *Client) FetchRecords(ctx context.Context, max int32) ([]*metamorphosisv1.Record, error)

func (*Client) Init

func (c *Client) Init(ctx context.Context) error

func (*Client) IsReserved

func (m *Client) IsReserved(reservations []Reservation, shard ktypes.Shard) bool

func (*Client) ListReservations

func (m *Client) ListReservations(ctx context.Context) ([]Reservation, error)

func (*Client) PutRecords

func (m *Client) PutRecords(ctx context.Context, req *PutRecordsRequest) error

func (*Client) ReleaseReservation

func (m *Client) ReleaseReservation(ctx context.Context) error

func (*Client) RenewReservation

func (m *Client) RenewReservation(ctx context.Context) error

func (*Client) ReserveShard

func (c *Client) ReserveShard(ctx context.Context) error

type ClientContextKey

type ClientContextKey struct{}

type Config

type Config struct {
	// required fields
	GroupID       string
	WorkerID      string
	StreamARN     string
	KinesisClient KinesisAPI
	DynamoClient  DynamoDBAPI

	// optional fields
	ReservationTableName string
	ShardID              string
	ReservationTimeout   time.Duration
	RenewTime            time.Duration
	ManagerLoopWaitTime  time.Duration
	RecordProcessor      RecordProcessor
	Logger               *slog.Logger
	ShardCacheDuration   time.Duration
	MaxActorCount        int
	WorkerPrefix         string
	SleepAfterProcessing time.Duration
	Seed                 int
	BatchSize            int32
}

Config contains the lower level settings

func NewConfig

func NewConfig(opts ...Option) *Config

func (*Config) Validate

func (c *Config) Validate() error

type DynamoDBAPI

type DynamoDBAPI interface {
	Scan(ctx context.Context, params *dynamodb.ScanInput, optFns ...func(*dynamodb.Options)) (*dynamodb.ScanOutput, error)
	DescribeTable(ctx context.Context, params *dynamodb.DescribeTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DescribeTableOutput, error)
	PutItem(ctx context.Context, params *dynamodb.PutItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error)
	GetItem(ctx context.Context, params *dynamodb.GetItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error)
	UpdateItem(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error)
	DeleteItem(ctx context.Context, params *dynamodb.DeleteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error)
	Query(ctx context.Context, params *dynamodb.QueryInput, optFns ...func(*dynamodb.Options)) (*dynamodb.QueryOutput, error)
	CreateTable(ctx context.Context, params *dynamodb.CreateTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.CreateTableOutput, error)
	DeleteTable(ctx context.Context, params *dynamodb.DeleteTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteTableOutput, error)
}

type KinesisAPI

type KinesisAPI interface {
	GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error)
	GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error)
	PutRecords(ctx context.Context, params *kinesis.PutRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.PutRecordsOutput, error)
	ListShards(ctx context.Context, params *kinesis.ListShardsInput, optFns ...func(*kinesis.Options)) (*kinesis.ListShardsOutput, error)
	CreateStream(ctx context.Context, params *kinesis.CreateStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.CreateStreamOutput, error)
	DeleteStream(ctx context.Context, params *kinesis.DeleteStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DeleteStreamOutput, error)
	DescribeStream(ctx context.Context, params *kinesis.DescribeStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error)
}

type LoggerContextKey

type LoggerContextKey struct{}

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, config *Config) *Manager

func (*Manager) AddActorID

func (m *Manager) AddActorID(id string)

func (*Manager) CheckForAvailableShards

func (m *Manager) CheckForAvailableShards(ctx context.Context) ([]types.Shard, error)

func (*Manager) DecrementActorCount

func (m *Manager) DecrementActorCount()

func (*Manager) IncrementActorCount

func (m *Manager) IncrementActorCount()

func (*Manager) RefreshActorLoop

func (m *Manager) RefreshActorLoop(ctx context.Context) error

func (*Manager) RemoveActorID

func (m *Manager) RemoveActorID(id string)

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

type Option

type Option func(*Config)

func WithBatchSize

func WithBatchSize(size int32) Option

func WithDynamoClient

func WithDynamoClient(client DynamoDBAPI) Option

func WithGroup

func WithGroup(id string) Option

func WithKinesisClient

func WithKinesisClient(client KinesisAPI) Option

func WithLogger

func WithLogger(l *slog.Logger) Option

func WithManagerLoopWaitTime

func WithManagerLoopWaitTime(d time.Duration) Option

func WithMaxActorCount

func WithMaxActorCount(actors int) Option

func WithRecordProcessor

func WithRecordProcessor(p RecordProcessor) Option

func WithRenewTime

func WithRenewTime(d time.Duration) Option

func WithReservationTableName

func WithReservationTableName(name string) Option

func WithReservationTimeout

func WithReservationTimeout(d time.Duration) Option

func WithSeed

func WithSeed(seed int) Option

func WithShardCacheDuration

func WithShardCacheDuration(d time.Duration) Option

func WithShardID

func WithShardID(id string) Option

func WithStreamArn

func WithStreamArn(arn string) Option

func WithWorkerID

func WithWorkerID(id string) Option

func WithWorkerPrefix

func WithWorkerPrefix(id string) Option

type PutRecordsRequest

type PutRecordsRequest struct {
	Records    []*metamorphosisv1.Record
	StreamName *string
	StreamArn  *string
}

type RecordProcessor

type RecordProcessor = func(context.Context, *metamorphosisv1.Record) error

type Reservation

type Reservation struct {
	// primary key
	GroupID string `dynamodbav:"groupID"`
	// secondary key
	ShardID string `dynamodbav:"shardID"`

	WorkerID       string `dynamodbav:"workerID"`
	ExpiresAt      int64  `dynamodbav:"expiresAt"`
	LatestSequence string `dynamodbav:"latestSequence"`
}

func (*Reservation) Expires

func (r *Reservation) Expires() time.Time

Directories

Path Synopsis
docs
gen

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL