manager

package
v0.0.0-...-434faa1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 1, 2024 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const TearDownDuration = time.Minute * 1

Variables

View Source
var (
	ErrorWriterNotDefined = errors.New("Writer not defined")
	ErrorEmptyTopic       = errors.New("SchemaManager.Topic not set")
)
View Source
var (
	Setup  = Action{Name: "setup", Func: SetupFunc, Help: "setup metadata topics"}
	Reset  = Action{Name: "reset", Func: ResetFunc, Help: "set table schema(s) to the empty schema"}
	Delete = Action{Name: "delete", Func: DeleteFunc, Help: "delete table topic(s)"}
	Purge  = Action{Name: "purge", Func: CleanupFunc, Help: "reset and delete"}

	Destroy = Action{
		Name: "destroy", Func: CleanupFunc,
		Help: "run reset and delete for ALL tables and delete the metadata topics",
	}
)

Functions

func CleanupFunc

func CleanupFunc(ctx context.Context, action *Workflow) error

func DeleteFunc

func DeleteFunc(ctx context.Context, action *Workflow) error

func ResetFunc

func ResetFunc(ctx context.Context, action *Workflow) error

func SetupFunc

func SetupFunc(ctx context.Context, action *Workflow) error

func WithTimeout

func WithTimeout(fn ActionFunc, timeout time.Duration) func(ctx context.Context, wf *Workflow) error

Types

type Action

type Action struct {
	Name string
	Help string
	Func func(ctx context.Context, wf *Workflow) error
}

func Actions

func Actions() []Action

type ActionFunc

type ActionFunc func(ctx context.Context, wf *Workflow) error

ActionFunc defines the minimal interface to implement a custom Command.Func.

Example Usage:

	cmd := kstoreContext.AddCommand(kstore.Command{
			Name: "demo",
			Func: kstore.ActionFunc(examples.RunTopicManagement).WithTimeout(*timeout),
	})
    action.AddCommand(ctx, cmd)

type OnError

type OnError int

OnError specifies how to handle multiple errors.

const (
	OnErrorStop OnError = iota
	OnErrorDefer
	OnErrorIgnore
)

type SchemaManager

type SchemaManager struct {
	// contains filtered or unexported fields
}

func NewSchemaManager

func NewSchemaManager(schemasTopic string, client api.Client) *SchemaManager

func (*SchemaManager) Client

func (tm *SchemaManager) Client() api.Client

func (*SchemaManager) CreateOrUpdateTable

func (tm *SchemaManager) CreateOrUpdateTable(ctx context.Context, schema *kschema.Schema) error

CreateOrUpdateTable creates a table topic (if needed) and updates the table schema.

func (*SchemaManager) DeleteTable

func (tm *SchemaManager) DeleteTable(ctx context.Context, schema *kschema.Schema) error

DeleteTable resets the table schema and deletes the table topic.

func (*SchemaManager) DeleteTopic

func (tm *SchemaManager) DeleteTopic(ctx context.Context, name string) error

DeleteTopic deletes a topic.

func (*SchemaManager) GetSchema

func (tm *SchemaManager) GetSchema(ctx context.Context, table kschema.TableTopic) (kschema.FieldSchema, error)

GetSchema returns the stored schema for the given TableTopic.

func (*SchemaManager) PurgeTable

func (tm *SchemaManager) PurgeTable(ctx context.Context, schema *kschema.Schema) error

PurgeTable clears the Schema for the given table and deletes the table topic.

func (*SchemaManager) ResetTable

func (tm *SchemaManager) ResetTable(ctx context.Context, schema *kschema.Schema) error

ResetTable clears teh Schema for the given topic.

func (*SchemaManager) Setup

func (tm *SchemaManager) Setup(ctx context.Context) error

Setup must be called to initialize the SchemaManager and setup the TablesInfo topic in Kafka.

func (*SchemaManager) Validate

func (tm *SchemaManager) Validate() error

Validate checks the SchemaManager setup for correctness.

type Workflow

type Workflow struct {
	DryRun bool
	// contains filtered or unexported fields
}

func NewWorkflow

func NewWorkflow(tm *SchemaManager, keyFile *config.KeyFile, actions []Action, program []string) (*Workflow, error)

func (*Workflow) AddStep

func (a *Workflow) AddStep(cmd ...Action)

func (*Workflow) Client

func (a *Workflow) Client() api.Client

func (*Workflow) Close

func (a *Workflow) Close() error

func (*Workflow) DeferStep

func (a *Workflow) DeferStep(cmd ...Action)

func (*Workflow) GetFunc

func (a *Workflow) GetFunc(name string) (ActionFunc, error)

func (*Workflow) KeyFile

func (a *Workflow) KeyFile() *config.KeyFile

func (*Workflow) Run

func (a *Workflow) Run(ctx context.Context, onError OnError) (result error)

func (*Workflow) SchemaManager

func (a *Workflow) SchemaManager() *SchemaManager

func (*Workflow) ServerAddress

func (a *Workflow) ServerAddress() string

func (*Workflow) SetFunc

func (a *Workflow) SetFunc(name string, fn ActionFunc) error

func (*Workflow) SetProgram

func (wf *Workflow) SetProgram(steps []string) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL