jpipe

v0.2.4
Published: Apr 20, 2023 License: MIT Imports: 8 Imported by: 0

README

JPipe

A user-friendly implementation of the pipeline pattern in Go.

Overview

The pipeline pattern has been described by members of the core Go team several times.

Go provides very powerful concurrency primitives, but implementing the pipeline pattern correctly, with correct handling of cancellation, requires a very good understanding of those primitives and a non-negligible amount of boilerplate code. As pipelines become more complex, that boilerplate also starts to weigh heavily on code readability. Enter JPipe.

Features
  • Simple, controlled, per-operator concurrency
  • Ordered FIFO-like concurrency
  • Asynchronous execution and API
  • Safe context-based cancellation
  • Type-safe API
  • Fluent API (as much as allowed by Go generics)
Model

A Pipeline is a directed acyclic graph (DAG), where operators are nodes and Channels are edges:

graph LR;
    A["FromSlice\n(operator)"]--Channel-->B["Filter\n(operator)"];
    B--Channel-->C["Map\n(operator)"];
    C--Channel-->D["ForEach\n(operator)"];

JPipe has the classic operators Map, Filter, ForEach and many more. Operators may have options. In particular, operators that take a function as a parameter (e.g. Map) usually support concurrency, which applies only to that operator and not to the whole pipeline. This allows for fine-grained concurrency control.

Operators are not necessarily linear, so they may have multiple input/output Channels. Merge, for example, takes several inputs and merges them into a single output.

Channels are just a light wrapper over a plain Go channel, and you can reason about them in the same way you do with Go channels. The only exception is that a Channel can only be input to one operator.
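
To make the model concrete, here is a minimal plain-Go sketch of the same shape as the diagram above: each stage is a goroutine (a node) and each Go channel is an edge. The names fromSlice, filterStage, mapStage and runEvenSquares are hypothetical helpers written for this illustration and are not part of JPipe. Cancellation is deliberately left out to keep the sketch short.

```go
package main

import "fmt"

// fromSlice emits every element of the slice and then closes its output.
func fromSlice[T any](values []T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// filterStage forwards only the values accepted by the predicate.
func filterStage[T any](in <-chan T, keep func(T) bool) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for v := range in {
			if keep(v) {
				out <- v
			}
		}
	}()
	return out
}

// mapStage transforms each value with the mapper.
func mapStage[T, R any](in <-chan T, mapper func(T) R) <-chan R {
	out := make(chan R)
	go func() {
		defer close(out)
		for v := range in {
			out <- mapper(v)
		}
	}()
	return out
}

// runEvenSquares wires the stages together and collects the results,
// playing the role of the final ForEach node.
func runEvenSquares(values []int) []int {
	evens := filterStage(fromSlice(values), func(v int) bool { return v%2 == 0 })
	squares := mapStage(evens, func(v int) int { return v * v })
	var results []int
	for v := range squares {
		results = append(results, v)
	}
	return results
}

func main() {
	fmt.Println(runEvenSquares([]int{1, 2, 3, 4, 5, 6})) // prints [4 16 36]
}
```

Each stage owns its output channel and closes it when its input is exhausted, which is what lets the final range loop terminate.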

Usage

Assume we have an expensive IO operation that takes 1 second to execute:

func expensiveIOOperation(id int) {
    time.Sleep(time.Second)
}

Imagine this operation must be run for ids 1 through 10. We don't want to wait 10 seconds though, so we decide to do it with a concurrency factor of 5, expecting to get the full operation down to 2 seconds. The full Go code for that would be:

Plain Go version
ids := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
channel := make(chan int)
concurrency := 5
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
  wg.Add(1)
  go func() {
    defer wg.Done()
    for id := range channel {
      expensiveIOOperation(id)
    }
  }()
}

outer:
for _, id := range ids {
  select {
  // The nested select gives priority to the ctx.Done() signal, so we always exit early if needed
  // Without it, a single select just has no priority, so a new value could be processed even if the context has been canceled
  case <-ctx.Done():
    break outer
  default:
    select {
    case channel <- id:
    case <-ctx.Done(): // always check ctx.Done() to avoid leaking the goroutine
      break outer
    }
  }
}
close(channel)

wg.Wait()

That's a lot of code right there for a simple work pool! We even had to make it collapsible to avoid disrupting the reading flow. Admittedly, most of the complexity comes from cancellation handling, but you don't want to go around leaking your goroutines. Now let's see how the same thing is done with JPipe:

ids := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
pipeline := jpipe.New(ctx)
<-jpipe.FromSlice(pipeline, ids).
    ForEach(expensiveIOOperation, jpipe.Concurrent(5))
Complex pipelines

The above is a simple work pool and admittedly the most common use case you'll find for concurrency. But JPipe allows you to build more complex pipelines with its catalog of operators. Imagine the following processing pipeline:

graph LR;
    F1["Transaction feed 1"]-->M["Merge"];
    F2["Transaction feed 2"]-->M;
    M-->F["Filter transactions\nover 50 EUR"]
    F-->T["Take first 20\ntransactions"]
    T-->E["Enhance transactions\nwith external data\n(IO-bound,\nconcurrency 5,\nFIFO)"]
    E-->B["Batch transactions\n(batch size 3,\ntimeout 5 sec)"]
    B-->K["Send batches\nto Kafka"]

The JPipe implementation would be:

pipeline := jpipe.New(ctx)
feed1 := jpipe.FromGoChannel(pipeline, getTransactionsFromFeed("feed1"))
feed2 := jpipe.FromGoChannel(pipeline, getTransactionsFromFeed("feed2"))

txs := jpipe.Merge(feed1, feed2).
    Filter(func(ft FeedTransaction) bool { return ft.Amount > 50 }).
    Take(20)

enhancedTxs := jpipe.Map(txs, enhanceTransaction, jpipe.Concurrent(5), jpipe.Ordered(10))
<-jpipe.Batch(enhancedTxs, 3, 5*time.Second).
    ForEach(sendTransactionBatchToKafka)
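
The enhancement step combines concurrency with FIFO ordering. As a rough idea of how ordered concurrency can be achieved in plain Go, the sketch below queues one result slot per input value in arrival order and lets a consumer drain those slots in the same order. This is an illustration under stated assumptions, not JPipe's implementation: orderedMap, emit and orderedDemo are hypothetical names, and the concurrency and bufferSize parameters only loosely mirror the Concurrent and Ordered options.

```go
package main

import (
	"fmt"
	"time"
)

// emit sends the given values on a new channel and then closes it.
func emit[T any](values ...T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// orderedMap runs mapper on up to `concurrency` values at once,
// but emits the results in input order.
func orderedMap[T, R any](in <-chan T, mapper func(T) R, concurrency, bufferSize int) <-chan R {
	out := make(chan R)
	sem := make(chan struct{}, concurrency)  // limits the number of running workers
	pending := make(chan chan R, bufferSize) // result slots, queued in input order

	go func() {
		defer close(pending)
		for v := range in {
			sem <- struct{}{} // wait for a free worker
			slot := make(chan R, 1)
			pending <- slot
			go func(v T) {
				defer func() { <-sem }()
				slot <- mapper(v) // never blocks: the slot has capacity 1
			}(v)
		}
	}()

	go func() {
		defer close(out)
		for slot := range pending {
			out <- <-slot // wait for the oldest pending result first
		}
	}()
	return out
}

// orderedDemo makes earlier values slower than later ones on purpose.
func orderedDemo() []int {
	slowFirst := func(v int) int {
		time.Sleep(time.Duration(6-v) * 5 * time.Millisecond)
		return v * 10
	}
	var results []int
	for r := range orderedMap(emit(1, 2, 3, 4, 5), slowFirst, 5, 10) {
		results = append(results, r)
	}
	return results
}

func main() {
	fmt.Println(orderedDemo()) // prints [10 20 30 40 50]
}
```

Even though the value 5 finishes first, it is only emitted after the four slower results ahead of it, which is the FIFO behavior the diagram asks for.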

Documentation

You can find many more details in the official documentation.

You can also check the Go.Dev reference.

Similar projects

  • RxGo: Probably the best alternative out there, with lots of operators. It hasn't seen action in two years though, so it hasn't adopted generics: the API still deals with interface{}. A potential drawback for Go developers is its use of the reactive model, which is an abstraction very different from Go channels, and requires understanding of concepts like observer, observable, backpressure strategies, etc. If you'd rather stay within Go's channel abstraction, JPipe may be simpler to use, but RxGo may be your preferred choice if you have a background with the reactive model from other languages.
  • pipeline: An implementation of the pipeline pattern, but with a limited set of operators. It hasn't adopted generics yet either.
  • parapipe: A very simple pipeline implementation, but it has a single Pipe operator and does not support very complex pipelines. It hasn't adopted generics yet either.
  • ordered-concurrently: An implementation of ordered concurrency. It doesn't try to be a complete pipeline pattern library though, and just focuses on that feature.

License

JPipe is open-source software released under the MIT License.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Buffered added in v0.1.22

func Buffered(size int) options.Buffered

func Concurrent added in v0.1.22

func Concurrent(concurrency int) options.Concurrent

func ErrorItem added in v0.1.23

func ErrorItem[T any](err error) item.Item[T]

func Item

func Item[T any](value T, err error, ctx context.Context) item.Item[T]

func KeepFirst added in v0.1.22

func KeepFirst() options.Keep

func KeepLast added in v0.1.22

func KeepLast() options.Keep

func NewOrderingBuffer added in v0.2.0

func NewOrderingBuffer[T any, R any](node workerNode[T, R], concurrency int, bufferSize int) *orderingBuffer[T, R]

func Ordered added in v0.2.0

func Ordered(orderBufferSize int) options.Ordered

func Reduce

func Reduce[T any, R any](input *Channel[T], reducer func(R, T) R) <-chan R

Reduce performs a stateful reduction of the input values. The reducer receives the current state and the current value, and must return the new state. The final state is sent to the returned channel when all input values have been processed, or the pipeline is canceled.

Example. Calculating the sum of all input values:

output := Reduce(input, func(acc int64, value int) int64 { return acc + int64(value) })

input : 0--1--2--3--X
output: ------------6
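
The semantics described above can be sketched in plain Go as follows. This is not JPipe's code: reduceChan, emit and sumDemo are hypothetical names, and the sketch assumes the initial state is the zero value of R, since the Reduce signature takes no initial state. Cancellation is omitted.

```go
package main

import "fmt"

// emit sends the given values on a new channel and then closes it.
func emit[T any](values ...T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// reduceChan folds all input values into a single state and sends
// that state once the input channel is closed.
func reduceChan[T, R any](in <-chan T, reducer func(R, T) R) <-chan R {
	out := make(chan R, 1)
	go func() {
		defer close(out)
		var state R // assumption: the state starts at the zero value of R
		for v := range in {
			state = reducer(state, v)
		}
		out <- state
	}()
	return out
}

// sumDemo reproduces the example above: the sum of 0, 1, 2 and 3.
func sumDemo() int64 {
	return <-reduceChan(emit(0, 1, 2, 3), func(acc int64, v int) int64 {
		return acc + int64(v)
	})
}

func main() {
	fmt.Println(sumDemo()) // prints 6
}
```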

func ToMap

func ToMap[T any, K comparable](input *Channel[T], getKey func(T) K, opts ...options.ToMapOption) <-chan map[K]T

ToMap puts all values coming from the input channel in a map, using the getKey parameter to calculate the key. The resulting map is sent to the returned channel when all input values have been processed, or the pipeline is canceled.

Example:

output := ToMap(input, func(value string) string { return strings.Split(value, "_")[0] })

input : A_0--B_1--C_2--X
output: ---------------{A:A_0, B:B_1, C:C_2}

func ValueItem added in v0.1.23

func ValueItem[T any](value T) item.Item[T]

Types

type Channel

type Channel[T any] struct {
	// contains filtered or unexported fields
}

A Channel is a wrapper for a Go channel. It provides chainable methods to construct pipelines, but conceptually it must be seen as nothing but an enhanced Go channel.

func Batch

func Batch[T any](input *Channel[T], size int, timeout time.Duration) *Channel[[]T]

Batch groups input values into slices and sends those slices to the output channel. Batches can be limited by size and by time. A size or timeout of 0 is ignored.

Example:

output := Batch(input, 3, 0)

input : 0--1----2----------3------4--5----------6--7----X
output: --------{0-1-2}--------------{3-4-5}-------{6-7}X
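
A plain-Go sketch of size-and-timeout batching is shown below. It is only an approximation of the behavior described here: batchChan, emit and batchDemo are hypothetical names, and the sketch assumes the timeout is measured from the first value of each batch, which the documentation above does not specify.

```go
package main

import (
	"fmt"
	"time"
)

// emit sends the given values on a new channel and then closes it.
func emit[T any](values ...T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// batchChan groups values into slices, flushing when a batch reaches
// `size` values or when `timeout` elapses. A limit of 0 is ignored.
func batchChan[T any](in <-chan T, size int, timeout time.Duration) <-chan []T {
	out := make(chan []T)
	go func() {
		defer close(out)
		var batch []T
		var timer *time.Timer
		var expired <-chan time.Time // nil while no batch is open, so it never fires

		flush := func() {
			if timer != nil {
				timer.Stop()
				timer, expired = nil, nil
			}
			if len(batch) > 0 {
				out <- batch
				batch = nil
			}
		}

		for {
			select {
			case v, ok := <-in:
				if !ok {
					flush() // emit the last, possibly partial, batch
					return
				}
				if len(batch) == 0 && timeout > 0 {
					// Assumption: the timeout starts with the first value of a batch.
					timer = time.NewTimer(timeout)
					expired = timer.C
				}
				batch = append(batch, v)
				if size > 0 && len(batch) >= size {
					flush()
				}
			case <-expired:
				timer, expired = nil, nil
				flush()
			}
		}
	}()
	return out
}

// batchDemo reproduces the example above: size 3, no timeout.
func batchDemo() [][]int {
	var batches [][]int
	for b := range batchChan(emit(0, 1, 2, 3, 4, 5, 6, 7), 3, 0) {
		batches = append(batches, b)
	}
	return batches
}

func main() {
	fmt.Println(batchDemo()) // prints [[0 1 2] [3 4 5] [6 7]]
}
```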

func Concat

func Concat[T any](inputs ...*Channel[T]) *Channel[T]

Concat concatenates multiple input channels to a single output channel. Channels are consumed in order, e.g., the second channel won't be consumed until the first channel is closed.

Example:

output := Concat(input1, input2)

input 1: 0----1----2------3-X
input 2: -----5------6--------------7--X
output : 0----1----2------3-5-6-----7--X
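
For comparison, a minimal plain-Go sketch of the same sequential consumption is shown below. The names concatChans, emit and concatDemo are hypothetical and the code is not taken from JPipe. Note that the producer of the second input stays blocked until the first input has been drained.

```go
package main

import "fmt"

// emit sends the given values on a new channel and then closes it.
func emit[T any](values ...T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// concatChans drains each input completely before moving to the next one.
func concatChans[T any](inputs ...<-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, in := range inputs {
			for v := range in {
				out <- v
			}
		}
	}()
	return out
}

// concatDemo reproduces the example above.
func concatDemo() []int {
	var results []int
	for v := range concatChans(emit(0, 1, 2, 3), emit(5, 6, 7)) {
		results = append(results, v)
	}
	return results
}

func main() {
	fmt.Println(concatDemo()) // prints [0 1 2 3 5 6 7]
}
```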

func Distinct

func Distinct[T any, K comparable](input *Channel[T], getKey func(T) K) *Channel[T]

Distinct sends only input values for which the key hasn't been seen before to the output channel. It uses an internal map to keep track of all keys seen, so keep in mind that it could exhaust memory if too many distinct values are received.

Example:

output := Distinct(input, func(value int) int { return value })

input : 0--1--2--1--3--2-X
output: 0--1--2-----3----X
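
The memory caveat is easier to see in code. The plain-Go sketch below (hypothetical names distinctChan, emit and distinctDemo, not JPipe's implementation) keeps one map entry per key ever seen, so the map only grows for as long as new keys keep arriving.

```go
package main

import "fmt"

// emit sends the given values on a new channel and then closes it.
func emit[T any](values ...T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// distinctChan forwards a value only the first time its key is seen.
func distinctChan[T any, K comparable](in <-chan T, getKey func(T) K) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		seen := make(map[K]struct{}) // one entry per distinct key, never shrinks
		for v := range in {
			k := getKey(v)
			if _, dup := seen[k]; dup {
				continue
			}
			seen[k] = struct{}{}
			out <- v
		}
	}()
	return out
}

// distinctDemo reproduces the example above.
func distinctDemo() []int {
	var results []int
	identity := func(v int) int { return v }
	for v := range distinctChan(emit(0, 1, 2, 1, 3, 2), identity) {
		results = append(results, v)
	}
	return results
}

func main() {
	fmt.Println(distinctDemo()) // prints [0 1 2 3]
}
```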

func FlatMap

func FlatMap[T any, R any](input *Channel[T], mapper func(T) *Channel[R], opts ...options.FlatMapOption) *Channel[R]

FlatMap transforms every input value into a Channel and for each of those, it sends all values to the output channel.

Example:

output := FlatMap(input, func(i int) *Channel[int] { return FromSlice(pipeline, []int{i, i + 10}) })

input : 0------1------2------3------4------5------X
output: 0-10---1-11---2-12---3-13---4-14---5-15---X

func FromGenerator

func FromGenerator[T any](pipeline *Pipeline, generator func(i uint64) T) *Channel[T]

FromGenerator creates a Channel from a stateless generator function. Values returned by the function are sent to the channel in order.

func FromGoChannel

func FromGoChannel[T any](pipeline *Pipeline, channel <-chan T) *Channel[T]

FromGoChannel creates a Channel from a Go channel.

func FromRange

func FromRange[T constraints.Integer](pipeline *Pipeline, start T, end T) *Channel[T]

FromRange creates a Channel from a range of integers. All integers between start and end (both inclusive) are sent to the channel in order.

func FromSlice

func FromSlice[T any](pipeline *Pipeline, slice []T) *Channel[T]

FromSlice creates a Channel from a slice. All values in the slice are sent to the channel in order.

func Map

func Map[T any, R any](input *Channel[T], mapper func(T) R, opts ...options.MapOption) *Channel[R]

Map transforms every input value with a mapper function and sends the results to the output channel.

Example:

output := Map(input, func(i int) int { return i + 10 })

input : 0--1--2--3--4--5--X
output: 10-11-12-13-14-15-X

func Merge

func Merge[T any](inputs ...*Channel[T]) *Channel[T]

Merge merges multiple input channels to a single output channel. Values from input channels are sent to the output channel as they arrive, with no specific priority.

Example:

output := Merge(input1, input2)

input1: 0----1----2------3-X
input2: -----5------6------X
output: 0----5-1--2-6----3-X
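
The classic fan-in pattern gives the same "as they arrive" behavior in plain Go. The sketch below uses hypothetical names (mergeChans, emit, mergeDemo) and is not JPipe's code. Because the interleaving is not deterministic, the demo only checks how many values arrived and their sum.

```go
package main

import (
	"fmt"
	"sync"
)

// emit sends the given values on a new channel and then closes it.
func emit[T any](values ...T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// mergeChans forwards values from all inputs to one output as they arrive.
// The output is closed once every input has been closed.
func mergeChans[T any](inputs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// mergeDemo merges the two inputs of the example above and returns
// the number of values received and their sum.
func mergeDemo() (count, sum int) {
	for v := range mergeChans(emit(0, 1, 2, 3), emit(5, 6)) {
		count++
		sum += v
	}
	return count, sum
}

func main() {
	count, sum := mergeDemo()
	fmt.Println(count, sum) // prints 6 17
}
```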

func Wrap

func Wrap[T any](input *Channel[T]) *Channel[item.Item[T]]

Wrap wraps every input value T in an Item[T] and sends it to the output channel. Item[T] is used mostly to represent items that can have either a value or an error. Another use for Item[T] is to carry a Context and enrich it in successive operators.

func (*Channel[T]) All

func (input *Channel[T]) All(predicate func(T) bool) <-chan bool

All determines if all input values match the predicate. If all values match the predicate, true is sent to the returned channel when all input values have been processed, or the pipeline is canceled. If instead some value does not match the predicate, false is immediately sent to the returned channel and no more input values are read.

Example 1:

output := input.All(func(value int) bool { return value < 4 })

input : 0--1--2--3--X
output: ------------true

Example 2:

output := input.All(func(value int) bool { return value < 2 })

input : 0--1--2--3--X
output: ------false

func (*Channel[T]) Any

func (input *Channel[T]) Any(predicate func(T) bool) <-chan bool

Any determines if any input value matches the predicate. If no value matches the predicate, false is sent to the returned channel when all input values have been processed, or the pipeline is canceled. If instead some value is found to match the predicate, true is immediately sent to the returned channel and no more input values are read.

Example 1:

output := input.Any(func(value int) bool { return value > 3 })

input : 0--1--2--3--X
output: ------------false

Example 2:

output := input.Any(func(value int) bool { return value >= 2 })

input : 0--1--2--3--X
output: ------true

func (*Channel[T]) Broadcast

func (input *Channel[T]) Broadcast(numOutputs int, opts ...options.BroadcastOption) []*Channel[T]

Broadcast sends each input value to every output channel. The next input value is not read by this operator until all output channels have read the current one. Bear in mind that if one of the output channels is a slow consumer, it may block the other consumers. This is a particularly annoying type of backpressure, because not only does it block the input, it also blocks other consumers. To avoid this, consider using options.Buffered so that the output channels are buffered, with no need for an extra Buffer operator.

Example:

outputs := input.Broadcast(2, Buffered(4))

input  : 0--1--2--3--4--5---X
output1: 0--1--2--3--4--5---X
output2: 0--1--2--3--4--5---X
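
The effect of buffering can be illustrated with a plain-Go sketch. The names broadcastChan, emit and broadcastDemo are hypothetical, and the code only approximates the behavior described above. The demo reads the first output to completion before touching the second, which only works because each output has room for all six values.

```go
package main

import "fmt"

// emit sends the given values on a new channel and then closes it.
func emit[T any](values ...T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// broadcastChan copies every input value to each of numOutputs channels,
// each created with the given buffer capacity.
func broadcastChan[T any](in <-chan T, numOutputs, buffer int) []<-chan T {
	outs := make([]chan T, numOutputs)
	result := make([]<-chan T, numOutputs)
	for i := range outs {
		outs[i] = make(chan T, buffer)
		result[i] = outs[i]
	}
	go func() {
		defer func() {
			for _, o := range outs {
				close(o)
			}
		}()
		for v := range in {
			for _, o := range outs {
				o <- v // blocks on a slow consumer once its buffer is full
			}
		}
	}()
	return result
}

// broadcastDemo consumes the outputs one after the other.
func broadcastDemo() (first, second []int) {
	outs := broadcastChan(emit(0, 1, 2, 3, 4, 5), 2, 6)
	for v := range outs[0] {
		first = append(first, v)
	}
	for v := range outs[1] {
		second = append(second, v)
	}
	return first, second
}

func main() {
	first, second := broadcastDemo()
	fmt.Println(first, second) // prints [0 1 2 3 4 5] [0 1 2 3 4 5]
}
```

With a buffer of 0 the same demo would deadlock: the sender would wait for the second output to be read while the reader is still waiting on the first. That is the slow-consumer problem in its most extreme form.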

func (*Channel[T]) Buffer

func (input *Channel[T]) Buffer(n int) *Channel[T]

Buffer transparently passes input values to the output channel, but the output channel is buffered. It is useful to avoid backpressure from slow consumers.

func (*Channel[T]) Count

func (input *Channel[T]) Count() <-chan int64

Count counts input values. The final count is sent to the returned channel when all input values have been processed, or the pipeline is canceled.

Example:

output := input.Count()

input : 9--8--7--6--X
output: ------------4

func (*Channel[T]) Filter

func (input *Channel[T]) Filter(predicate func(T) bool, opts ...options.FilterOption) *Channel[T]

Filter sends to the output channel only the input values that match the predicate.

Example:

output := input.Filter(func(i int) bool { return i%2==1 })

input : 0--1--2--3--4--5-X
output: ---1-----3-----5-X

func (*Channel[T]) ForEach

func (input *Channel[T]) ForEach(function func(T), opts ...options.ForEachOption) <-chan struct{}

ForEach calls the function passed as parameter for every value coming from the input channel. The returned channel will close when all input values have been processed, or the pipeline is canceled.

func (*Channel[T]) Interval

func (input *Channel[T]) Interval(interval func(value T) time.Duration) *Channel[T]

Interval transparently passes all input values to the output channel, but a time interval is awaited after each element before sending another one. No value is sent to the output while that interval is active. This operator is prone to generating backpressure, so use it with care, and consider adding a Buffer before it.

Example (assume each hyphen is 1 ms):

output := input.Interval(func(value int) time.Duration { return 4 * time.Millisecond })

input : 0--1--2--------------3--4--5--X
output: 0----1----2----------3----4----5-X

func (*Channel[T]) Last

func (input *Channel[T]) Last() <-chan T

Last sends the last value received from the input channel to the output channel. The last value is sent to the returned channel when all input values have been processed, or the pipeline is canceled.

Example:

output := input.Last()

input : 0--1--2--3------X
output: ----------------3

func (*Channel[T]) None

func (input *Channel[T]) None(predicate func(T) bool) <-chan bool

None determines if no input value matches the predicate. If no value matches the predicate, true is sent to the returned channel when all input values have been processed, or the pipeline is canceled. If instead some value matches the predicate, false is immediately sent to the returned channel and no more input values are read.

Example 1:

output := input.None(func(value int) bool { return value > 3 })

input : 0--1--2--3--X
output: ------------true

Example 2:

output := input.None(func(value int) bool { return value >= 2 })

input : 0--1--2--3--X
output: ------false

func (*Channel[T]) Pipeline added in v0.2.3

func (c *Channel[T]) Pipeline() *Pipeline

func (*Channel[T]) Skip

func (input *Channel[T]) Skip(n uint64) *Channel[T]

Skip skips the first n input values, and then sends every value from n+1 on to the output channel.

Example:

output := input.Skip(2)

input : 0--1--2--3--4--5-X
output: ------2--3--4--5-X

func (*Channel[T]) Split added in v0.2.0

func (input *Channel[T]) Split(numOutputs int, opts ...options.SplitOption) []*Channel[T]

Split sends each input value to any of the output channels, with no specific priority.

Example:

outputs := input.Split(2, Buffered(4))

input  : 0--1--2--3--4--5---X
output1: 0-----2--3-----5---X
output2: ---1--------4------X

func (*Channel[T]) Take

func (input *Channel[T]) Take(n uint64) *Channel[T]

Take sends the first n input values to the output channel, and then stops processing and closes the output channel.

Example:

output := input.Take(3)

input : 0--1--2--3--4--5-X
output: 0--1--2-X
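
A plain-Go sketch of the same early stop is shown below, with hypothetical names takeChan and takeDemo. It is not JPipe's implementation. One caveat is worth spelling out: once this sketch stops reading, an upstream producer writing to an unbuffered channel would stay blocked forever, which is exactly why a real pipeline needs cancellation. The demo sidesteps the issue by using a pre-filled buffered input.

```go
package main

import "fmt"

// takeChan forwards the first n values and then closes its output,
// without reading any further input.
func takeChan[T any](in <-chan T, n uint64) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		if n == 0 {
			return
		}
		var taken uint64
		for v := range in {
			out <- v
			taken++
			if taken == n {
				return // stop early; the remaining input is left unread
			}
		}
	}()
	return out
}

// takeDemo reproduces the example above using a pre-filled input,
// so that no producer goroutine is left blocked.
func takeDemo() []int {
	in := make(chan int, 6)
	for i := 0; i < 6; i++ {
		in <- i
	}
	close(in)

	var results []int
	for v := range takeChan[int](in, 3) {
		results = append(results, v)
	}
	return results
}

func main() {
	fmt.Println(takeDemo()) // prints [0 1 2]
}
```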

func (*Channel[T]) Tap

func (input *Channel[T]) Tap(function func(T), opts ...options.TapOption) *Channel[T]

Tap runs a function as a side effect for each input value, and then sends the input values transparently to the output channel. A common use case is logging.

func (*Channel[T]) ToGoChannel

func (input *Channel[T]) ToGoChannel() <-chan T

ToGoChannel sends all values from the input channel to the returned Go channel. The returned Go channel closes when all input values have been processed, or the pipeline is canceled.

Example:

output := input.ToGoChannel()

input : 0--1--2--3--X
output: 0--1--2--3--X

func (*Channel[T]) ToSlice

func (input *Channel[T]) ToSlice() <-chan []T

ToSlice puts all values coming from the input channel in a slice. The resulting slice is sent to the returned channel when all input values have been processed, or the pipeline is canceled. The slice may have partial results if the pipeline failed, so you must remember to check the pipeline's Error() method.

Example:

output := input.ToSlice()

input : 0--1--2--3--X
output: ------------{0,1,2,3}

type Config

type Config struct {
	// Context is used by a Pipeline for cancellation.
	// If the context gets canceled, the pipeline gets canceled too.
	Context context.Context
	// StartManually determines whether [Pipeline.Start] must be called manually.
	// If false, the first sink operator (ForEach, ToSlice, etc.) to be created in the pipeline automatically starts it.
	// If true, the pipeline will be dormant until [Pipeline.Start] is called.
	StartManually bool
}

A Config can be used to create a pipeline with certain settings.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

A Pipeline is a container for the classic Pipelines and Cancellation pattern. Pipelines are safe to use from multiple goroutines.

Pipeline works as a coordinator or context for multiple operators and Channels. It can also be seen as a sort of context and passed around as such.

func New

func New(ctx context.Context) *Pipeline

New returns a pipeline with the given backing context. StartManually is false by default, meaning the pipeline will start when a sink operator (ForEach, ToSlice, etc.) is created for it.

func NewPipeline

func NewPipeline(config Config) *Pipeline

NewPipeline returns a Pipeline with the given jpipe.Config.

func (*Pipeline) Cancel

func (p *Pipeline) Cancel(err error)

Cancel manually cancels the pipeline with the given error.

func (*Pipeline) Context added in v0.2.3

func (p *Pipeline) Context() context.Context

func (*Pipeline) Done

func (p *Pipeline) Done() <-chan struct{}

Done returns a channel that's closed when the pipeline has either completed successfully or failed.

func (*Pipeline) Error

func (p *Pipeline) Error() error

Error returns the error in the pipeline, if any. It returns nil if the pipeline is still running or if it completed successfully.

func (*Pipeline) IsDone

func (p *Pipeline) IsDone() bool

IsDone synchronously returns whether the pipeline completed successfully or failed.

func (*Pipeline) Start

func (p *Pipeline) Start()

Start manually starts the pipeline. If the pipeline is already started, Start has no effect.
