gostreams

package module
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 3, 2023 License: MIT Imports: 4 Imported by: 0

README

GoDoc

gostreams

A Go package that provides a set of operations on streams of elements.

import "github.com/blizzy78/gostreams"

Code example

// construct a producer from a slice
ints := Produce([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})

// filter for even elements
// since we only need the elements themselves, we can use FuncPredicate
ints = Filter(ints, FuncPredicate(func(elem int) bool {
	return elem%2 == 0
}))

// map elements by doubling them
// since we only need the elements themselves, we can use FuncMapper
ints = Map(ints, FuncMapper(func(elem int) int {
	return elem * 2
}))

// map elements by converting them to strings
intStrs := Map(ints, FuncMapper(strconv.Itoa))

// perform a reduction to collect the strings into a slice
strs, _ := ReduceSlice(context.Background(), intStrs)

fmt.Printf("%+v\n", strs)
// Output: [4 8 12 16 20]

License

This package is licensed under the MIT license.

Documentation

Overview

Package gostreams provides a set of operations on streams of elements. Streams form a pipeline of operations that elements are passing through.

Streams consist of an initial ProducerFunc, which can produce elements from slices, channels, or any arbitrary source.

Intermediate ProducerFuncs may apply mapping, filtering, and sorting operations to the elements. Some of these operations can work on elements concurrently to increase throughput.

Finally, ConsumerFuncs consume the elements, and perform a final operation on them, such as collecting them into slices or maps, grouping/partitioning them, checking for matching elements, or simply iterating over them.

Stream operations receive a context.CancelCauseFunc. Calling the cancel function cancels the entire stream, short-circuiting the processing of elements. Depending on the intermediate operations and the final consumer, the result of the consumer may be undefined. Producer implementations must be prepared to handle cancellation at any time by checking the provided context.Context.

Streams are always lazy, meaning that producers will produce a new element only after a downstream producer or consumer has consumed the previous element.

Example
// construct a producer from a slice
ints := Produce([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})

// filter for even elements
// since we only need the elements themselves, we can use FuncPredicate
ints = Filter(ints, FuncPredicate(func(elem int) bool {
	return elem%2 == 0
}))

// map elements by doubling them
// since we only need the elements themselves, we can use FuncMapper
ints = Map(ints, FuncMapper(func(elem int) int {
	return elem * 2
}))

// map elements by converting them to strings
intStrs := Map(ints, FuncMapper(strconv.Itoa))

// perform a reduction to collect the strings into a slice
strs, _ := ReduceSlice(context.Background(), intStrs)

fmt.Printf("%+v\n", strs)
Output:

[4 8 12 16 20]

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrLimitReached = errors.New("limit reached")

ErrLimitReached is the error used to short-circuit a stream by canceling its context to indicate that the maximum number of elements given to Limit has been reached.

View Source
var ErrShortCircuit = errors.New("short circuit")

ErrShortCircuit is a generic error used to short-circuit a stream by canceling its context.

Functions

func AllMatch

func AllMatch[T any](ctx context.Context, prod ProducerFunc[T], pred PredicateFunc[T]) (bool, error)

AllMatch returns true if pred returns true for all elements produced by prod, that is, all elements match. If any element doesn't match, it cancels the stream's context using ErrShortCircuit, and returns a nil error. If the stream's context is canceled with any other error, it returns an undefined result, and the cause of the cancellation.

func AnyMatch

func AnyMatch[T any](ctx context.Context, prod ProducerFunc[T], pred PredicateFunc[T]) (bool, error)

AnyMatch returns true as soon as pred returns true for an element produced by prod, that is, an element matches. If an element matches, it cancels the stream's context using ErrShortCircuit, and returns a nil error. If the stream's context is canceled with any other error, it returns an undefined result, and the cause of the cancellation.

func Count

func Count[T any](ctx context.Context, prod ProducerFunc[T]) (uint64, error)

Count returns the number of elements produced by prod. If the stream's context is canceled, it returns an undefined result, and the cause of the cancellation.

func Each

func Each[T any](ctx context.Context, prod ProducerFunc[T], each ConsumerFunc[T]) error

Each calls each for each element produced by prod. If the stream's context is canceled, it returns the cause of the cancellation. If the cause of cancellation was ErrShortCircuit, it returns a nil error instead.

func EachConcurrent

func EachConcurrent[T any](ctx context.Context, prod ProducerFunc[T], each ConsumerFunc[T]) error

EachConcurrent concurrently calls each for each element produced by prod. If the stream's context is canceled, it returns the cause of the cancellation. If the cause of cancellation was ErrShortCircuit, it returns a nil error instead.

func First added in v0.5.0

func First[T any](ctx context.Context, prod ProducerFunc[T]) (T, bool, error)

First returns the first element produced by prod. If prod produces an element, it cancels the stream's context using ErrShortCircuit, and returns the element, a true bool result, and a nil error. If prod does not produce elements, it returns a false bool result, and a nil error. If the stream's context is canceled, it returns an undefined result, and the cause of the cancellation.

func Reduce

func Reduce[T any, R any](ctx context.Context, prod ProducerFunc[T], coll CollectorFunc[T, R]) (R, error)

Reduce calls coll for each element produced by prod, and returns the final result. If the stream's context is canceled, it returns the result so far, and the cause of the cancellation. If the cause of cancellation was ErrShortCircuit, it returns a nil error instead.

func ReduceSlice added in v0.2.0

func ReduceSlice[T any](ctx context.Context, prod ProducerFunc[T]) ([]T, error)

ReduceSlice returns a slice of all elements produced by prod. If the stream's context is canceled, it returns the collected elements so far, and the cause of the cancellation. If the cause of cancellation was ErrShortCircuit, it returns a nil error instead.

Types

type CollectorFunc added in v0.4.0

type CollectorFunc[T any, R any] func(ctx context.Context, cancel context.CancelCauseFunc, elem T, index uint64) R

CollectorFunc folds element elem into an internal accumulator and returns the result so far. The index is the 0-based index of elem, in the order produced by the upstream producer.

func CollectGroup

func CollectGroup[T any, K comparable, V any](key MapperFunc[T, K], value MapperFunc[T, V]) CollectorFunc[T, map[K][]V]

CollectGroup returns a collector that collects elements into a group map, according to key. It uses value to map elements to values.

func CollectMap

func CollectMap[T any, K comparable, V any](key MapperFunc[T, K], value MapperFunc[T, V]) CollectorFunc[T, map[K]V]

CollectMap returns a collector that collects elements into a map, using key to map elements to keys, and value to map elements to values. If a key is already in the map, it overwrites the map entry.

func CollectMapNoDuplicateKeys

func CollectMapNoDuplicateKeys[T any, K comparable, V any](key MapperFunc[T, K], value MapperFunc[T, V]) CollectorFunc[T, map[K]V]

CollectMapNoDuplicateKeys returns a collector that collects elements into a map, using key to map elements to keys, and value to map elements to values. If a key is already in the map, it cancels the stream's context with a DuplicateKeyError.

func CollectPartition

func CollectPartition[T any, V any](pred PredicateFunc[T], value MapperFunc[T, V]) CollectorFunc[T, map[bool][]V]

CollectPartition returns a collector that collects elements into a partition map, according to pred. It uses value to map elements to values.

func CollectSlice

func CollectSlice[T any]() CollectorFunc[T, []T]

CollectSlice returns a collector that collects elements into a slice.

type CompareFunc added in v0.6.0

type CompareFunc[T any] func(ctx context.Context, cancel context.CancelCauseFunc, a T, b T) int

CompareFunc returns a negative number if a < b, a positive number if a > b, and zero if a == b.

type ConsumerFunc

type ConsumerFunc[T any] func(ctx context.Context, cancel context.CancelCauseFunc, elem T, index uint64)

ConsumerFunc consumes element elem. The index is the 0-based index of elem, in the order produced by the upstream producer.

type DuplicateKeyError

type DuplicateKeyError[T any, K comparable] struct {
	// Element is the upstream producer's element that caused the error.
	Element T

	// Key is the key that was already in the map.
	Key K
}

DuplicateKeyError is the error used to short-circuit a stream by canceling its context to indicate that a key couldn't be added to a map because it already exists.

func (*DuplicateKeyError[T, K]) Error

func (e *DuplicateKeyError[T, K]) Error() string

Error implements error.

type Function

type Function[T any, U any] func(elem T) U

Function returns the result of applying an operation to elem.

type MapperFunc

type MapperFunc[T any, U any] func(ctx context.Context, cancel context.CancelCauseFunc, elem T, index uint64) U

MapperFunc maps element elem to type U. The index is the 0-based index of elem, in the order produced by the upstream producer.

func FuncMapper

func FuncMapper[T any, U any](mapp Function[T, U]) MapperFunc[T, U]

FuncMapper returns a mapper that calls mapp for each element.

func Identity

func Identity[T any]() MapperFunc[T, T]

Identity returns a mapper that returns the same element it receives.

type PredicateFunc

type PredicateFunc[T any] func(ctx context.Context, cancel context.CancelCauseFunc, elem T, index uint64) bool

PredicateFunc returns true if elem matches a predicate. The index is the 0-based index of elem, in the order produced by the upstream producer.

func FuncPredicate added in v0.4.0

func FuncPredicate[T any](pred Function[T, bool]) PredicateFunc[T]

FuncPredicate returns a predicate that calls pred for each element.

type ProducerFunc

type ProducerFunc[T any] func(ctx context.Context, cancel context.CancelCauseFunc) <-chan T

ProducerFunc returns a channel of elements for a stream.

func Distinct added in v0.2.0

func Distinct[T comparable](prod ProducerFunc[T]) ProducerFunc[T]

Distinct returns a producer that produces those elements produced by prod which are distinct.

func DistinctSeen added in v0.2.0

func DistinctSeen[T any](prod ProducerFunc[T], seen SeenFunc[T]) ProducerFunc[T]

DistinctSeen returns a producer that produces those elements produced by prod for which seen returns false.

func Filter

func Filter[T any](prod ProducerFunc[T], filter PredicateFunc[T]) ProducerFunc[T]

Filter returns a producer that calls filter for each element produced by prod, and only produces elements for which filter returns true.

func FilterConcurrent

func FilterConcurrent[T any](prod ProducerFunc[T], filter PredicateFunc[T]) ProducerFunc[T]

FilterConcurrent returns a producer that calls filter for each element produced by prod, and only produces elements for which filter returns true. It produces elements in undefined order.

func FlatMap

func FlatMap[T any, U any](prod ProducerFunc[T], mapp MapperFunc[T, ProducerFunc[U]]) ProducerFunc[U]

FlatMap returns a producer that calls mapp for each element produced by prod, mapping it to an intermediate producer that produces elements of type U. The new producer produces all elements produced by the intermediate producers, in order.

func FlatMapConcurrent

func FlatMapConcurrent[T any, U any](prod ProducerFunc[T], mapp MapperFunc[T, ProducerFunc[U]]) ProducerFunc[U]

FlatMapConcurrent returns a producer that calls mapp for each element produced by prod, mapping it to an intermediate producer that produces elements of type U. The new producer produces all elements produced by the intermediate producers, in undefined order.

func Join

func Join[T any](producers ...ProducerFunc[T]) ProducerFunc[T]

Join returns a producer that produces the elements produced by the given producers, in order.

func JoinConcurrent

func JoinConcurrent[T any](producers ...ProducerFunc[T]) ProducerFunc[T]

JoinConcurrent returns a producer that produces the elements produced by the given producers, in undefined order. It consumes the producers concurrently.

func Limit

func Limit[T any](prod ProducerFunc[T], max uint64) ProducerFunc[T]

Limit returns a producer that produces the same elements as prod, in order, up to max elements. Once the limit has been reached, it cancels prod's context with ErrLimitReached (but not the entire stream's context).

func Map

func Map[T any, U any](prod ProducerFunc[T], mapp MapperFunc[T, U]) ProducerFunc[U]

Map returns a producer that calls mapp for each element produced by prod, mapping it to type U.

func MapConcurrent

func MapConcurrent[T any, U any](prod ProducerFunc[T], mapp MapperFunc[T, U]) ProducerFunc[U]

MapConcurrent returns a producer that concurrently calls mapp for each element produced by prod, mapping it to type U. It produces elements in undefined order.

func Peek

func Peek[T any](prod ProducerFunc[T], peek ConsumerFunc[T]) ProducerFunc[T]

Peek returns a producer that calls peek for each element produced by prod, in order, and produces the same elements.

func Produce

func Produce[T any](slices ...[]T) ProducerFunc[T]

Produce returns a producer that produces the elements of the given slices, in order.

func ProduceChannel

func ProduceChannel[T any](channels ...<-chan T) ProducerFunc[T]

ProduceChannel returns a producer that produces the elements received through the given channels, in order.

func ProduceChannelConcurrent

func ProduceChannelConcurrent[T any](channels ...<-chan T) ProducerFunc[T]

ProduceChannelConcurrent returns a producer that produces the elements received through the given channels, in undefined order. It consumes the channels concurrently.

func Skip

func Skip[T any](prod ProducerFunc[T], num uint64) ProducerFunc[T]

Skip returns a producer that produces the same elements as prod, in order, skipping the first num elements.

func Sort

func Sort[T any](prod ProducerFunc[T], cmp CompareFunc[T]) ProducerFunc[T]

Sort returns a producer that consumes elements from prod, sorts them using sort, and produces them in sorted order.

type SeenFunc added in v0.2.0

type SeenFunc[T any] func(elem T) bool

SeenFunc is a function that returns true if elem has been seen before.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL