Documentation
Overview
Package pipeline provides means to construct and execute parallel pipelines.
A Pipeline feeds batches of data through several functions that can be specified to be executed in encounter order, in arbitrary sequential order, or in parallel. Ordered, sequential, or parallel stages can arbitrarily alternate.
A Pipeline consists of a Source object and several Node objects.
Source objects that are supported by this implementation are arrays, slices, strings, channels, and bufio.Scanner objects, but other kinds of Source objects can be added by user programs.
Node objects can be specified to receive batches from the input source in one of three ways: sequentially in encounter order, which is always the same order in which they were originally encountered at the source; sequentially, but in arbitrary order; or in parallel. Ordered nodes always receive batches in encounter order, even if they are preceded by arbitrary sequential or even parallel nodes.
Node objects consist of filters, which are pairs of receiver and finalizer functions. Each batch is passed to each receiver function, which can transform and modify the batch for the next receiver function in the pipeline. Each finalizer function is called once when all batches have been passed through all receiver functions.
Pipelines do not have an explicit representation for sinks. Instead, filters can use side effects to generate results.
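A small, strictly sequential model can illustrate the receiver/finalizer flow and the use of side effects in place of sinks. The sketch below does not use this package. Receiver and Finalizer are local stand-ins with the shapes used in the examples, and runFilters and countUpper are hypothetical helpers. Depending on the node kind, a real pipeline may also run receivers concurrently, which this model does not do.

```go
package main

import (
	"fmt"
	"strings"
)

// Receiver and Finalizer are local stand-ins with the shapes used by this
// package; this program does not import the pipeline package.
type Receiver[T any] func(seqNo int, data T) T

type Finalizer func()

// runFilters is a hypothetical, strictly sequential model of a node: every
// batch passes through all receivers in order, and each finalizer is called
// exactly once after the last batch.
func runFilters[T any](batches []T, receivers []Receiver[T], finalizers []Finalizer) {
	for seqNo, batch := range batches {
		for _, receive := range receivers {
			batch = receive(seqNo, batch)
		}
	}
	for _, finalize := range finalizers {
		finalize()
	}
}

// countUpper upper-cases every word, collects the results as a side effect
// (there is no sink), and records the total in a finalizer.
func countUpper(batches [][]string) (words []string, total int) {
	seen := 0
	upper := func(_ int, data []string) []string {
		out := make([]string, len(data))
		for i, s := range data {
			out[i] = strings.ToUpper(s)
		}
		return out
	}
	collect := func(_ int, data []string) []string {
		words = append(words, data...)
		seen += len(data)
		return data
	}
	runFilters(batches,
		[]Receiver[[]string]{upper, collect},
		[]Finalizer{func() { total = seen }})
	return words, total
}

func main() {
	words, total := countUpper([][]string{{"big", "black"}, {"bug"}})
	fmt.Println(words, total) // [BIG BLACK BUG] 3
}
```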
Pipelines also support cancelation by way of the context package of Go's standard library.
Type parameter support for pipelines is preliminary. At the moment, parameters passed from one pipeline stage to the next have to be all of the same type. The goal is to change this in the future, but this will require a redesign of the API in this package. If you need to change parameter types from one stage to the next, you currently need to use a Pipeline[any], or some other suitable interface as a type parameter.
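The following stand-alone sketch gives a rough picture of the Pipeline[any] workaround. It passes batches of type any through two stages: the first turns a string into a []string, and the second turns that []string into an int. The stage type and the runStages, splitWords, and countWords functions are hypothetical and only model the idea. Each stage has to recover the concrete type with a type assertion.

```go
package main

import (
	"fmt"
	"strings"
)

// stage is a hypothetical stand-in for a receiver in a Pipeline[any]:
// every stage accepts and returns any, so the dynamic type of a batch may
// change from one stage to the next.
type stage func(seqNo int, data any) any

// runStages feeds each batch through all stages in order.
func runStages(batches []any, stages ...stage) []any {
	out := make([]any, 0, len(batches))
	for seqNo, batch := range batches {
		for _, s := range stages {
			batch = s(seqNo, batch)
		}
		out = append(out, batch)
	}
	return out
}

// splitWords turns a string batch into a []string batch.
func splitWords(_ int, data any) any {
	return strings.Fields(data.(string))
}

// countWords turns a []string batch into an int batch.
func countWords(_ int, data any) any {
	return len(data.([]string))
}

func main() {
	results := runStages([]any{"the big black bug", "bit the bear"}, splitWords, countWords)
	fmt.Println(results) // [4 3]
}
```

A type assertion panics if a stage receives an unexpected dynamic type. That is the cost of giving up static typing between stages.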
Example (WordCount)
```go
package main

import (
	"bufio"
	"fmt"
	"github.com/intel/forGoParallel/gsync"
	"github.com/intel/forGoParallel/pipeline"
	"github.com/intel/forGoParallel/psort"
	"io"
	"strings"
	"sync/atomic"
)

// WordCount counts the occurrences of each word read from r.
func WordCount(r io.Reader) *gsync.Map[string, *int64] {
	var result gsync.Map[string, *int64]
	// Split the input into words instead of the default lines.
	scanner := pipeline.NewScanner(r)
	scanner.Split(bufio.ScanWords)
	p := pipeline.New[[]string](scanner)
	p.Add(
		pipeline.Par(pipeline.Receive(
			func(_ int, data []string) []string {
				var uniqueWords []string
				for _, s := range data {
					count := int64(1)
					if actual, loaded := result.LoadOrStore(s, &count); loaded {
						// The word was seen before: increment its counter.
						atomic.AddInt64(actual, 1)
					} else {
						// First occurrence: the counter starts at 1.
						uniqueWords = append(uniqueWords, s)
					}
				}
				return uniqueWords
			},
		)),
	)
	p.Run()
	if err := p.Err(); err != nil {
		panic(err)
	}
	return &result
}

func main() {
	r := strings.NewReader("The big black bug bit the big black bear but the big black bear bit the big black bug back")
	counts := WordCount(r)
	// Collect and sort the keys so that the output order is deterministic.
	words := make(psort.StringSlice, 0)
	counts.Range(func(key string, _ *int64) bool {
		words = append(words, key)
		return true
	})
	psort.Sort(words)
	for _, word := range words {
		count, _ := counts.Load(word)
		fmt.Println(word, *count)
	}
}
```
Output:

```text
The 1
back 1
bear 2
big 4
bit 2
black 4
bug 2
but 1
the 3
```
Example (WordCount2)
```go
package main

import (
	"fmt"
	"github.com/intel/forGoParallel/gsync"
	"github.com/intel/forGoParallel/pipeline"
	"github.com/intel/forGoParallel/psort"
	"sync/atomic"
)

// WordCount2 counts the occurrences of each word in text, which is fed to
// the pipeline as a slice source.
func WordCount2(text []string) *gsync.Map[string, *int64] {
	var result gsync.Map[string, *int64]
	p := pipeline.New[[]string](pipeline.NewSlice(text))
	p.Add(
		pipeline.Par(pipeline.Receive(
			func(_ int, data []string) []string {
				var uniqueWords []string
				for _, s := range data {
					count := int64(1)
					if actual, loaded := result.LoadOrStore(s, &count); loaded {
						// The word was seen before: increment its counter.
						atomic.AddInt64(actual, 1)
					} else {
						// First occurrence: the counter starts at 1.
						uniqueWords = append(uniqueWords, s)
					}
				}
				return uniqueWords
			},
		)),
	)
	p.Run()
	return &result
}

func main() {
	sentence := []string{"The", "big", "black", "bug", "bit", "the", "big", "black", "bear", "but", "the", "big", "black", "bear", "bit", "the", "big", "black", "bug", "back"}
	counts := WordCount2(sentence)
	// Collect and sort the keys so that the output order is deterministic.
	words := make(psort.StringSlice, 0)
	counts.Range(func(key string, _ *int64) bool {
		words = append(words, key)
		return true
	})
	psort.StableSort(words)
	for _, word := range words {
		count, _ := counts.Load(word)
		fmt.Println(word, *count)
	}
}
```
Output:

```text
The 1
back 1
bear 2
big 4
bit 2
black 4
bug 2
but 1
the 3
```
Index
- func ComposeFilters[T any](pipeline *Pipeline[T], kind NodeKind, dataSize *int, filters []Filter[T]) (receivers []Receiver[T], finalizers []Finalizer)
- func Identity[T any](_ *Pipeline[T], _ NodeKind, _ *int) (_ Receiver[T], _ Finalizer)
- type BytesScanner
- type Chan
- type Filter
- func Append[T any](result *[]T) Filter[[]T]
- func Count[T any](result *int) Filter[[]T]
- func Every[T any](result *bool, cancelWhenKnown bool, predicate Predicate[T]) Filter[T]
- func Finalize[T any](finalize Finalizer) Filter[T]
- func NotAny[T any](result *bool, cancelWhenKnown bool, predicate Predicate[T]) Filter[T]
- func NotEvery[T any](result *bool, cancelWhenKnown bool, predicate Predicate[T]) Filter[T]
- func Receive[T any](receive Receiver[T]) Filter[T]
- func ReceiveAndFinalize[T any](receive Receiver[T], finalize Finalizer) Filter[T]
- func Some[T any](result *bool, cancelWhenKnown bool, predicate Predicate[T]) Filter[T]
- type Finalizer
- type Func
- type MultiChan
- type Node
- func Limit[T any](limit int, cancelWhenReached bool) Node[[]T]
- func LimitedPar[T any](filters ...Filter[T]) Node[T]
- func Ord[T any](filters ...Filter[T]) Node[T]
- func Par[T any](filters ...Filter[T]) Node[T]
- func Seq[T any](filters ...Filter[T]) Node[T]
- func Skip[T any](n int) Node[[]T]
- func StrictOrd[T any](filters ...Filter[T]) Node[T]
- type NodeKind
- type Pipeline
- func LimitedParTransform[S, T any](p *Pipeline[S], transform func(int, S) T) *Pipeline[T]
- func New[T any](source Source[T]) *Pipeline[T]
- func OrdTransform[S, T any](p *Pipeline[S], transform func(int, S) T) *Pipeline[T]
- func ParTransform[S, T any](p *Pipeline[S], transform func(int, S) T) *Pipeline[T]
- func SeqTransform[S, T any](p *Pipeline[S], transform func(int, S) T) *Pipeline[T]
- func StrictOrdTransform[S, T any](p *Pipeline[S], transform func(int, S) T) *Pipeline[T]
- func Transform[S, T any](p *Pipeline[S], transform func(int, S) T) *Pipeline[T]
- func (p *Pipeline[T]) Add(nodes ...Node[T])
- func (p *Pipeline[T]) Cancel()
- func (p *Pipeline[T]) Context() context.Context
- func (p *Pipeline[T]) Err() (err error)
- func (p *Pipeline[T]) FeedForward(index int, seqNo int, data T)
- func (p *Pipeline[T]) NofBatches(n int) (nofBatches int)
- func (p *Pipeline[T]) Notify(f func())
- func (p *Pipeline[T]) Run()
- func (p *Pipeline[T]) RunWithContext(ctx context.Context, cancel context.CancelFunc)
- func (p *Pipeline[T]) SetErr(err error) bool
- func (p *Pipeline[T]) SetVariableBatchSize(batchInc, maxBatchSize int)
- type Predicate
- type Receiver
- type Scanner
- type Slice
- type Source
Examples
Constants
This section is empty.
Variables
This section is empty.
Functions
func ComposeFilters
func ComposeFilters[T any](pipeline *Pipeline[T], kind NodeKind, dataSize *int, filters []Filter[T]) (receivers []Receiver[T], finalizers []Finalizer)
ComposeFilters takes a number of filters, calls them with the given pipeline, kind, and dataSize parameters in order, and appends the returned receivers and finalizers (except for nil values) to the result slices.
ComposeFilters is used in Node implementations. User programs typically do not call ComposeFilters.
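Here is a simplified, stand-alone model of the documented ComposeFilters contract. It is not the package's implementation. The local Filter type omits the *Pipeline[T] parameter so that the code compiles without the package. incrementFilter and logFilter are hypothetical filters, and each returns only one of the two functions, so the nil values are visibly skipped.

```go
package main

import "fmt"

// Simplified, hypothetical stand-ins for the package's types. The real
// Filter also receives a *Pipeline[T]; it is omitted here.
type NodeKind int

type Receiver[T any] func(seqNo int, data T) T

type Finalizer func()

type Filter[T any] func(kind NodeKind, dataSize *int) (Receiver[T], Finalizer)

// composeFilters calls each filter in order and keeps only the non-nil
// receivers and finalizers, as documented for ComposeFilters.
func composeFilters[T any](kind NodeKind, dataSize *int, filters []Filter[T]) (receivers []Receiver[T], finalizers []Finalizer) {
	for _, filter := range filters {
		receive, finalize := filter(kind, dataSize)
		if receive != nil {
			receivers = append(receivers, receive)
		}
		if finalize != nil {
			finalizers = append(finalizers, finalize)
		}
	}
	return receivers, finalizers
}

// incrementFilter returns only a receiver.
func incrementFilter(NodeKind, *int) (Receiver[int], Finalizer) {
	return func(_ int, x int) int { return x + 1 }, nil
}

// logFilter returns only a finalizer.
func logFilter(NodeKind, *int) (Receiver[int], Finalizer) {
	return nil, func() { fmt.Println("finalized") }
}

func main() {
	size := -1 // total size unknown
	receivers, finalizers := composeFilters(0, &size, []Filter[int]{incrementFilter, logFilter})
	fmt.Println(len(receivers), len(finalizers)) // 1 1
	fmt.Println(receivers[0](0, 41))            // 42
	finalizers[0]()                              // prints "finalized"
}
```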
Types
type BytesScanner
BytesScanner is a wrapper around bufio.Scanner so it can act as a data source for pipelines. It fetches slices of bytes.
func NewBytesScanner
func NewBytesScanner(r io.Reader) *BytesScanner
NewBytesScanner returns a new BytesScanner to read from r. The split function defaults to bufio.ScanLines.
func (*BytesScanner) Data
func (src *BytesScanner) Data() [][]byte
Data implements the method of the Source interface.
func (*BytesScanner) Fetch
func (src *BytesScanner) Fetch(n int) (fetched int)
Fetch implements the method of the Source interface.
type Chan

```go
type Chan[T any] struct {
	// contains filtered or unexported fields
}
```

Chan is a source that accepts single elements from the input channel and passes them through.
func (*Chan[T]) Data
func (src *Chan[T]) Data() T
Data implements the method of the Source interface.
type Filter
type Filter[T any] func(pipeline *Pipeline[T], kind NodeKind, dataSize *int) (Receiver[T], Finalizer)
A Filter is a function that returns a Receiver and a Finalizer to be added to a node. It receives a pipeline, the kind of node it will be added to, and the expected total data size that the receiver will be asked to process.
The dataSize parameter is either positive, in which case it indicates the expected total size of all batches that will eventually be passed to this filter's receiver, or negative, in which case the expected size is either unknown or too difficult to determine. The dataSize parameter is a pointer whose contents the filter can modify. It can do so, for example, if it increases or decreases the total size for subsequent filters, if it can change dataSize from an unknown to a known value, or, conversely, if it must change dataSize from a known to an unknown value.
Either the receiver or the finalizer or both can be nil, in which case they will not be added to the current node.
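A filter that changes the number of elements should adjust dataSize to match. The sketch below models this with local stand-in types that, like the previous model, omit the *Pipeline[T] parameter. duplicate, dropOdd, and sizeAfter are hypothetical names. duplicate doubles a known size. dropOdd cannot predict how many elements survive, so it marks the size as unknown.

```go
package main

import "fmt"

// Local, hypothetical stand-ins; the real Filter also receives a *Pipeline[T].
type NodeKind int

type Receiver[T any] func(seqNo int, data T) T

type Finalizer func()

type Filter[T any] func(kind NodeKind, dataSize *int) (Receiver[T], Finalizer)

// duplicate emits every element twice, so a known (positive) total size
// doubles; an unknown (negative) size stays unknown.
func duplicate(_ NodeKind, dataSize *int) (Receiver[[]int], Finalizer) {
	if *dataSize > 0 {
		*dataSize *= 2
	}
	return func(_ int, data []int) []int {
		out := make([]int, 0, 2*len(data))
		for _, x := range data {
			out = append(out, x, x)
		}
		return out
	}, nil
}

// dropOdd removes odd elements. How many survive cannot be known in
// advance, so it changes dataSize from known to unknown.
func dropOdd(_ NodeKind, dataSize *int) (Receiver[[]int], Finalizer) {
	*dataSize = -1
	return func(_ int, data []int) []int {
		out := make([]int, 0, len(data))
		for _, x := range data {
			if x%2 == 0 {
				out = append(out, x)
			}
		}
		return out
	}, nil
}

// sizeAfter applies the dataSize adjustments of the given filters in order.
// The returned receivers and finalizers are ignored in this sketch.
func sizeAfter(initial int, filters ...Filter[[]int]) int {
	size := initial
	for _, filter := range filters {
		filter(0, &size)
	}
	return size
}

func main() {
	fmt.Println(sizeAfter(10, duplicate))          // 20
	fmt.Println(sizeAfter(10, duplicate, dropOdd)) // -1
	fmt.Println(sizeAfter(-1, duplicate))          // -1
}
```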
func Append
func Append[T any](result *[]T) Filter[[]T]
Append creates a filter that appends all the data batches it sees to the result. The result must represent a settable slice, for example by using the address operator & on a given slice.
func Count
func Count[T any](result *int) Filter[[]T]
Count creates a filter that sets the result pointer to the total size of all data batches it sees.
func Every
func Every[T any](result *bool, cancelWhenKnown bool, predicate Predicate[T]) Filter[T]
Every creates a filter that sets the result pointer to true if the given predicate returns true for every data batch. If cancelWhenKnown is true, this filter cancels the pipeline as soon as the predicate returns false on a data batch.
func NotAny
func NotAny[T any](result *bool, cancelWhenKnown bool, predicate Predicate[T]) Filter[T]
NotAny creates a filter that sets the result pointer to true if the given predicate returns false for every data batch. If cancelWhenKnown is true, this filter cancels the pipeline as soon as the predicate returns true on a data batch.
func NotEvery
func NotEvery[T any](result *bool, cancelWhenKnown bool, predicate Predicate[T]) Filter[T]
NotEvery creates a filter that sets the result pointer to true if the given predicate returns false for at least one of the data batches it is passed. If cancelWhenKnown is true, this filter cancels the pipeline as soon as the predicate returns false on a data batch.
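The semantics of these predicate filters, including the effect of cancelWhenKnown, can be modeled sequentially. This is a hypothetical sketch, not the package's code. every and some stand in for Every and Some, and the predicate is assumed to take a whole batch and return a bool, as described for Predicate. The examined count stands in for how much work a canceled pipeline would skip. NotAny and NotEvery correspond to negating the results of some and every.

```go
package main

import "fmt"

// every models Every on a sequential run: the result is true only if pred
// holds for every batch. With cancelWhenKnown, it stops at the first batch
// for which pred is false, as a canceled pipeline would.
func every[T any](batches []T, cancelWhenKnown bool, pred func(T) bool) (result bool, examined int) {
	result = true
	for _, batch := range batches {
		examined++
		if !pred(batch) {
			result = false
			if cancelWhenKnown {
				break
			}
		}
	}
	return result, examined
}

// some models Some: the result is true if pred holds for at least one
// batch. With cancelWhenKnown, it stops at the first batch for which pred
// is true.
func some[T any](batches []T, cancelWhenKnown bool, pred func(T) bool) (result bool, examined int) {
	for _, batch := range batches {
		examined++
		if pred(batch) {
			result = true
			if cancelWhenKnown {
				break
			}
		}
	}
	return result, examined
}

// allPositive is a hypothetical predicate on a whole batch.
func allPositive(batch []int) bool {
	for _, x := range batch {
		if x <= 0 {
			return false
		}
	}
	return true
}

func main() {
	batches := [][]int{{1, 2}, {-3}, {4}}
	fmt.Println(every(batches, true, allPositive))  // false 2
	fmt.Println(every(batches, false, allPositive)) // false 3
	fmt.Println(some(batches, true, allPositive))   // true 1
	anyOK, _ := some(batches, false, allPositive)
	fmt.Println("NotAny:", !anyOK) // NotAny: false
}
```

With parallel nodes, a real pipeline may evaluate batches concurrently. The number of batches still processed after the result is known is therefore not deterministic there.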
func ReceiveAndFinalize
func ReceiveAndFinalize[T any](receive Receiver[T], finalize Finalizer) Filter[T]
ReceiveAndFinalize creates a filter that returns the given receiver and finalizer.
type Finalizer
type Finalizer func()
A Finalizer is called once after the corresponding receiver has been called for all data batches in the current pipeline.
type Func

```go
type Func[T any] struct {
	// contains filtered or unexported fields
}
```
Func is a generic source that generates data batches by repeatedly calling a function.
func NewFunc
func NewFunc[T any](prepare func(ctx context.Context) (size int), fetch func(size int) (data T, fetched int, err error)) *Func[T]
NewFunc returns a new Func to generate data batches by repeatedly calling fetch.
The prepare parameter is called when the pipeline is started. It returns the total expected size of all data batches. Return -1, or pass a nil function to the prepare parameter, if the total size is unknown or difficult to determine.
The fetch function returns a data batch of the requested size. It returns the size of the data batch that it was actually able to fetch. It returns 0 if there is no more data to be fetched from the source; the pipeline will then make no further attempts to fetch more elements.
The fetch function can also return an error if necessary.
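The following stand-alone sketch shows prepare and fetch functions with the shapes that NewFunc expects. Together they produce the integers 0 to n-1. newCounter and drain are hypothetical helpers, and the sketch does not use the package. drain only imitates the documented behaviour of stopping once fetch reports 0 fetched elements or returns an error.

```go
package main

import (
	"context"
	"fmt"
)

// newCounter returns hypothetical prepare and fetch functions with the
// shapes that NewFunc expects, producing the integers 0..n-1.
func newCounter(n int) (func(context.Context) int, func(int) ([]int, int, error)) {
	next := 0
	prepare := func(context.Context) int { return n } // the total size is known
	fetch := func(size int) ([]int, int, error) {
		if remaining := n - next; size > remaining {
			size = remaining
		}
		data := make([]int, size)
		for i := range data {
			data[i] = next + i
		}
		next += size
		return data, size, nil // size == 0 signals that the source is exhausted
	}
	return prepare, fetch
}

// drain stands in for the pipeline: it keeps calling fetch until fetch
// reports 0 fetched elements or an error.
func drain(fetch func(int) ([]int, int, error), batchSize int) ([][]int, error) {
	var batches [][]int
	for {
		data, fetched, err := fetch(batchSize)
		if err != nil {
			return batches, err
		}
		if fetched == 0 {
			return batches, nil
		}
		batches = append(batches, data)
	}
}

func main() {
	prepare, fetch := newCounter(7)
	fmt.Println(prepare(context.Background())) // 7
	batches, err := drain(fetch, 3)
	fmt.Println(batches, err) // [[0 1 2] [3 4 5] [6]] <nil>
}
```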
func (*Func[T]) Data
func (f *Func[T]) Data() T
Data implements the method of the Source interface.
type MultiChan

```go
type MultiChan[T any] struct {
	// contains filtered or unexported fields
}
```

MultiChan is a source that accepts multiple elements from the input channel and passes them through.
func NewMultiChan
NewMultiChan returns a new MultiChan to read from the given channel.
func (*MultiChan[T]) Data
func (src *MultiChan[T]) Data() []T
Data implements the method of the Source interface.
type Node

```go
type Node[T any] interface {
	// TryMerge tries to merge node with the current node by appending its
	// filters to the filters of the current node, which succeeds if both nodes
	// are either sequential or parallel. The return value merged indicates
	// whether merging succeeded.
	TryMerge(node Node[T]) (merged bool)

	// Begin informs this node that the pipeline is going to start to feed
	// batches of data to this node. The pipeline, the index of this node among
	// all the nodes in the pipeline, and the expected total size of all batches
	// combined are passed as parameters.
	//
	// The dataSize parameter is either positive, in which case it indicates the
	// expected total size of all batches that will eventually be passed to this
	// node's Feed method, or it is negative, in which case the expected size is
	// either unknown or too difficult to determine. The dataSize parameter is a
	// pointer whose contents can be modified by Begin, for example if this node
	// increases or decreases the total size for subsequent nodes, if this node
	// can change dataSize from an unknown to a known value, or, conversely, if
	// it must change dataSize from a known to an unknown value.
	//
	// A node may decide that, based on the given information, it will actually
	// not need to see any of the batches that are normally going to be passed
	// to it. In that case, it can return false as a result, and its Feed and
	// End methods will not be called anymore. Otherwise, it should return true
	// by default.
	Begin(p *Pipeline[T], index int, dataSize *int) (keep bool)

	// StrictOrd reports whether this node or any contained nodes are StrictOrd
	// nodes.
	StrictOrd() bool

	// Feed is called for each batch of data. The pipeline, the index of this
	// node among all the nodes in the pipeline (which may be different from the
	// index number seen by Begin), the sequence number of the batch (according
	// to the encounter order), and the actual batch of data are passed as
	// parameters.
	//
	// The data parameter contains the batch of data, which is usually a slice
	// of a particular type. After the data has been processed by all filters of
	// this node, the node must call p.FeedForward with exactly the same index
	// and sequence numbers, but a potentially modified batch of data.
	// FeedForward must be called even when the data batch is or becomes empty,
	// to ensure that all sequence numbers are seen by subsequent nodes.
	Feed(p *Pipeline[T], index int, seqNo int, data T)

	// End is called after all batches have been passed to Feed. This allows the
	// node to release resources and call the finalizers of its filters.
	End()
}
```
A Node object represents a sequence of filters which are together executed either in encounter order, in arbitrary sequential order, or in parallel.
The methods of this interface are typically not called by user programs, but rather implemented by specific node types and called by pipelines. Ordered, sequential, and parallel nodes are also implemented in this package, so that user programs are typically not concerned with Node methods at all.
func Limit
func Limit[T any](limit int, cancelWhenReached bool) Node[[]T]
Limit creates an ordered node with a filter that caps the total size of all data batches it passes to the next filter in the pipeline at the given limit. If cancelWhenReached is true, this filter cancels the pipeline as soon as the limit is reached. If limit is negative, all data is passed through unmodified.
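The capping behaviour of Limit can be modeled on a sequence of batches in encounter order, which is the order an ordered node sees. limitBatches is a hypothetical helper that does not model cancelWhenReached. It still emits a batch, possibly empty, for every input batch. This follows the requirement that a node forwards every sequence number.

```go
package main

import "fmt"

// limitBatches models the documented behaviour of Limit on ordered batches:
// the total number of elements passed on never exceeds limit, and a
// negative limit passes everything through unmodified. Batches beyond the
// limit are still emitted, but empty.
func limitBatches[T any](batches [][]T, limit int) [][]T {
	out := make([][]T, len(batches))
	remaining := limit
	for i, batch := range batches {
		switch {
		case limit < 0:
			out[i] = batch
		case remaining >= len(batch):
			out[i] = batch
			remaining -= len(batch)
		default:
			out[i] = batch[:remaining]
			remaining = 0
		}
	}
	return out
}

func main() {
	batches := [][]int{{1, 2, 3}, {4, 5}, {6}}
	fmt.Println(limitBatches(batches, 4))  // [[1 2 3] [4] []]
	fmt.Println(limitBatches(batches, -1)) // [[1 2 3] [4 5] [6]]
}
```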
func LimitedPar
func LimitedPar[T any](filters ...Filter[T]) Node[T]
LimitedPar creates a parallel node with the given filters.
type Pipeline

```go
type Pipeline[T any] struct {
	// contains filtered or unexported fields
}
```
A Pipeline is a parallel pipeline that can feed batches of data fetched from a source through several nodes that are ordered, sequential, or parallel.
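func LimitedParTransform
func LimitedParTransform[S, T any](p *Pipeline[S], transform func(int, S) T) *Pipeline[T]
func OrdTransform
func OrdTransform[S, T any](p *Pipeline[S], transform func(int, S) T) *Pipeline[T]
func ParTransform
func ParTransform[S, T any](p *Pipeline[S], transform func(int, S) T) *Pipeline[T]
func SeqTransform
func SeqTransform[S, T any](p *Pipeline[S], transform func(int, S) T) *Pipeline[T]
func StrictOrdTransform
func StrictOrdTransform[S, T any](p *Pipeline[S], transform func(int, S) T) *Pipeline[T]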
func (*Pipeline[T]) Cancel
func (p *Pipeline[T]) Cancel()
Cancel calls the cancel function of this pipeline's context.
func (*Pipeline[T]) Err
func (p *Pipeline[T]) Err() (err error)
Err returns the current error value for this pipeline, which may be nil if no error has occurred so far.
Err and SetErr are safe to be concurrently invoked.
func (*Pipeline[T]) FeedForward
func (p *Pipeline[T]) FeedForward(index int, seqNo int, data T)
FeedForward must be called in the Feed method of a node to forward a potentially modified data batch to the next node in the current pipeline.
FeedForward is used in Node implementations. User programs typically do not call FeedForward.
FeedForward must be called with the pipeline received as a parameter by Feed, and must pass the same index and seqNo received by Feed. The data parameter can be either a modified or an unmodified data batch. FeedForward must always be called, even if the data batch is unmodified, and even if the data batch is or becomes empty.
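The following sketch illustrates the FeedForward contract with a hypothetical node whose Feed drops odd numbers. To keep the code self-contained, a plain callback (forwardFunc) stands in for p.FeedForward. evenNode and feedAll are illustrative names and are not part of this package. The second batch becomes empty, but its sequence number is still forwarded.

```go
package main

import "fmt"

// forwardFunc is a hypothetical stand-in for p.FeedForward, used so that
// this sketch compiles without the pipeline package.
type forwardFunc func(index, seqNo int, data []int)

// evenNode is a hypothetical node whose Feed keeps only even numbers.
type evenNode struct{}

func (evenNode) Feed(forward forwardFunc, index, seqNo int, data []int) {
	kept := make([]int, 0, len(data))
	for _, x := range data {
		if x%2 == 0 {
			kept = append(kept, x)
		}
	}
	// Forward with the same index and seqNo, even if kept is empty, so that
	// the next node sees every sequence number.
	forward(index, seqNo, kept)
}

// feedAll pushes batches through the node and records which sequence
// numbers were forwarded and how many elements survived.
func feedAll(batches [][]int) (seen []int, total int) {
	var n evenNode
	forward := func(_, seqNo int, data []int) {
		seen = append(seen, seqNo)
		total += len(data)
	}
	for seqNo, batch := range batches {
		n.Feed(forward, 0, seqNo, batch)
	}
	return seen, total
}

func main() {
	seen, total := feedAll([][]int{{1, 2}, {3, 5}, {4, 6}})
	fmt.Println(seen, total) // [0 1 2] 3
}
```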
func (*Pipeline[T]) NofBatches
func (p *Pipeline[T]) NofBatches(n int) (nofBatches int)
NofBatches sets or gets the number of batches that are created from the data source for this pipeline, if the expected total size for this pipeline's data source is known or can be determined easily.
NofBatches can be called safely by user programs before Run or RunWithContext is called.
If user programs do not call NofBatches, or call it with a value < 1, the pipeline chooses a reasonable default value that takes runtime.NumCPU() into account.
If the expected total size for this pipeline's data source is unknown, or is difficult to determine, use SetVariableBatchSize to influence batch sizes.
func (*Pipeline[T]) Notify
func (p *Pipeline[T]) Notify(f func())
Notify installs a thunk that gets invoked just before this pipeline starts running. f will be invoked in its own goroutine.
func (*Pipeline[T]) Run
func (p *Pipeline[T]) Run()
Run initiates pipeline execution by calling RunWithContext(context.WithCancel(context.Background())), and ensures that the cancel function is called at least once when the pipeline is done.
Run should only be called after the pipeline has been created with a data source using New, and one or more Node objects have been added to the pipeline using the Add method. NofBatches can be called before Run to deal with load imbalance, but this is not necessary, since Run chooses a reasonable default value.
Run prepares the data source, tells each node that batches are going to be sent to it by calling Begin, and then fetches batches from the data source and sends them to the nodes. Once the data source is depleted, the nodes are informed that the end of the data source has been reached.
func (*Pipeline[T]) RunWithContext
func (p *Pipeline[T]) RunWithContext(ctx context.Context, cancel context.CancelFunc)
RunWithContext initiates pipeline execution.
It expects a context and a cancel function as parameters, for example from context.WithCancel(context.Background()). It does not ensure that the cancel function is called at least once, so this must be ensured by the function calling RunWithContext.
RunWithContext should only be called after the pipeline has been created with a data source using New, and one or more Node objects have been added to the pipeline using the Add method. NofBatches can be called before RunWithContext to deal with load imbalance, but this is not necessary, since RunWithContext chooses a reasonable default value.
RunWithContext prepares the data source, tells each node that batches are going to be sent to it by calling Begin, and then fetches batches from the data source and sends them to the nodes. Once the data source is depleted, the nodes are informed that the end of the data source has been reached.
func (*Pipeline[T]) SetErr
func (p *Pipeline[T]) SetErr(err error) bool
SetErr attempts to set a new error value for this pipeline, unless it already has a non-nil error value. If the attempt is successful, SetErr also cancels the pipeline, and returns true. If the attempt is not successful, SetErr returns false.
SetErr and Err are safe to be concurrently invoked, for example from the different goroutines executing filters of parallel nodes in this pipeline.
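The model below shows the documented Err/SetErr contract, assuming a mutex-protected error field and a context cancel function. errState and firstError are hypothetical, and the package's actual implementation may differ. In this sketch several goroutines race to report an error. Exactly one SetErr call succeeds, and that call cancels the context.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errState is a hypothetical model of the Err/SetErr contract: the first
// error wins, setting it cancels the context, and both methods are safe for
// concurrent use.
type errState struct {
	mu     sync.Mutex
	err    error
	cancel context.CancelFunc
}

func (s *errState) Err() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.err
}

func (s *errState) SetErr(err error) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.err != nil {
		return false // an error was already recorded
	}
	s.err = err
	s.cancel()
	return true
}

// firstError lets several goroutines report an error concurrently and
// counts how many SetErr calls succeeded.
func firstError(workers int) (successes int, canceled bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	s := &errState{cancel: cancel}
	var wg sync.WaitGroup
	var mu sync.Mutex
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if s.SetErr(fmt.Errorf("worker %d failed", i)) {
				mu.Lock()
				successes++
				mu.Unlock()
			}
		}(i)
	}
	wg.Wait()
	return successes, s.Err() != nil && errors.Is(ctx.Err(), context.Canceled)
}

func main() {
	fmt.Println(firstError(8)) // 1 true
}
```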
func (*Pipeline[T]) SetVariableBatchSize
func (p *Pipeline[T]) SetVariableBatchSize(batchInc, maxBatchSize int)
SetVariableBatchSize sets the batch size(s) for the batches that are created from the data source for this pipeline, if the expected total size for this pipeline's data source is unknown or difficult to determine.
SetVariableBatchSize can be called safely by user programs before Run or RunWithContext is called.
If user programs do not call SetVariableBatchSize, or pass a value < 1 to either of the two parameters, the pipeline chooses a reasonable default value for the respective parameter.
The pipeline starts with batchInc as the batch size and increases the batch size for every subsequent batch by batchInc, to accommodate data sources of different total sizes. The batch size never exceeds maxBatchSize, though.
If the expected total size for this pipeline's data source is known, or can be determined easily, use NofBatches to influence the batch size.
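The growth schedule of SetVariableBatchSize can be computed with a small hypothetical helper, batchSizes. It lists the batch sizes requested for n consecutive batches. The helper models the documented rule only. Once the source has fewer elements left, the sizes actually fetched may be smaller.

```go
package main

import "fmt"

// batchSizes models the documented SetVariableBatchSize schedule: the first
// batch requests batchInc elements, each following batch requests batchInc
// more, and no request exceeds maxBatchSize.
func batchSizes(batchInc, maxBatchSize, n int) []int {
	sizes := make([]int, 0, n)
	size := batchInc
	for i := 0; i < n; i++ {
		if size > maxBatchSize {
			size = maxBatchSize
		}
		sizes = append(sizes, size)
		size += batchInc
	}
	return sizes
}

func main() {
	fmt.Println(batchSizes(100, 350, 5)) // [100 200 300 350 350]
}
```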
type Predicate
A Predicate is a function that is passed a data batch and returns a boolean value.
In most cases, the data batch is a slice, and the predicate checks a condition on each element of the slice.
type Receiver
A Receiver is called for every data batch, and returns a potentially modified data batch. The seqNo parameter indicates the order in which the data batch was encountered at the current pipeline's data source.
type Scanner
Scanner is a wrapper around bufio.Scanner so it can act as a data source for pipelines. It fetches strings.
func NewScanner
NewScanner returns a new Scanner to read from r. The split function defaults to bufio.ScanLines.
type Source

```go
type Source[T any] interface {
	// Err returns an error value or nil
	Err() error

	// Prepare receives a pipeline context and informs the pipeline what the
	// total expected size of all data batches is. The return value is -1 if the
	// total size is unknown or difficult to determine.
	Prepare(ctx context.Context) (size int)

	// Fetch gets a data batch of the requested size from the source. It returns
	// the size of the data batch that it was actually able to fetch. It returns
	// 0 if there is no more data to be fetched from the source; the pipeline
	// will then make no further attempts to fetch more elements.
	Fetch(size int) (fetched int)

	// Data returns the last fetched data batch.
	Data() T
}
```
A Source represents an object that can generate data batches for pipelines.
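User programs can add their own sources by implementing this interface. The sketch below re-declares Source locally so that it compiles on its own. rangeSource is a hypothetical source that yields the integers 0 to n-1. collect is a hypothetical driver that follows the calling sequence described in the interface comments: Prepare once, then Fetch and Data until Fetch returns 0.

```go
package main

import (
	"context"
	"fmt"
)

// Source re-declares the documented interface so that this sketch
// compiles without importing the pipeline package.
type Source[T any] interface {
	Err() error
	Prepare(ctx context.Context) (size int)
	Fetch(size int) (fetched int)
	Data() T
}

// rangeSource is a hypothetical user-defined source that yields the
// integers [0, n) in batches.
type rangeSource struct {
	n, next int
	data    []int
}

func (src *rangeSource) Err() error { return nil }

// Prepare reports the total size up front, since it is known.
func (src *rangeSource) Prepare(context.Context) int { return src.n }

func (src *rangeSource) Fetch(size int) int {
	if remaining := src.n - src.next; size > remaining {
		size = remaining
	}
	src.data = make([]int, size)
	for i := range src.data {
		src.data[i] = src.next + i
	}
	src.next += size
	return size // 0 tells the caller that the source is exhausted
}

func (src *rangeSource) Data() []int { return src.data }

// Compile-time check that rangeSource satisfies Source[[]int].
var _ Source[[]int] = (*rangeSource)(nil)

// collect drives any Source: Prepare once, then Fetch and Data until Fetch
// returns 0.
func collect[T any](src Source[T], batchSize int) ([]T, error) {
	src.Prepare(context.Background())
	var batches []T
	for src.Fetch(batchSize) > 0 {
		batches = append(batches, src.Data())
	}
	return batches, src.Err()
}

func main() {
	batches, err := collect[[]int](&rangeSource{n: 5}, 2)
	fmt.Println(batches, err) // [[0 1] [2 3] [4]] <nil>
}
```

New accepts any Source[T], so a type like rangeSource should presumably work with the real package as well. This sketch does not exercise that.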