Documentation ¶
Overview ¶
Package gostreams provides a set of operations on streams of elements. A stream is a pipeline of operations that elements pass through.
Streams consist of an initial ProducerFunc, which can produce elements from slices, channels, or any arbitrary source.
Intermediate ProducerFuncs may apply mapping, filtering, and sorting operations to the elements. Some of these operations can work on elements concurrently to increase throughput.
Finally, ConsumerFuncs consume the elements, and perform a final operation on them, such as collecting them into slices or maps, grouping/partitioning them, checking for matching elements, or simply iterating over them.
Stream operations receive a context.CancelCauseFunc. Calling the cancel function cancels the entire stream, short-circuiting the processing of elements. Depending on the intermediate operations and the final consumer, the result of the consumer may be undefined. Producer implementations must be prepared to handle cancellation at any time by checking the provided context.Context.
Streams are always lazy, meaning that producers will produce a new element only after a downstream producer or consumer has consumed the previous element.
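To make the cancellation and laziness rules concrete, here is a minimal sketch of a custom producer. It declares a local ProducerFunc type that mirrors the documented signature instead of importing the package; countFrom, takeThree, and errDone are hypothetical names used only for illustration. The unbuffered channel means the next element is sent only once the previous one has been received, and the select on ctx.Done() lets the producer stop at any time.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ProducerFunc mirrors the signature documented for the package (local stand-in).
type ProducerFunc[T any] func(ctx context.Context, cancel context.CancelCauseFunc) <-chan T

// errDone is a hypothetical cause used by the consumer below to stop the stream.
var errDone = errors.New("done")

// countFrom is a hypothetical producer that emits start, start+1, ... until canceled.
func countFrom(start int) ProducerFunc[int] {
	return func(ctx context.Context, _ context.CancelCauseFunc) <-chan int {
		out := make(chan int) // unbuffered: a send blocks until the consumer receives
		go func() {
			defer close(out)
			for i := start; ; i++ {
				select {
				case out <- i:
				case <-ctx.Done(): // canceled: stop producing
					return
				}
			}
		}()
		return out
	}
}

// takeThree consumes three elements and then cancels the stream.
func takeThree(prod ProducerFunc[int]) []int {
	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil)

	var got []int
	for elem := range prod(ctx, cancel) {
		got = append(got, elem)
		if len(got) == 3 {
			cancel(errDone)
			break
		}
	}
	return got
}

func main() {
	fmt.Println(takeThree(countFrom(10))) // [10 11 12]
}
```

Without the ctx.Done() case, the goroutine in countFrom would block forever on its next send once the consumer stops receiving.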
Example ¶
    // construct a producer from a slice
    ints := Produce([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})

    // filter for even elements
    // since we only need the elements themselves, we can use FuncPredicate
    ints = Filter(ints, FuncPredicate(func(elem int) bool {
        return elem%2 == 0
    }))

    // map elements by doubling them
    // since we only need the elements themselves, we can use FuncMapper
    ints = Map(ints, FuncMapper(func(elem int) int {
        return elem * 2
    }))

    // map elements by converting them to strings
    intStrs := Map(ints, FuncMapper(strconv.Itoa))

    // perform a reduction to collect the strings into a slice
    strs, _ := ReduceSlice(context.Background(), intStrs)

    fmt.Printf("%+v\n", strs)
Output: [4 8 12 16 20]
Index ¶
- Variables
- func AllMatch[T any](ctx context.Context, prod ProducerFunc[T], pred PredicateFunc[T]) (bool, error)
- func AnyMatch[T any](ctx context.Context, prod ProducerFunc[T], pred PredicateFunc[T]) (bool, error)
- func Count[T any](ctx context.Context, prod ProducerFunc[T]) (uint64, error)
- func Each[T any](ctx context.Context, prod ProducerFunc[T], each ConsumerFunc[T]) error
- func EachConcurrent[T any](ctx context.Context, prod ProducerFunc[T], each ConsumerFunc[T]) error
- func First[T any](ctx context.Context, prod ProducerFunc[T]) (T, bool, error)
- func Reduce[T any, R any](ctx context.Context, prod ProducerFunc[T], coll CollectorFunc[T, R]) (R, error)
- func ReduceSlice[T any](ctx context.Context, prod ProducerFunc[T]) ([]T, error)
- type CollectorFunc
- func CollectGroup[T any, K comparable, V any](key MapperFunc[T, K], value MapperFunc[T, V]) CollectorFunc[T, map[K][]V]
- func CollectMap[T any, K comparable, V any](key MapperFunc[T, K], value MapperFunc[T, V]) CollectorFunc[T, map[K]V]
- func CollectMapNoDuplicateKeys[T any, K comparable, V any](key MapperFunc[T, K], value MapperFunc[T, V]) CollectorFunc[T, map[K]V]
- func CollectPartition[T any, V any](pred PredicateFunc[T], value MapperFunc[T, V]) CollectorFunc[T, map[bool][]V]
- func CollectSlice[T any]() CollectorFunc[T, []T]
- type CompareFunc
- type ConsumerFunc
- type DuplicateKeyError
- type Function
- type MapperFunc
- type PredicateFunc
- type ProducerFunc
- func Distinct[T comparable](prod ProducerFunc[T]) ProducerFunc[T]
- func DistinctSeen[T any](prod ProducerFunc[T], seen SeenFunc[T]) ProducerFunc[T]
- func Filter[T any](prod ProducerFunc[T], filter PredicateFunc[T]) ProducerFunc[T]
- func FilterConcurrent[T any](prod ProducerFunc[T], filter PredicateFunc[T]) ProducerFunc[T]
- func FlatMap[T any, U any](prod ProducerFunc[T], mapp MapperFunc[T, ProducerFunc[U]]) ProducerFunc[U]
- func FlatMapConcurrent[T any, U any](prod ProducerFunc[T], mapp MapperFunc[T, ProducerFunc[U]]) ProducerFunc[U]
- func Join[T any](producers ...ProducerFunc[T]) ProducerFunc[T]
- func JoinConcurrent[T any](producers ...ProducerFunc[T]) ProducerFunc[T]
- func Limit[T any](prod ProducerFunc[T], max uint64) ProducerFunc[T]
- func Map[T any, U any](prod ProducerFunc[T], mapp MapperFunc[T, U]) ProducerFunc[U]
- func MapConcurrent[T any, U any](prod ProducerFunc[T], mapp MapperFunc[T, U]) ProducerFunc[U]
- func Peek[T any](prod ProducerFunc[T], peek ConsumerFunc[T]) ProducerFunc[T]
- func Produce[T any](slices ...[]T) ProducerFunc[T]
- func ProduceChannel[T any](channels ...<-chan T) ProducerFunc[T]
- func ProduceChannelConcurrent[T any](channels ...<-chan T) ProducerFunc[T]
- func Skip[T any](prod ProducerFunc[T], num uint64) ProducerFunc[T]
- func Sort[T any](prod ProducerFunc[T], cmp CompareFunc[T]) ProducerFunc[T]
- type SeenFunc
Variables ¶
var ErrLimitReached = errors.New("limit reached")
ErrLimitReached is the error used to short-circuit a stream by canceling its context to indicate that the maximum number of elements given to Limit has been reached.
var ErrShortCircuit = errors.New("short circuit")
ErrShortCircuit is a generic error used to short-circuit a stream by canceling its context.
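The consumers documented below treat a short-circuit cancellation as success and any other cause as an error. A minimal sketch of that distinction, using only the standard library's context.WithCancelCause and context.Cause: errShortCircuit is a local stand-in for ErrShortCircuit, and resultError, cancelWith, and errBoom are hypothetical helpers, not part of the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errShortCircuit is a local stand-in for the package's ErrShortCircuit.
var errShortCircuit = errors.New("short circuit")

// errBoom is a hypothetical failure used for illustration.
var errBoom = errors.New("boom")

// resultError maps a context's cancellation cause to the error a consumer
// might return: nil if the context is not canceled or was short-circuited,
// otherwise the cause itself.
func resultError(ctx context.Context) error {
	cause := context.Cause(ctx)
	if cause == nil || errors.Is(cause, errShortCircuit) {
		return nil
	}
	return cause
}

// cancelWith cancels a fresh context with cause and reports the resulting error.
func cancelWith(cause error) error {
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(cause)
	return resultError(ctx)
}

func main() {
	fmt.Println(cancelWith(errShortCircuit)) // <nil>
	fmt.Println(cancelWith(errBoom))         // boom
}
```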
Functions ¶
func AllMatch ¶
func AllMatch[T any](ctx context.Context, prod ProducerFunc[T], pred PredicateFunc[T]) (bool, error)
AllMatch returns true if pred returns true for all elements produced by prod, that is, all elements match. If any element doesn't match, it cancels the stream's context using ErrShortCircuit, and returns a nil error. If the stream's context is canceled with any other error, it returns an undefined result, and the cause of the cancellation.
func AnyMatch ¶
func AnyMatch[T any](ctx context.Context, prod ProducerFunc[T], pred PredicateFunc[T]) (bool, error)
AnyMatch returns true as soon as pred returns true for an element produced by prod, that is, an element matches. If an element matches, it cancels the stream's context using ErrShortCircuit, and returns a nil error. If the stream's context is canceled with any other error, it returns an undefined result, and the cause of the cancellation.
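The short-circuit behavior described for AnyMatch can be sketched as follows. This is an illustration under assumptions, not the package's implementation: it uses local stand-ins (ProducerFunc, produce, errShortCircuit), a simplified one-argument predicate instead of PredicateFunc, and a hypothetical hasNegative helper.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ProducerFunc mirrors the documented signature (local stand-in).
type ProducerFunc[T any] func(ctx context.Context, cancel context.CancelCauseFunc) <-chan T

// errShortCircuit is a local stand-in for ErrShortCircuit.
var errShortCircuit = errors.New("short circuit")

// produce is a simplified stand-in for Produce that emits the given elements in order.
func produce[T any](elems ...T) ProducerFunc[T] {
	return func(ctx context.Context, _ context.CancelCauseFunc) <-chan T {
		out := make(chan T)
		go func() {
			defer close(out)
			for _, e := range elems {
				select {
				case out <- e:
				case <-ctx.Done():
					return
				}
			}
		}()
		return out
	}
}

// anyMatch stops at the first matching element and cancels the stream.
func anyMatch[T any](ctx context.Context, prod ProducerFunc[T], pred func(T) bool) (bool, error) {
	ctx, cancel := context.WithCancelCause(ctx)
	defer cancel(nil)

	for elem := range prod(ctx, cancel) {
		if pred(elem) {
			cancel(errShortCircuit) // upstream producers stop; this is not an error
			return true, nil
		}
	}

	// The channel was closed: either the input ended or the stream was canceled.
	if cause := context.Cause(ctx); cause != nil && !errors.Is(cause, errShortCircuit) {
		return false, cause
	}
	return false, nil
}

// hasNegative is a hypothetical helper that runs anyMatch over nums.
func hasNegative(nums ...int) (bool, error) {
	return anyMatch(context.Background(), produce(nums...), func(n int) bool { return n < 0 })
}

func main() {
	fmt.Println(hasNegative(1, -2, 3)) // true <nil>
	fmt.Println(hasNegative(1, 2, 3))  // false <nil>
}
```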
func Count ¶
func Count[T any](ctx context.Context, prod ProducerFunc[T]) (uint64, error)
Count returns the number of elements produced by prod. If the stream's context is canceled, it returns an undefined result, and the cause of the cancellation.
func Each ¶
func Each[T any](ctx context.Context, prod ProducerFunc[T], each ConsumerFunc[T]) error
Each calls each for each element produced by prod. If the stream's context is canceled, it returns the cause of the cancellation. If the cause of cancellation was ErrShortCircuit, it returns a nil error instead.
func EachConcurrent ¶
func EachConcurrent[T any](ctx context.Context, prod ProducerFunc[T], each ConsumerFunc[T]) error
EachConcurrent concurrently calls each for each element produced by prod. If the stream's context is canceled, it returns the cause of the cancellation. If the cause of cancellation was ErrShortCircuit, it returns a nil error instead.
func First ¶ added in v0.5.0
func First[T any](ctx context.Context, prod ProducerFunc[T]) (T, bool, error)
First returns the first element produced by prod. If prod produces an element, it cancels the stream's context using ErrShortCircuit, and returns the element, a true bool result, and a nil error. If prod does not produce elements, it returns a false bool result, and a nil error. If the stream's context is canceled, it returns an undefined result, and the cause of the cancellation.
func Reduce ¶
func Reduce[T any, R any](ctx context.Context, prod ProducerFunc[T], coll CollectorFunc[T, R]) (R, error)
Reduce calls coll for each element produced by prod, and returns the final result. If the stream's context is canceled, it returns the result so far, and the cause of the cancellation. If the cause of cancellation was ErrShortCircuit, it returns a nil error instead.
func ReduceSlice ¶ added in v0.2.0
func ReduceSlice[T any](ctx context.Context, prod ProducerFunc[T]) ([]T, error)
ReduceSlice returns a slice of all elements produced by prod. If the stream's context is canceled, it returns the collected elements so far, and the cause of the cancellation. If the cause of cancellation was ErrShortCircuit, it returns a nil error instead.
Types ¶
type CollectorFunc ¶ added in v0.4.0
type CollectorFunc[T any, R any] func(ctx context.Context, cancel context.CancelCauseFunc, elem T, index uint64) R
CollectorFunc folds element elem into an internal accumulator and returns the result so far. The index is the 0-based index of elem, in the order produced by the upstream producer.
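As an illustration of "folds into an internal accumulator and returns the result so far", here is a sketch of a custom collector that sums integers. CollectorFunc is redeclared locally with the documented signature; sum and fold are hypothetical, and fold walks a slice rather than a stream, so it only approximates what Reduce does.

```go
package main

import (
	"context"
	"fmt"
)

// CollectorFunc mirrors the documented signature (local stand-in).
type CollectorFunc[T any, R any] func(ctx context.Context, cancel context.CancelCauseFunc, elem T, index uint64) R

// sum is a hypothetical collector: the closure keeps the running total as its
// internal accumulator and returns the total so far after every element.
func sum() CollectorFunc[int, int] {
	total := 0
	return func(_ context.Context, _ context.CancelCauseFunc, elem int, _ uint64) int {
		total += elem
		return total
	}
}

// fold is a simplified stand-in for Reduce that feeds the elements of a slice
// to coll one by one and returns the last result.
func fold[T any, R any](elems []T, coll CollectorFunc[T, R]) R {
	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil)

	var result R
	for i, e := range elems {
		result = coll(ctx, cancel, e, uint64(i))
	}
	return result
}

func main() {
	fmt.Println(fold([]int{1, 2, 3, 4}, sum())) // 10
}
```

Because the accumulator lives in the closure, each call to sum() yields a fresh collector; in this sketch, reusing one collector across two folds would carry the total over.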
func CollectGroup ¶
func CollectGroup[T any, K comparable, V any](key MapperFunc[T, K], value MapperFunc[T, V]) CollectorFunc[T, map[K][]V]
CollectGroup returns a collector that collects elements into a group map, according to key. It uses value to map elements to values.
func CollectMap ¶
func CollectMap[T any, K comparable, V any](key MapperFunc[T, K], value MapperFunc[T, V]) CollectorFunc[T, map[K]V]
CollectMap returns a collector that collects elements into a map, using key to map elements to keys, and value to map elements to values. If a key is already in the map, it overwrites the map entry.
func CollectMapNoDuplicateKeys ¶
func CollectMapNoDuplicateKeys[T any, K comparable, V any](key MapperFunc[T, K], value MapperFunc[T, V]) CollectorFunc[T, map[K]V]
CollectMapNoDuplicateKeys returns a collector that collects elements into a map, using key to map elements to keys, and value to map elements to values. If a key is already in the map, it cancels the stream's context with a DuplicateKeyError.
func CollectPartition ¶
func CollectPartition[T any, V any](pred PredicateFunc[T], value MapperFunc[T, V]) CollectorFunc[T, map[bool][]V]
CollectPartition returns a collector that collects elements into a partition map, according to pred. It uses value to map elements to values.
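The result shapes of CollectGroup (map[K][]V) and CollectPartition (map[bool][]V) can be illustrated on plain slices. The sketch below does not use the package: groupBy, partition, initial, isEven, and double are hypothetical helpers, and the key, predicate, and value functions take only the element rather than the full MapperFunc and PredicateFunc parameter lists.

```go
package main

import (
	"fmt"
	"strings"
)

// groupBy builds the same result shape as CollectGroup: every element is
// mapped to a key, and its mapped value is appended to that key's slice.
func groupBy[T any, K comparable, V any](elems []T, key func(T) K, value func(T) V) map[K][]V {
	groups := make(map[K][]V)
	for _, e := range elems {
		k := key(e)
		groups[k] = append(groups[k], value(e))
	}
	return groups
}

// partition is grouping with a bool key, the shape CollectPartition returns.
func partition[T any, V any](elems []T, pred func(T) bool, value func(T) V) map[bool][]V {
	return groupBy(elems, pred, value)
}

func initial(s string) string { return s[:1] }
func isEven(n int) bool       { return n%2 == 0 }
func double(n int) int        { return n * 2 }

func main() {
	words := []string{"apple", "avocado", "banana"}
	fmt.Println(groupBy(words, initial, strings.ToUpper))
	// map[a:[APPLE AVOCADO] b:[BANANA]]

	fmt.Println(partition([]int{1, 2, 3, 4}, isEven, double))
	// map[false:[2 6] true:[4 8]]
}
```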
func CollectSlice ¶
func CollectSlice[T any]() CollectorFunc[T, []T]
CollectSlice returns a collector that collects elements into a slice.
type CompareFunc ¶ added in v0.6.0
CompareFunc returns a negative number if a < b, a positive number if a > b, and zero if a == b.
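The three-way comparison contract can be illustrated with a plain two-argument function. The exact parameter list of CompareFunc is not shown in this section, so the sketch below assumes a simplified func(a, b T) int; byLength and sortedBy are hypothetical names, and the sorting is done with the standard library's sort.SliceStable rather than with Sort.

```go
package main

import (
	"fmt"
	"sort"
)

// byLength follows the documented contract: negative if a sorts before b,
// positive if a sorts after b, and zero if they are considered equal.
func byLength(a, b string) int {
	return len(a) - len(b)
}

// sortedBy returns a sorted copy of elems, ordered by a three-way comparison.
func sortedBy[T any](elems []T, cmp func(a, b T) int) []T {
	out := append([]T(nil), elems...)
	sort.SliceStable(out, func(i, j int) bool {
		return cmp(out[i], out[j]) < 0
	})
	return out
}

func main() {
	fmt.Println(sortedBy([]string{"ccc", "a", "bb"}, byLength)) // [a bb ccc]
}
```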
type ConsumerFunc ¶
type ConsumerFunc[T any] func(ctx context.Context, cancel context.CancelCauseFunc, elem T, index uint64)
ConsumerFunc consumes element elem. The index is the 0-based index of elem, in the order produced by the upstream producer.
type DuplicateKeyError ¶
type DuplicateKeyError[T any, K comparable] struct {
    // Element is the upstream producer's element that caused the error.
    Element T

    // Key is the key that was already in the map.
    Key K
}
DuplicateKeyError is the error used to short-circuit a stream by canceling its context to indicate that a key couldn't be added to a map because it already exists.
func (*DuplicateKeyError[T, K]) Error ¶
func (e *DuplicateKeyError[T, K]) Error() string
Error implements error.
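Because Error has a pointer receiver, callers would typically extract the error with errors.As and a pointer target. The sketch below redeclares DuplicateKeyError locally with the documented fields; the message format and the indexByLength helper are assumptions made for illustration, not the package's behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// DuplicateKeyError mirrors the documented struct (local stand-in).
type DuplicateKeyError[T any, K comparable] struct {
	Element T
	Key     K
}

// Error implements error. The message format here is an assumption.
func (e *DuplicateKeyError[T, K]) Error() string {
	return fmt.Sprintf("duplicate key %v for element %v", e.Key, e.Element)
}

// indexByLength is a hypothetical helper that maps word length to word and
// fails on the first duplicate key, similar in spirit to CollectMapNoDuplicateKeys.
func indexByLength(words []string) (map[int]string, error) {
	m := make(map[int]string)
	for _, w := range words {
		if _, exists := m[len(w)]; exists {
			return m, &DuplicateKeyError[string, int]{Element: w, Key: len(w)}
		}
		m[len(w)] = w
	}
	return m, nil
}

func main() {
	_, err := indexByLength([]string{"go", "rust", "java"})

	// The target's type arguments must match those of the error value.
	var dup *DuplicateKeyError[string, int]
	if errors.As(err, &dup) {
		fmt.Println(dup.Key, dup.Element) // 4 java
	}
}
```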
type MapperFunc ¶
type MapperFunc[T any, U any] func(ctx context.Context, cancel context.CancelCauseFunc, elem T, index uint64) U
MapperFunc maps element elem to type U. The index is the 0-based index of elem, in the order produced by the upstream producer.
func FuncMapper ¶
func FuncMapper[T any, U any](mapp Function[T, U]) MapperFunc[T, U]
FuncMapper returns a mapper that calls mapp for each element.
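An adapter like FuncMapper can be sketched as a closure that drops the arguments a simple function does not need. The Function type is not defined in this section; the sketch assumes it is func(elem T) U, which is consistent with the overview example passing strconv.Itoa. funcMapper and apply are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// MapperFunc mirrors the documented signature (local stand-in).
type MapperFunc[T any, U any] func(ctx context.Context, cancel context.CancelCauseFunc, elem T, index uint64) U

// Function is assumed to be a plain one-argument function.
type Function[T any, U any] func(elem T) U

// funcMapper wraps mapp so that it ignores the context, cancel function, and index.
func funcMapper[T any, U any](mapp Function[T, U]) MapperFunc[T, U] {
	return func(_ context.Context, _ context.CancelCauseFunc, elem T, _ uint64) U {
		return mapp(elem)
	}
}

// apply calls the adapted mapper directly, outside of any stream.
func apply(n int) string {
	m := funcMapper[int, string](strconv.Itoa)
	return m(context.Background(), func(error) {}, n, 0)
}

func main() {
	fmt.Printf("%q\n", apply(42)) // "42"
}
```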
func Identity ¶
func Identity[T any]() MapperFunc[T, T]
Identity returns a mapper that returns the same element it receives.
type PredicateFunc ¶
type PredicateFunc[T any] func(ctx context.Context, cancel context.CancelCauseFunc, elem T, index uint64) bool
PredicateFunc returns true if elem matches a predicate. The index is the 0-based index of elem, in the order produced by the upstream producer.
func FuncPredicate ¶ added in v0.4.0
func FuncPredicate[T any](pred Function[T, bool]) PredicateFunc[T]
FuncPredicate returns a predicate that calls pred for each element.
type ProducerFunc ¶
type ProducerFunc[T any] func(ctx context.Context, cancel context.CancelCauseFunc) <-chan T
ProducerFunc returns a channel of elements for a stream.
func Distinct ¶ added in v0.2.0
func Distinct[T comparable](prod ProducerFunc[T]) ProducerFunc[T]
Distinct returns a producer that produces those elements produced by prod which are distinct.
func DistinctSeen ¶ added in v0.2.0
func DistinctSeen[T any](prod ProducerFunc[T], seen SeenFunc[T]) ProducerFunc[T]
DistinctSeen returns a producer that produces those elements produced by prod for which seen returns false.
func Filter ¶
func Filter[T any](prod ProducerFunc[T], filter PredicateFunc[T]) ProducerFunc[T]
Filter returns a producer that calls filter for each element produced by prod, and only produces elements for which filter returns true.
func FilterConcurrent ¶
func FilterConcurrent[T any](prod ProducerFunc[T], filter PredicateFunc[T]) ProducerFunc[T]
FilterConcurrent returns a producer that calls filter for each element produced by prod, and only produces elements for which filter returns true. It produces elements in undefined order.
func FlatMap ¶
func FlatMap[T any, U any](prod ProducerFunc[T], mapp MapperFunc[T, ProducerFunc[U]]) ProducerFunc[U]
FlatMap returns a producer that calls mapp for each element produced by prod, mapping it to an intermediate producer that produces elements of type U. The new producer produces all elements produced by the intermediate producers, in order.
func FlatMapConcurrent ¶
func FlatMapConcurrent[T any, U any](prod ProducerFunc[T], mapp MapperFunc[T, ProducerFunc[U]]) ProducerFunc[U]
FlatMapConcurrent returns a producer that calls mapp for each element produced by prod, mapping it to an intermediate producer that produces elements of type U. The new producer produces all elements produced by the intermediate producers, in undefined order.
func Join ¶
func Join[T any](producers ...ProducerFunc[T]) ProducerFunc[T]
Join returns a producer that produces the elements produced by the given producers, in order.
func JoinConcurrent ¶
func JoinConcurrent[T any](producers ...ProducerFunc[T]) ProducerFunc[T]
JoinConcurrent returns a producer that produces the elements produced by the given producers, in undefined order. It consumes the producers concurrently.
func Limit ¶
func Limit[T any](prod ProducerFunc[T], max uint64) ProducerFunc[T]
Limit returns a producer that produces the same elements as prod, in order, up to max elements. Once the limit has been reached, it cancels prod's context with ErrLimitReached (but not the entire stream's context).
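One way to cancel only the upstream producer is to derive a child context and cancel that instead of the stream's own context. The sketch below shows the idea under assumptions: ProducerFunc, produce, limit, and errLimitReached are local stand-ins, firstN is a hypothetical helper, and none of this is the package's actual implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ProducerFunc mirrors the documented signature (local stand-in).
type ProducerFunc[T any] func(ctx context.Context, cancel context.CancelCauseFunc) <-chan T

// errLimitReached is a local stand-in for ErrLimitReached.
var errLimitReached = errors.New("limit reached")

// produce is a simplified stand-in for Produce.
func produce[T any](elems ...T) ProducerFunc[T] {
	return func(ctx context.Context, _ context.CancelCauseFunc) <-chan T {
		out := make(chan T)
		go func() {
			defer close(out)
			for _, e := range elems {
				select {
				case out <- e:
				case <-ctx.Done():
					return
				}
			}
		}()
		return out
	}
}

// limit forwards at most max elements, then cancels only the upstream context.
func limit[T any](prod ProducerFunc[T], max uint64) ProducerFunc[T] {
	return func(ctx context.Context, cancel context.CancelCauseFunc) <-chan T {
		out := make(chan T)
		go func() {
			defer close(out)
			if max == 0 {
				return
			}
			// Child context: canceling it stops prod but leaves ctx untouched.
			upCtx, upCancel := context.WithCancelCause(ctx)
			defer upCancel(nil)

			var n uint64
			for elem := range prod(upCtx, cancel) {
				select {
				case out <- elem:
				case <-ctx.Done():
					return
				}
				if n++; n == max {
					upCancel(errLimitReached)
					return
				}
			}
		}()
		return out
	}
}

// firstN collects up to n elements and reports the stream context's cause.
func firstN(n uint64, elems ...int) ([]int, error) {
	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil)

	var got []int
	for e := range limit(produce(elems...), n)(ctx, cancel) {
		got = append(got, e)
	}
	return got, context.Cause(ctx)
}

func main() {
	fmt.Println(firstN(2, 1, 2, 3, 4)) // [1 2] <nil>
}
```

The nil cause in the output shows that the stream's own context was never canceled; only the child context passed upstream was.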
func Map ¶
func Map[T any, U any](prod ProducerFunc[T], mapp MapperFunc[T, U]) ProducerFunc[U]
Map returns a producer that calls mapp for each element produced by prod, mapping it to type U.
func MapConcurrent ¶
func MapConcurrent[T any, U any](prod ProducerFunc[T], mapp MapperFunc[T, U]) ProducerFunc[U]
MapConcurrent returns a producer that concurrently calls mapp for each element produced by prod, mapping it to type U. It produces elements in undefined order.
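The undefined output order follows from several goroutines reading the same upstream channel and writing to the same downstream channel. A minimal fan-out sketch, assuming a fixed worker count: the documented MapConcurrent signature has no such parameter, so workers is an illustrative addition, as are the local ProducerFunc, produce, mapConcurrent, and squares.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// ProducerFunc mirrors the documented signature (local stand-in).
type ProducerFunc[T any] func(ctx context.Context, cancel context.CancelCauseFunc) <-chan T

// produce is a simplified stand-in for Produce.
func produce[T any](elems ...T) ProducerFunc[T] {
	return func(ctx context.Context, _ context.CancelCauseFunc) <-chan T {
		out := make(chan T)
		go func() {
			defer close(out)
			for _, e := range elems {
				select {
				case out <- e:
				case <-ctx.Done():
					return
				}
			}
		}()
		return out
	}
}

// mapConcurrent maps elements using a fixed number of worker goroutines.
func mapConcurrent[T any, U any](prod ProducerFunc[T], workers int, mapp func(T) U) ProducerFunc[U] {
	return func(ctx context.Context, cancel context.CancelCauseFunc) <-chan U {
		in := prod(ctx, cancel)
		out := make(chan U)

		var wg sync.WaitGroup
		for i := 0; i < workers; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for elem := range in {
					select {
					case out <- mapp(elem):
					case <-ctx.Done():
						return
					}
				}
			}()
		}

		// Close the output only after every worker has finished.
		go func() {
			wg.Wait()
			close(out)
		}()
		return out
	}
}

// squares maps nums concurrently and sorts the result, since the order in
// which the workers deliver elements is not defined.
func squares(nums ...int) []int {
	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil)

	var got []int
	square := func(n int) int { return n * n }
	for v := range mapConcurrent(produce(nums...), 3, square)(ctx, cancel) {
		got = append(got, v)
	}
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(squares(1, 2, 3, 4, 5)) // [1 4 9 16 25]
}
```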
func Peek ¶
func Peek[T any](prod ProducerFunc[T], peek ConsumerFunc[T]) ProducerFunc[T]
Peek returns a producer that calls peek for each element produced by prod, in order, and produces the same elements.
func Produce ¶
func Produce[T any](slices ...[]T) ProducerFunc[T]
Produce returns a producer that produces the elements of the given slices, in order.
func ProduceChannel ¶
func ProduceChannel[T any](channels ...<-chan T) ProducerFunc[T]
ProduceChannel returns a producer that produces the elements received through the given channels, in order.
func ProduceChannelConcurrent ¶
func ProduceChannelConcurrent[T any](channels ...<-chan T) ProducerFunc[T]
ProduceChannelConcurrent returns a producer that produces the elements received through the given channels, in undefined order. It consumes the channels concurrently.
func Skip ¶
func Skip[T any](prod ProducerFunc[T], num uint64) ProducerFunc[T]
Skip returns a producer that produces the same elements as prod, in order, skipping the first num elements.
func Sort ¶
func Sort[T any](prod ProducerFunc[T], cmp CompareFunc[T]) ProducerFunc[T]
Sort returns a producer that consumes elements from prod, sorts them using cmp, and produces them in sorted order.