batch

package
v1.1.2
Published: Dec 6, 2023 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package batch contains internal utilities for interacting with message batches.

Constants

This section is empty.

Variables

This section is empty.

Functions

func CollapsedCount

func CollapsedCount(p *message.Part) int

CollapsedCount attempts to extract the actual number of messages that were collapsed into the resulting message part. This value could be greater than 1 when users configure processors that archive batched message parts.

func CtxCollapsedCount

func CtxCollapsedCount(ctx context.Context) int

CtxCollapsedCount attempts to extract from the provided context the actual number of messages that were collapsed into the resulting message part. This value could be greater than 1 when users configure processors that archive batched message parts.

func CtxWithCollapsedCount

func CtxWithCollapsedCount(ctx context.Context, count int) context.Context

CtxWithCollapsedCount returns a context indicating that the associated message is the result of collapsing a number of messages. This allows downstream components to know how many total messages were combined.

func MessageCollapsedCount

func MessageCollapsedCount(m message.Batch) int

MessageCollapsedCount attempts to extract the actual number of messages that were combined into the resulting batched message parts. This value could differ from message.Len() when users configure processors that archive batched message parts.

func WithCollapsedCount

func WithCollapsedCount(p *message.Part, count int) *message.Part

WithCollapsedCount returns a message part with a context indicating that this message is the result of collapsing a number of messages. This allows downstream components to know how many total messages were combined.
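
To illustrate why the collapsed count of a batch can differ from its length, here is a small sketch under stated assumptions. The part type is a hypothetical stand-in for *message.Part, and summing per-part counts is an assumption about how a batch-wide total could be derived, not a description of the package's code.

```go
package main

import "fmt"

// part is a hypothetical stand-in for *message.Part. A collapsed value of 0
// means no count was recorded for the part.
type part struct {
	data      []byte
	collapsed int
}

// withCollapsedCount returns a shallow copy of p carrying the given count.
func withCollapsedCount(p *part, count int) *part {
	cp := *p
	cp.collapsed = count
	return &cp
}

// collapsedCount returns the recorded count, or 1 when none was recorded.
func collapsedCount(p *part) int {
	if p.collapsed > 0 {
		return p.collapsed
	}
	return 1
}

// messageCollapsedCount sums the per-part counts across a batch.
func messageCollapsedCount(batch []*part) int {
	total := 0
	for _, p := range batch {
		total += collapsedCount(p)
	}
	return total
}

func main() {
	// The second part represents an archive built from four original messages.
	batch := []*part{
		{data: []byte("a")},
		withCollapsedCount(&part{data: []byte("archive")}, 4),
		{data: []byte("b")},
	}
	fmt.Println(len(batch), messageCollapsedCount(batch))
}
```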

Types

type AckFunc

type AckFunc func(context.Context, error) error

AckFunc is a common function signature for acknowledging receipt of messages.

type CombinedAcker

type CombinedAcker struct {
	// contains filtered or unexported fields
}

CombinedAcker creates a single ack func closure that aggregates one or more derived closures, such that the single ack func is triggered only once every derived closure has been called. If at least one derived closure receives an error, the single ack func is passed the first non-nil error received.

func NewCombinedAcker

func NewCombinedAcker(aFn AckFunc) *CombinedAcker

NewCombinedAcker creates an aggregator that derives one or more ack funcs; once all of them have been called, the provided root ack func is called.

func (*CombinedAcker) Derive

func (c *CombinedAcker) Derive() AckFunc

Derive creates a new ack func that must be called before the origin ack func will be called. It is invalid to derive an ack func after any other previously derived funcs have been called.

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error is an error type that also allows storing granular errors for each message of a batch.

func NewError

func NewError(msg message.Batch, err error) *Error

NewError creates a new batch-wide error, where it's possible to add granular errors for individual messages of the batch.

func (*Error) Error

func (e *Error) Error() string

Error implements the common error interface.

func (*Error) Failed

func (e *Error) Failed(i int, err error) *Error

Failed stores an error state for a particular message of a batch. Returns a pointer to the underlying error, allowing the method to be chained.

If Failed is not called then all messages are assumed to have failed. If it is called at least once then all message indexes that aren't explicitly failed are assumed to have been processed successfully.

func (*Error) IndexedErrors

func (e *Error) IndexedErrors() int

IndexedErrors returns the number of indexed errors that have been registered for the batch.

func (*Error) Unwrap

func (e *Error) Unwrap() error

Unwrap returns the underlying common error.

func (*Error) WalkPartsBySource added in v1.1.2

func (e *Error) WalkPartsBySource(sourceSortGroup *message.SortGroup, sourceBatch message.Batch, fn func(int, *message.Part, error) bool)

WalkPartsBySource applies a closure to each message that was part of the request that caused this error. The closure is provided the message part index, a pointer to the part, and its individual error, which may be nil if the message itself was processed successfully. The closure returns a bool which indicates whether the iteration should be continued.

Important! The order in which parts are walked is not guaranteed to match that of the source batch. It is also possible for any given index to be represented zero, one, or more times.

func (*Error) WalkPartsNaively added in v1.1.2

func (e *Error) WalkPartsNaively(fn func(int, *message.Part, error) bool)

WalkPartsNaively applies a closure to each message that was part of the request that caused this error. The closure is provided the message part index, a pointer to the part, and its individual error, which may be nil if the message itself was processed successfully. The closure returns a bool which indicates whether the iteration should be continued.

WARNING: The shape and order of the errored batch is not guaranteed to match that of an origin batch and therefore cannot be used to associate batch errors with the origin. Instead, use WalkPartsBySource.

func (*Error) XErroredBatch added in v1.1.0

func (e *Error) XErroredBatch() message.Batch

XErroredBatch returns the underlying batch associated with the error.

Directories

Path Synopsis
Package policy provides tooling for creating and executing Benthos message batch policies.
