Versions in this module Expand all Collapse all v4 v4.22.0 Oct 23, 2023 Changes in this version + var ErrBrokerNoInputs = errors.New("attempting to create broker input type with no inputs") + var ErrBrokerNoOutputs = errors.New("attempting to create broker output type with no outputs") + var ErrSwitchNoCasesMatched = errors.New("no switch cases were matched by message") + var ErrSwitchNoConditionMet = errors.New("no switch output conditions were met by message") + var ErrSwitchNoOutputs = errors.New("attempting to create switch with fewer than 2 cases") + func AddCompressFunc(name string, fn CompressFunc) struct + func AddDecompressFunc(name string, fn DecompressFunc) struct + func CommonRetryBackOffCtorFromParsed(pConf *service.ParsedConfig) (ctor func() backoff.BackOff, err error) + func CommonRetryBackOffFields(defaultMaxRetries int, defaultInitInterval string, defaultMaxInterval string, ...) []*service.ConfigField + func RetryOutputIndefinitely(mgr bundle.NewManagement, wrapped output.Streamed) (output.Streamed, error) + func SwitchReorderFromGroup(group *message.SortGroup, parts []*message.Part) + type Branch struct + func (b *Branch) Close(ctx context.Context) error + func (b *Branch) ProcessBatch(ctx context.Context, batch message.Batch) ([]message.Batch, error) + type CacheWriter struct + func NewCacheWriter(conf output.CacheConfig, mgr bundle.NewManagement, log log.Modular) (*CacheWriter, error) + func (c *CacheWriter) Close(context.Context) error + func (c *CacheWriter) Connect(ctx context.Context) error + func (c *CacheWriter) WriteBatch(ctx context.Context, msg message.Batch) (err error) + type CompressFunc func(level int, bytes []byte) ([]byte, error) + type DecompressFunc func(bytes []byte) ([]byte, error) + type SyncResponseWriter struct + func (s SyncResponseWriter) Close(context.Context) error + func (s SyncResponseWriter) Connect(ctx context.Context) error + func (s SyncResponseWriter) WriteBatch(ctx context.Context, msg message.Batch) error + type Workflow struct + func NewWorkflow(conf processor.WorkflowConfig, mgr bundle.NewManagement) (*Workflow, error) + func (w *Workflow) Close(ctx context.Context) error + func (w *Workflow) Flow() [][]string + func (w *Workflow) ProcessBatch(ctx context.Context, msg message.Batch) ([]message.Batch, error)