flow

package
v0.2.0
Warning

This package is not in the latest version of its module.

Published: May 15, 2024 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Seq

type Seq struct {
	// contains filtered or unexported fields
}

func Sequential

func Sequential(opt ...glow.NetworkOpt) *Seq

func (*Seq) Capture

func (s *Seq) Capture(cf func(ctx context.Context, in any) error, opt ...StepOpt) *Seq

Capture captures each element in the input data stream and feeds it to a capturing function. The capturing function receives a context and the captured data point. Being a terminal step in the pipeline, it does not emit data.
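As a rough sketch of a capturing function with the signature above: the sink type, its method, and the captureAll helper are hypothetical names introduced for illustration, and the mutex is only a precaution under the assumption that the function may be called from several goroutines. The helper calls the function directly rather than through a pipeline.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// sink is a hypothetical destination for captured data points.
type sink struct {
	mu    sync.Mutex
	lines []string
}

// capture has the shape func(ctx context.Context, in any) error, so the method
// value s.capture could be handed to Capture. It stores a string form of each
// data point and refuses to work once the context is done.
func (s *sink) capture(ctx context.Context, in any) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.lines = append(s.lines, fmt.Sprint(in))
	return nil
}

// captureAll stands in for the pipeline: it feeds each input to the capturing
// function and stops at the first error.
func captureAll(inputs ...any) ([]string, error) {
	s := &sink{}
	for _, in := range inputs {
		if err := s.capture(context.Background(), in); err != nil {
			return s.lines, err
		}
	}
	return s.lines, nil
}

func main() {
	lines, err := captureAll(1, "two", 3.5)
	fmt.Println(lines, err) // prints: [1 two 3.5] <nil>
}
```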

func (*Seq) Collect

func (s *Seq) Collect(cb func([]any), opt ...StepOpt) *Seq

Collect aggregates all elements in the input data stream and provides the collection to the provided callback. It accepts the Compare option to allow sorting the collected data points as they arrive. As a terminal step in the pipeline, it does not emit data, marking the end of the data processing flow.

func (*Seq) Combine

func (s *Seq) Combine(opt ...StepOpt) *Seq

Combine merges the elements of multiple input streams into a single stream, concatenating the data points from the individual streams in the order they arrive.

func (*Seq) Count

func (s *Seq) Count(cb func(num int), opt ...StepOpt) *Seq

Count keeps track of the number of elements in the input data stream. Being a terminal step in the pipeline, it does not emit data.

func (*Seq) Draw

func (s *Seq) Draw(name string) *Seq

Draw generates a Graphviz visualization of the Flow.

func (*Seq) Error

func (s *Seq) Error() error

Error retrieves any error that occurred during the building and execution of the pipeline.

func (*Seq) Filter

func (s *Seq) Filter(ff func(in any) bool, opt ...StepOpt) *Seq

Filter applies a filtering function to each element in the input data stream. The filtering function is invoked with an input element and returns a boolean indicating whether the element should be retained or not. If the filtering function returns true, the element is passed through to the output stream; otherwise, it is discarded. This step serves as an intermediate step facilitating data filtering operations.
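A filtering function can be sketched as below. isEven and the keep helper are hypothetical names; keep only imitates the retain-or-discard behaviour described above so the predicate can be tried without a pipeline.

```go
package main

import "fmt"

// isEven has the shape func(in any) bool expected by Filter. Values that are
// not ints are discarded rather than causing a panic.
func isEven(in any) bool {
	n, ok := in.(int)
	return ok && n%2 == 0
}

// keep is a stand-in for the filtering step: elements for which ff returns
// true pass through, all others are dropped.
func keep(inputs []any, ff func(in any) bool) []any {
	var out []any
	for _, in := range inputs {
		if ff(in) {
			out = append(out, in)
		}
	}
	return out
}

func main() {
	fmt.Println(keep([]any{1, 2, "three", 4}, isEven)) // prints: [2 4]
}
```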

func (*Seq) Map

func (s *Seq) Map(mf func(ctx context.Context, in any, emit func(any)) error, opt ...StepOpt) *Seq

Map applies a mapping function to each element in the input data stream. The mapper function is invoked with a context, an input element, and an emit function. It processes each input element and emits zero or more transformed data points using the 'emit' function. The emitted data can be of any type. Typically, this step is an intermediate step enabling data transformation operations.
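To illustrate the "zero or more" emission described above, here is a minimal sketch of a mapper that splits strings into words. splitWords and applyMapper are hypothetical names, and applyMapper merely calls the mapper in a loop with a slice-appending emit; it is not how the library drives the step.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// splitWords has the mapper shape accepted by Map. It emits one data point per
// word, so an empty string emits nothing and a sentence emits several values.
func splitWords(ctx context.Context, in any, emit func(any)) error {
	s, ok := in.(string)
	if !ok {
		return fmt.Errorf("splitWords: expected string, got %T", in)
	}
	for _, w := range strings.Fields(s) {
		emit(w)
	}
	return nil
}

// applyMapper is a stand-in for the pipeline: it runs the mapper over each
// input and gathers everything that was emitted.
func applyMapper(inputs ...any) ([]any, error) {
	var out []any
	emit := func(v any) { out = append(out, v) }
	for _, in := range inputs {
		if err := splitWords(context.Background(), in, emit); err != nil {
			return out, err
		}
	}
	return out, nil
}

func main() {
	out, err := applyMapper("hello glow", "", "one two three")
	fmt.Println(out, err) // prints: [hello glow one two three] <nil>
}
```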

func (*Seq) Peek

func (s *Seq) Peek(pf func(in any), opt ...StepOpt) *Seq

Peek allows observing the data stream without modifying it, typically for debugging, logging, or monitoring purposes.

func (*Seq) Read

func (s *Seq) Read(rf func(ctx context.Context, emit func(any)) error, opt ...StepOpt) *Seq

Read retrieves data from a specified source using a provided reader function. The reader function is called with a context and an emit function, responsible for reading data and emitting it. The emitted data can be of any type. Usually, this is the first step, feeding data for processing on to subsequent steps.
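A reader function might look like the following sketch. readNumbers and drainReader are hypothetical names and the emitted integers are made-up data; the context check is an assumption about good practice, not a documented requirement.

```go
package main

import (
	"context"
	"fmt"
)

// readNumbers has the reader shape accepted by Read. It emits the integers
// 1 through 5 and returns early if the context has been cancelled.
func readNumbers(ctx context.Context, emit func(any)) error {
	for i := 1; i <= 5; i++ {
		if err := ctx.Err(); err != nil {
			return err
		}
		emit(i)
	}
	return nil
}

// drainReader calls the reader directly with a slice-appending emit, standing
// in for the steps that would normally receive the emitted values.
func drainReader() ([]any, error) {
	var out []any
	err := readNumbers(context.Background(), func(v any) { out = append(out, v) })
	return out, err
}

func main() {
	out, err := drainReader()
	fmt.Println(out, err) // prints: [1 2 3 4 5] <nil>
}
```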

func (*Seq) Run

func (s *Seq) Run(ctx context.Context) *Seq

Run initiates the processing of data through the pipeline, starting from the initial input source and sequentially applying each operation defined in the pipeline until reaching the terminal step.

func (*Seq) Stop added in v0.0.2

func (s *Seq) Stop() *Seq

func (*Seq) Uptime

func (s *Seq) Uptime(uf func(d time.Duration)) *Seq

type Step

type Step struct {
	// contains filtered or unexported fields
}

type StepKind

type StepKind int

const (
	ReadStep StepKind = iota
	MapStep
	FilterStep
	CaptureStep
	CollectStep
	CountStep
	PeekStep
	CombineStep
)

func (StepKind) String

func (k StepKind) String() string

type StepOpt added in v0.1.0

type StepOpt func(*stepOpts)

func Compare

func Compare(f func(a any, b any) int) StepOpt

Compare sets the comparison function used to order data points, for example in Collect. The function must return

< 0 if a is less than b,
= 0 if a equals b,
> 0 if a is greater than b.
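A comparison function following this contract could be sketched as below, assuming the stream carries only int values (the type assertions panic otherwise). byInt and sortedCopy are hypothetical names; sortedCopy uses the standard library's sort.Slice purely to show the ordering the function defines, and says nothing about how the package applies it internally.

```go
package main

import (
	"fmt"
	"sort"
)

// byInt has the shape func(a any, b any) int expected by Compare.
// Assumption: both values are ints.
func byInt(a any, b any) int {
	x, y := a.(int), b.(int)
	switch {
	case x < y:
		return -1
	case x > y:
		return 1
	default:
		return 0
	}
}

// sortedCopy orders a copy of the input using a three-way comparison function,
// leaving the original slice untouched.
func sortedCopy(in []any, cmp func(a any, b any) int) []any {
	out := append([]any(nil), in...)
	sort.Slice(out, func(i, j int) bool { return cmp(out[i], out[j]) < 0 })
	return out
}

func main() {
	fmt.Println(sortedCopy([]any{3, 1, 2}, byInt)) // prints: [1 2 3]
}
```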

func Concurrency

func Concurrency(v int) StepOpt

Concurrency sets the number of replicas for the Step, determining how many instances of the Step will run concurrently. Depending on whether the preceding Step is in distributing or broadcasting mode, these replicas will either operate in a synchronized manner, collaborating on the same stream of data, or function independently, each handling the full stream.
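The difference between the two modes can be pictured with plain goroutines and channels. This is a conceptual sketch only, not the package's implementation: distribute and broadcast are hypothetical helpers, and each returns how many items every replica handled.

```go
package main

import (
	"fmt"
	"sync"
)

// distribute lets n replicas share one channel, so each item is handled by
// exactly one replica and the replicas split the stream between them.
func distribute(items []int, n int) []int {
	in := make(chan int)
	counts := make([]int, n)
	var wg sync.WaitGroup
	for r := 0; r < n; r++ {
		wg.Add(1)
		go func(r int) {
			defer wg.Done()
			for range in {
				counts[r]++ // each replica writes only its own slot
			}
		}(r)
	}
	for _, it := range items {
		in <- it
	}
	close(in)
	wg.Wait()
	return counts
}

// broadcast gives every replica its own channel and sends each item to all of
// them, so every replica handles the full stream.
func broadcast(items []int, n int) []int {
	counts := make([]int, n)
	chans := make([]chan int, n)
	var wg sync.WaitGroup
	for r := 0; r < n; r++ {
		chans[r] = make(chan int)
		wg.Add(1)
		go func(r int) {
			defer wg.Done()
			for range chans[r] {
				counts[r]++
			}
		}(r)
	}
	for _, it := range items {
		for _, ch := range chans {
			ch <- it
		}
	}
	for _, ch := range chans {
		close(ch)
	}
	wg.Wait()
	return counts
}

func main() {
	items := []int{1, 2, 3, 4, 5, 6}
	total := 0
	for _, c := range distribute(items, 3) {
		total += c
	}
	fmt.Println("distributed total:", total)
	fmt.Println("broadcast per replica:", broadcast(items, 3))
}
```

With six items and three replicas, the distributed counts always sum to six while their split between replicas varies from run to run; in the broadcast case every replica sees all six items.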

func Distributor

func Distributor() StepOpt

Distributor enables a Step to distribute work among the next Step and its replicas. See

  • Concurrency

func StepKey added in v0.2.0

func StepKey(v string) StepOpt

StepKey sets a unique key for the Step.
