Package pipeline

Documentation

Overview

Package pipeline provides means to construct and execute parallel pipelines.

A Pipeline feeds batches of data through several functions that can be specified to be executed in encounter order, in arbitrary sequential order, or in parallel. Ordered, sequential, and parallel stages can alternate arbitrarily.

A Pipeline consists of a Source object, and several Node objects.

Source objects that are supported by this implementation are arrays, slices, strings, channels, and bufio.Scanner objects, but other kinds of Source objects can be added by user programs.

Node objects can be specified to receive batches from the input source sequentially in encounter order (which is always the order in which they were originally encountered at the source), sequentially in arbitrary order, or in parallel. Ordered nodes always receive batches in encounter order, even if they are preceded by arbitrary sequential or parallel nodes.

Node objects consist of filters, which are pairs of receiver and finalizer functions. Each batch is passed to each receiver function, which can transform and modify the batch for the next receiver function in the pipeline. Each finalizer function is called once when all batches have been passed through all receiver functions.

Pipelines do not have an explicit representation for sinks. Instead, filters can use side effects to generate results.

Pipelines also support cancellation by way of the context package of Go's standard library.

Type parameter support for pipelines is preliminary. At the moment, the values passed from one pipeline stage to the next all have to be of the same type. The goal is to change this in the future, but that will require a redesign of this package's API. If you need to change parameter types from one stage to the next, you currently have to use Pipeline[any], or some other suitable interface type, as the type parameter.
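Before the full examples below, here is a compact, hedged sketch (using only the API documented on this page; the function names are illustrative) that squares the elements of a slice in a parallel stage and collects the results in encounter order with an ordered Append filter:

package main

import (
	"fmt"

	"github.com/intel/forGoParallel/pipeline"
)

// SquareAll squares all input elements in a parallel stage and collects
// the results in encounter order via an ordered Append filter.
func SquareAll(input []int) ([]int, error) {
	var result []int
	p := pipeline.New[[]int](pipeline.NewSlice(input))
	p.Add(
		pipeline.Par(pipeline.Receive(
			func(_ int, data []int) []int {
				// Each goroutine owns its own batch, so in-place
				// modification is safe here.
				for i, v := range data {
					data[i] = v * v
				}
				return data
			},
		)),
		pipeline.Ord(pipeline.Append(&result)),
	)
	p.Run()
	return result, p.Err()
}

func main() {
	squares, err := SquareAll([]int{1, 2, 3, 4})
	if err != nil {
		panic(err)
	}
	fmt.Println(squares) // prints [1 4 9 16]
}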

Example (WordCount)
package main

import (
	"bufio"
	"fmt"
	"github.com/intel/forGoParallel/gsync"
	"github.com/intel/forGoParallel/pipeline"
	"github.com/intel/forGoParallel/psort"
	"io"
	"strings"
	"sync/atomic"
)

func WordCount(r io.Reader) *gsync.Map[string, *int64] {
	var result gsync.Map[string, *int64]
	scanner := pipeline.NewScanner(r)
	scanner.Split(bufio.ScanWords)
	p := pipeline.New[[]string](scanner)
	p.Add(
		pipeline.Par(pipeline.Receive(
			func(_ int, data []string) []string {
				var uniqueWords []string
				for _, s := range data {
					count := int64(1)
					if actual, loaded := result.LoadOrStore(s, &count); loaded {
						atomic.AddInt64(actual, 1)
					} else {
						uniqueWords = append(uniqueWords, s)
					}
				}
				return uniqueWords
			},
		)),
	)
	p.Run()
	if err := p.Err(); err != nil {
		panic(err)
	}
	return &result
}

func main() {
	r := strings.NewReader("The big black bug bit the big black bear but the big black bear bit the big black bug back")
	counts := WordCount(r)
	words := make(psort.StringSlice, 0)
	counts.Range(func(key string, _ *int64) bool {
		words = append(words, key)
		return true
	})
	psort.Sort(words)
	for _, word := range words {
		count, _ := counts.Load(word)
		fmt.Println(word, *count)
	}
}
Output:

The 1
back 1
bear 2
big 4
bit 2
black 4
bug 2
but 1
the 3
Example (WordCount2)
package main

import (
	"fmt"
	"github.com/intel/forGoParallel/gsync"
	"github.com/intel/forGoParallel/pipeline"
	"github.com/intel/forGoParallel/psort"
	"sync/atomic"
)

func WordCount2(text []string) *gsync.Map[string, *int64] {
	var result gsync.Map[string, *int64]
	p := pipeline.New[[]string](pipeline.NewSlice(text))
	p.Add(
		pipeline.Par(pipeline.Receive(
			func(_ int, data []string) []string {
				var uniqueWords []string
				for _, s := range data {
					count := int64(1)
					if actual, loaded := result.LoadOrStore(s, &count); loaded {
						atomic.AddInt64(actual, 1)
					} else {
						uniqueWords = append(uniqueWords, s)
					}
				}
				return uniqueWords
			},
		)),
	)
	p.Run()
	return &result
}

func main() {
	sentence := []string{"The", "big", "black", "bug", "bit", "the", "big", "black", "bear", "but", "the", "big", "black", "bear", "bit", "the", "big", "black", "bug", "back"}
	counts := WordCount2(sentence)
	words := make(psort.StringSlice, 0)
	counts.Range(func(key string, _ *int64) bool {
		words = append(words, key)
		return true
	})
	psort.StableSort(words)
	for _, word := range words {
		count, _ := counts.Load(word)
		fmt.Println(word, *count)
	}
}
Output:

The 1
back 1
bear 2
big 4
bit 2
black 4
bug 2
but 1
the 3

Constants

This section is empty.

Variables

This section is empty.

Functions

func ComposeFilters

func ComposeFilters[T any](pipeline *Pipeline[T], kind NodeKind, dataSize *int, filters []Filter[T]) (receivers []Receiver[T], finalizers []Finalizer)

ComposeFilters takes a number of filters, calls them with the given pipeline, kind, and dataSize parameters in order, and appends the returned receivers and finalizers (except for nil values) to the result slices.

ComposeFilters is used in Node implementations. User programs typically do not call ComposeFilters.

func Identity

func Identity[T any](_ *Pipeline[T], _ NodeKind, _ *int) (_ Receiver[T], _ Finalizer)

Identity is a filter that passes data batches through unmodified. This filter will be optimized away in a pipeline, so it does not hurt to add it.

Types

type BytesScanner

type BytesScanner struct {
	*bufio.Scanner
	// contains filtered or unexported fields
}

BytesScanner is a wrapper around bufio.Scanner so it can act as a data source for pipelines. It fetches slices of bytes.

func NewBytesScanner

func NewBytesScanner(r io.Reader) *BytesScanner

NewBytesScanner returns a new Scanner to read from r. The split function defaults to bufio.ScanLines.

func (*BytesScanner) Data

func (src *BytesScanner) Data() [][]byte

Data implements the method of the Source interface.

func (*BytesScanner) Fetch

func (src *BytesScanner) Fetch(n int) (fetched int)

Fetch implements the method of the Source interface.

func (*BytesScanner) Prepare

func (src *BytesScanner) Prepare(_ context.Context) (size int)

Prepare implements the method of the Source interface.

type Chan

type Chan[T any] struct {
	// contains filtered or unexported fields
}

Chan is a source that accepts and passes through single elements from the input channel.

func NewChan

func NewChan[T any](channel <-chan T) *Chan[T]

NewChan returns a new Chan to read from the given channel.

func (*Chan[T]) Data

func (src *Chan[T]) Data() T

Data implements the method of the Source interface.

func (*Chan[T]) Err

func (src *Chan[T]) Err() error

Err implements the method of the Source interface.

func (*Chan[T]) Fetch

func (src *Chan[T]) Fetch(n int) (fetched int)

Fetch implements the method of the Source interface.

func (*Chan[T]) Prepare

func (src *Chan[T]) Prepare(ctx context.Context) (size int)

Prepare implements the method of the Source interface.
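As a hedged usage sketch (the helper function and its name are illustrative, not part of the package; fmt is assumed imported), a Chan source feeds single elements from a channel through a sequential stage:

// PrintAll is an illustrative helper: it prints each element received
// from the channel, one at a time, in a sequential node.
func PrintAll(ch <-chan string) {
	p := pipeline.New[string](pipeline.NewChan(ch))
	p.Add(pipeline.Seq(pipeline.Receive(
		func(_ int, data string) string {
			fmt.Println(data)
			return data
		},
	)))
	p.Run()
}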

type Filter

type Filter[T any] func(pipeline *Pipeline[T], kind NodeKind, dataSize *int) (Receiver[T], Finalizer)

A Filter is a function that returns a Receiver and a Finalizer to be added to a node. It receives a pipeline, the kind of node it will be added to, and the expected total data size that the receiver will be asked to process.

The dataSize parameter is either positive, in which case it indicates the expected total size of all batches that will eventually be passed to this filter's receiver, or it is negative, in which case the expected size is either unknown or too difficult to determine. The dataSize parameter is a pointer whose contents can be modified by the filter, for example if this filter increases or decreases the total size for subsequent filters, or if it changes dataSize from an unknown to a known value, or must change it from a known to an unknown value.

Either the receiver or the finalizer or both can be nil, in which case they will not be added to the current node.
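As a hedged sketch of the Filter protocol (the built-in Count filter already provides this functionality; the function name here is illustrative): a filter whose receiver tallies the total number of elements it sees, and whose finalizer publishes the tally. Because the receiver keeps unsynchronized state, this particular filter should only be added to sequential or ordered nodes:

// CountElements returns a filter whose receiver sums the sizes of all
// batches it sees, and whose finalizer stores the sum in *total.
// The receiver is not safe for concurrent use, so add this filter only
// to Seq or Ord nodes.
func CountElements[T any](total *int) pipeline.Filter[[]T] {
	return func(_ *pipeline.Pipeline[[]T], _ pipeline.NodeKind, _ *int) (pipeline.Receiver[[]T], pipeline.Finalizer) {
		sum := 0
		receiver := func(_ int, data []T) []T {
			sum += len(data)
			return data // pass the batch through unmodified
		}
		finalizer := func() { *total = sum }
		return receiver, finalizer
	}
}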

func Append

func Append[T any](result *[]T) Filter[[]T]

Append creates a filter that appends all the data batches it sees to the result. The result must represent a settable slice, for example by using the address operator & on a given slice.

func Count

func Count[T any](result *int) Filter[[]T]

Count creates a filter that sets the result pointer to the total size of all data batches it sees.

func Every

func Every[T any](result *bool, cancelWhenKnown bool, predicate Predicate[T]) Filter[T]

Every creates a filter that sets the result pointer to true if the given predicate returns true for every data batch. If cancelWhenKnown is true, this filter cancels the pipeline as soon as the predicate returns false on a data batch.

func Finalize

func Finalize[T any](finalize Finalizer) Filter[T]

Finalize creates a filter that returns a nil receiver and the given finalizer.

func NotAny

func NotAny[T any](result *bool, cancelWhenKnown bool, predicate Predicate[T]) Filter[T]

NotAny creates a filter that sets the result pointer to true if the given predicate returns false for every data batch. If cancelWhenKnown is true, this filter cancels the pipeline as soon as the predicate returns true on a data batch.

func NotEvery

func NotEvery[T any](result *bool, cancelWhenKnown bool, predicate Predicate[T]) Filter[T]

NotEvery creates a filter that sets the result pointer to true if the given predicate returns false for at least one of the data batches it is passed. If cancelWhenKnown is true, this filter cancels the pipeline as soon as the predicate returns false on a data batch.

func Receive

func Receive[T any](receive Receiver[T]) Filter[T]

Receive creates a Filter that returns the given receiver and a nil finalizer.

func ReceiveAndFinalize

func ReceiveAndFinalize[T any](receive Receiver[T], finalize Finalizer) Filter[T]

ReceiveAndFinalize creates a filter that returns the given receiver and finalizer.

func Some

func Some[T any](result *bool, cancelWhenKnown bool, predicate Predicate[T]) Filter[T]

Some creates a filter that sets the result pointer to true if the given predicate returns true for at least one of the data batches it is passed. If cancelWhenKnown is true, this filter cancels the pipeline as soon as the predicate returns true on a data batch.
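For example, a hedged sketch (helper name illustrative) that uses Some to cancel the pipeline as soon as any batch contains a negative number:

// ContainsNegative reports whether numbers contains a negative value,
// canceling the pipeline as soon as the answer is known.
func ContainsNegative(numbers []int) bool {
	var found bool
	p := pipeline.New[[]int](pipeline.NewSlice(numbers))
	p.Add(pipeline.Ord(pipeline.Some(&found, true,
		func(data []int) bool {
			for _, v := range data {
				if v < 0 {
					return true
				}
			}
			return false
		},
	)))
	p.Run()
	return found
}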

type Finalizer

type Finalizer func()

A Finalizer is called once after the corresponding receiver has been called for all data batches in the current pipeline.

type Func

type Func[T any] struct {
	// contains filtered or unexported fields
}

Func is a generic source that generates data batches by repeatedly calling a function.

func NewFunc

func NewFunc[T any](prepare func(ctx context.Context) (size int), fetch func(size int) (data T, fetched int, err error)) *Func[T]

NewFunc returns a new Func to generate data batches by repeatedly calling fetch.

The prepare parameter is called when the pipeline is started. It returns the total expected size of all data batches. Return -1, or pass a nil function to the prepare parameter, if the total size is unknown or difficult to determine.

The fetch function returns a data batch of the requested size. It returns the size of the data batch that it was actually able to fetch. It returns 0 if there is no more data to be fetched from the source; the pipeline will then make no further attempts to fetch more elements.

The fetch function can also return an error if necessary.
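A hedged sketch (assuming the standard context import; the constructor name is illustrative) of a Func source that yields the integers 0 through 99 in batches of the requested size:

// NewCounter returns a Func source producing 0..99. The closure keeps
// the iteration state between calls to fetch.
func NewCounter() *pipeline.Func[[]int] {
	const total = 100
	next := 0
	return pipeline.NewFunc[[]int](
		// prepare: the total size is known in advance.
		func(_ context.Context) int { return total },
		// fetch: produce up to size elements; fetched == 0 ends the source.
		func(size int) (data []int, fetched int, err error) {
			for i := 0; i < size && next < total; i++ {
				data = append(data, next)
				next++
			}
			return data, len(data), nil
		},
	)
}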

func (*Func[T]) Data

func (f *Func[T]) Data() T

Data implements the method of the Source interface.

func (*Func[T]) Err

func (f *Func[T]) Err() error

Err implements the method of the Source interface.

func (*Func[T]) Fetch

func (f *Func[T]) Fetch(size int) (fetched int)

Fetch implements the method of the Source interface.

func (*Func[T]) Prepare

func (f *Func[T]) Prepare(ctx context.Context) int

Prepare implements the method of the Source interface.

type MultiChan

type MultiChan[T any] struct {
	// contains filtered or unexported fields
}

MultiChan is a source that accepts and passes through multiple elements from the input channel.

func NewMultiChan

func NewMultiChan[T any](channel <-chan T) *MultiChan[T]

NewMultiChan returns a new MultiChan to read from the given channel.

func (*MultiChan[T]) Data

func (src *MultiChan[T]) Data() []T

Data implements the method of the Source interface.

func (*MultiChan[T]) Err

func (src *MultiChan[T]) Err() error

Err implements the method of the Source interface.

func (*MultiChan[T]) Fetch

func (src *MultiChan[T]) Fetch(n int) (fetched int)

Fetch implements the method of the Source interface.

func (*MultiChan[T]) Prepare

func (src *MultiChan[T]) Prepare(ctx context.Context) (size int)

Prepare implements the method of the Source interface.

type Node

type Node[T any] interface {

	// TryMerge tries to merge node with the current node by appending its
	// filters to the filters of the current node, which succeeds if both nodes
	// are either sequential or parallel. The return value merged indicates
	// whether merging succeeded.
	TryMerge(node Node[T]) (merged bool)

	// Begin informs this node that the pipeline is going to start to feed
	// batches of data to this node. The pipeline, the index of this node among
	// all the nodes in the pipeline, and the expected total size of all batches
	// combined are passed as parameters.
	//
	// The dataSize parameter is either positive, in which case it indicates the
	// expected total size of all batches that will eventually be passed to this
	// node's Feed method, or it is negative, in which case the expected size is
	// either unknown or too difficult to determine. The dataSize parameter is a
	// pointer whose contents can be modified by Begin, for example if this node
	// increases or decreases the total size for subsequent nodes, or if this
	// node changes dataSize from an unknown to a known value, or must change it
	// from a known to an unknown value.
	//
	// A node may decide that, based on the given information, it will actually
	// not need to see any of the batches that are normally going to be passed
	// to it. In that case, it can return false, and its Feed and End methods
	// will not be called anymore. Otherwise, it should return true by default.
	Begin(p *Pipeline[T], index int, dataSize *int) (keep bool)

	// StrictOrd reports whether this node or any contained nodes are StrictOrd
	// nodes.
	StrictOrd() bool

	// Feed is called for each batch of data. The pipeline, the index of this
	// node among all the nodes in the pipeline (which may be different from the
	// index number seen by Begin), the sequence number of the batch (according
	// to the encounter order), and the actual batch of data are passed as
	// parameters.
	//
	// The data parameter contains the batch of data, which is usually a slice
	// of a particular type. After the data has been processed by all filters of
	// this node, the node must call p.FeedForward with exactly the same index
	// and sequence numbers, but a potentially modified batch of data.
	// FeedForward must be called even when the data batch is or becomes empty,
	// to ensure that all sequence numbers are seen by subsequent nodes.
	Feed(p *Pipeline[T], index int, seqNo int, data T)

	// End is called after all batches have been passed to Feed. This allows the
	// node to release resources and call the finalizers of its filters.
	End()
}

A Node object represents a sequence of filters which are together executed either in encounter order, in arbitrary sequential order, or in parallel.

The methods of this interface are typically not called by user programs, but rather implemented by specific node types and called by pipelines. Ordered, sequential, and parallel nodes are also implemented in this package, so that user programs are typically not concerned with Node methods at all.

func Limit

func Limit[T any](limit int, cancelWhenReached bool) Node[[]T]

Limit creates an ordered node with a filter that caps the total size of all data batches it passes to the next filter in the pipeline to the given limit. If cancelWhenReached is true, this filter cancels the pipeline as soon as the limit is reached. If limit is negative, all data is passed through unmodified.

func LimitedPar

func LimitedPar[T any](filters ...Filter[T]) Node[T]

LimitedPar creates a parallel node with the given filters.

func Ord

func Ord[T any](filters ...Filter[T]) Node[T]

Ord creates an ordered node with the given filters.

func Par

func Par[T any](filters ...Filter[T]) Node[T]

Par creates a parallel node with the given filters.

func Seq

func Seq[T any](filters ...Filter[T]) Node[T]

Seq creates a sequential node with the given filters.

func Skip

func Skip[T any](n int) Node[[]T]

Skip creates an ordered node with a filter that skips the first n elements from the data batches it passes to the next filter in the pipeline. If n is negative, no data is passed through, and the error value of the pipeline is set to a non-nil value.
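Skip and Limit compose naturally. A hedged sketch (helper name illustrative; cancelWhenReached is left false for simplicity) that keeps elements 10 through 14 of the input:

// Window returns input elements 10..14 by skipping the first 10 and
// capping the remainder at 5, collecting the survivors with Append.
func Window(input []int) []int {
	var window []int
	p := pipeline.New[[]int](pipeline.NewSlice(input))
	p.Add(
		pipeline.Skip[int](10),
		pipeline.Limit[int](5, false),
		pipeline.Ord(pipeline.Append(&window)),
	)
	p.Run()
	return window
}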

func StrictOrd

func StrictOrd[T any](filters ...Filter[T]) Node[T]

StrictOrd creates an ordered node with the given filters.

type NodeKind

type NodeKind int

A NodeKind represents the different kinds of nodes.

const (
	// Ordered nodes receive batches in encounter order.
	Ordered NodeKind = iota

	// Sequential nodes receive batches in arbitrary sequential order.
	Sequential

	// Parallel nodes receive batches in parallel.
	Parallel
)

type Pipeline

type Pipeline[T any] struct {
	// contains filtered or unexported fields
}

A Pipeline is a parallel pipeline that can feed batches of data fetched from a source through several nodes that are ordered, sequential, or parallel.

func LimitedParTransform

func LimitedParTransform[S, T any](p *Pipeline[S], transform func(int, S) T) *Pipeline[T]

func New

func New[T any](source Source[T]) *Pipeline[T]

New creates a new pipeline with the given source.

func OrdTransform

func OrdTransform[S, T any](p *Pipeline[S], transform func(int, S) T) *Pipeline[T]

func ParTransform

func ParTransform[S, T any](p *Pipeline[S], transform func(int, S) T) *Pipeline[T]

func SeqTransform

func SeqTransform[S, T any](p *Pipeline[S], transform func(int, S) T) *Pipeline[T]

func StrictOrdTransform

func StrictOrdTransform[S, T any](p *Pipeline[S], transform func(int, S) T) *Pipeline[T]

func Transform

func Transform[S, T any](p *Pipeline[S], transform func(int, S) T) *Pipeline[T]

func (*Pipeline[T]) Add

func (p *Pipeline[T]) Add(nodes ...Node[T])

Add appends nodes to the end of this pipeline.

func (*Pipeline[T]) Cancel

func (p *Pipeline[T]) Cancel()

Cancel calls the cancel function of this pipeline's context.

func (*Pipeline[T]) Context

func (p *Pipeline[T]) Context() context.Context

Context returns this pipeline's context.

func (*Pipeline[T]) Err

func (p *Pipeline[T]) Err() (err error)

Err returns the current error value for this pipeline, which may be nil if no error has occurred so far.

Err and SetErr are safe to be concurrently invoked.

func (*Pipeline[T]) FeedForward

func (p *Pipeline[T]) FeedForward(index int, seqNo int, data T)

FeedForward must be called in the Feed method of a node to forward a potentially modified data batch to the next node in the current pipeline.

FeedForward is used in Node implementations. User programs typically do not call FeedForward.

FeedForward must be called with the pipeline received as a parameter by Feed, and must pass the same index and seqNo received by Feed. The data parameter can be either a modified or an unmodified data batch. FeedForward must always be called, even if the data batch is unmodified, and even if the data batch is or becomes empty.

func (*Pipeline[T]) NofBatches

func (p *Pipeline[T]) NofBatches(n int) (nofBatches int)

NofBatches sets or gets the number of batches that are created from the data source for this pipeline, if the expected total size for this pipeline's data source is known or can be determined easily.

NofBatches can be called safely by user programs before Run or RunWithContext is called.

If user programs do not call NofBatches, or call it with a value < 1, then the pipeline will choose a reasonable default value that takes runtime.NumCPU() into account.

If the expected total size for this pipeline's data source is unknown, or is difficult to determine, use SetVariableBatchSize to influence batch sizes.
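A hedged usage sketch (helper name illustrative; runtime is the standard library package):

// NewTunedPipeline requests more, smaller batches than the default to
// smooth out load imbalance across CPUs.
func NewTunedPipeline(input []int) *pipeline.Pipeline[[]int] {
	p := pipeline.New[[]int](pipeline.NewSlice(input))
	p.NofBatches(4 * runtime.NumCPU()) // must be called before Run
	return p
}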

func (*Pipeline[T]) Notify

func (p *Pipeline[T]) Notify(f func())

Notify installs a thunk that gets invoked just before this pipeline starts running. f will be invoked in its own goroutine.

func (*Pipeline[T]) Run

func (p *Pipeline[T]) Run()

Run initiates pipeline execution by calling RunWithContext(context.WithCancel(context.Background())), and ensures that the cancel function is called at least once when the pipeline is done.

Run should only be called after the pipeline has been given a data source with New, and one or more Node objects have been added to it using the Add method. NofBatches can be called before Run to deal with load imbalance, but this is not necessary since Run chooses a reasonable default value.

Run prepares the data source, tells each node that batches are going to be sent to them by calling Begin, and then fetches batches from the data source and sends them to the nodes. Once the data source is depleted, the nodes are informed that the end of the data source has been reached.

func (*Pipeline[T]) RunWithContext

func (p *Pipeline[T]) RunWithContext(ctx context.Context, cancel context.CancelFunc)

RunWithContext initiates pipeline execution.

It expects a context and a cancel function as parameters, for example from context.WithCancel(context.Background()). It does not ensure that the cancel function is called at least once, so this must be ensured by the function calling RunWithContext.

RunWithContext should only be called after the pipeline has been given a data source with New, and one or more Node objects have been added to it using the Add method. NofBatches can be called before RunWithContext to deal with load imbalance, but this is not necessary since RunWithContext chooses a reasonable default value.

RunWithContext prepares the data source, tells each node that batches are going to be sent to them by calling Begin, and then fetches batches from the data source and sends them to the nodes. Once the data source is depleted, the nodes are informed that the end of the data source has been reached.

func (*Pipeline[T]) SetErr

func (p *Pipeline[T]) SetErr(err error) bool

SetErr attempts to set a new error value for this pipeline, unless it already has a non-nil error value. If the attempt is successful, SetErr also cancels the pipeline, and returns true. If the attempt is not successful, SetErr returns false.

SetErr and Err are safe to be concurrently invoked, for example from the different goroutines executing filters of parallel nodes in this pipeline.
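A hedged sketch (helper name illustrative; errors is the standard library package) of reporting an error from inside a receiver; a successful SetErr also cancels the pipeline:

// AddValidation installs a sequential stage that aborts the pipeline
// when it encounters an empty token.
func AddValidation(p *pipeline.Pipeline[[]string]) {
	p.Add(pipeline.Seq(pipeline.Receive(
		func(_ int, data []string) []string {
			for _, s := range data {
				if s == "" {
					p.SetErr(errors.New("empty token"))
					return nil // forward an empty batch
				}
			}
			return data
		},
	)))
}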

func (*Pipeline[T]) SetVariableBatchSize

func (p *Pipeline[T]) SetVariableBatchSize(batchInc, maxBatchSize int)

SetVariableBatchSize sets the batch size(s) for the batches that are created from the data source for this pipeline, if the expected total size for this pipeline's data source is unknown or difficult to determine.

SetVariableBatchSize can be called safely by user programs before Run or RunWithContext is called.

If user programs do not call SetVariableBatchSize, or pass a value < 1 for either of the two parameters, then the pipeline will choose a reasonable default value for that respective parameter.

The pipeline will start with batchInc as the batch size, and increase the batch size by batchInc for every subsequent batch, to accommodate data sources of different total sizes. The batch size will never be larger than maxBatchSize, though.

If the expected total size for this pipeline's data source is known, or can be determined easily, use NofBatches to influence the batch size.
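A hedged sketch (helper name and batch sizes illustrative) for a channel source, whose total size is unknown:

// NewStringPipeline reads batches of strings from a channel, starting
// with batches of 64 elements and growing by 64 per batch up to 4096.
func NewStringPipeline(ch <-chan string) *pipeline.Pipeline[[]string] {
	p := pipeline.New[[]string](pipeline.NewMultiChan(ch))
	p.SetVariableBatchSize(64, 4096) // must be called before Run
	return p
}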

type Predicate

type Predicate[T any] func(data T) bool

A Predicate is a function that is passed a data batch and returns a boolean value.

In most cases, it will cast the data parameter to a specific slice type and check a predicate on each element of the slice.

type Receiver

type Receiver[T any] func(seqNo int, data T) (filteredData T)

A Receiver is called for every data batch, and returns a potentially modified data batch. The seqNo parameter indicates the order in which the data batch was encountered at the current pipeline's data source.

type Scanner

type Scanner struct {
	*bufio.Scanner
	// contains filtered or unexported fields
}

Scanner is a wrapper around bufio.Scanner so it can act as a data source for pipelines. It fetches strings.

func NewScanner

func NewScanner(r io.Reader) *Scanner

NewScanner returns a new Scanner to read from r. The split function defaults to bufio.ScanLines.

func (*Scanner) Data

func (src *Scanner) Data() []string

Data implements the method of the Source interface.

func (*Scanner) Fetch

func (src *Scanner) Fetch(n int) (fetched int)

Fetch implements the method of the Source interface.

func (*Scanner) Prepare

func (src *Scanner) Prepare(_ context.Context) (size int)

Prepare implements the method of the Source interface.

type Slice

type Slice[T any] struct {
	// contains filtered or unexported fields
}

func NewSlice

func NewSlice[T any](value []T) *Slice[T]

func (*Slice[T]) Data

func (src *Slice[T]) Data() []T

func (*Slice[T]) Err

func (src *Slice[T]) Err() error

func (*Slice[T]) Fetch

func (src *Slice[T]) Fetch(n int) (fetched int)

func (*Slice[T]) Prepare

func (src *Slice[T]) Prepare(_ context.Context) int

type Source

type Source[T any] interface {
	// Err returns an error value or nil
	Err() error

	// Prepare receives a pipeline context and informs the pipeline what the
	// total expected size of all data batches is. The return value is -1 if the
	// total size is unknown or difficult to determine.
	Prepare(ctx context.Context) (size int)

	// Fetch gets a data batch of the requested size from the source. It returns
	// the size of the data batch that it was actually able to fetch. It returns
	// 0 if there is no more data to be fetched from the source; the pipeline
	// will then make no further attempts to fetch more elements.
	Fetch(size int) (fetched int)

	// Data returns the last fetched data batch.
	Data() T
}

A Source represents an object that can generate data batches for pipelines.
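As noted in the overview, user programs can add their own kinds of Source objects. A hedged sketch (type name and behavior illustrative; assumes the standard context and math/rand imports) of a Source[[]int] that yields a fixed number of pseudo-random integers:

// randSource yields remaining pseudo-random integers in batches.
type randSource struct {
	remaining int
	batch     []int
}

// Err reports no error; this source cannot fail.
func (s *randSource) Err() error { return nil }

// Prepare reports the known total size.
func (s *randSource) Prepare(_ context.Context) int { return s.remaining }

// Fetch fills the next batch with up to size random integers and returns
// how many it produced; returning 0 signals that the source is drained.
func (s *randSource) Fetch(size int) int {
	if size > s.remaining {
		size = s.remaining
	}
	s.batch = s.batch[:0]
	for i := 0; i < size; i++ {
		s.batch = append(s.batch, rand.Int())
	}
	s.remaining -= size
	return size
}

// Data returns the most recently fetched batch.
func (s *randSource) Data() []int { return s.batch }

A pipeline over this source can then be created with, for example, pipeline.New[[]int](&randSource{remaining: 1000}).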
