stream

package
v0.0.7 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 4, 2022 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package stream defines a set of lazy operations on a Stream of data values. This package is still experimental; use it at your own risk.

A Stream is an abstraction that produces a single value at a time. A Stream can produce an infinite sequence of values or a finite one. A simple example of using a Stream is:

ForEach(Take(Iota[int](), 10), func(value int) {
	// Do something with 'value'
})

Iota() produces a Stream, which is consumed by Take(), which produces a Stream that is consumed by ForEach().
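The pull-based pipeline described above can be sketched without the package itself. The block below is a minimal, self-contained illustration and not the package's implementation: Optional is a local stand-in for cnt.Optional (whose API is not shown on this page), and counter, taker, take, forEach and firstN are hypothetical names. It also assumes the counter starts at 0.

```go
package main

import "fmt"

// Optional is a hypothetical stand-in for cnt.Optional.
type Optional[T any] struct {
	Value T
	Ok    bool // false means the Stream has finished
}

// Stream mirrors the interface documented on this page.
type Stream[T any] interface {
	Next() *Optional[T]
}

// counter yields 0, 1, 2, ... and never terminates (assumed start value).
type counter struct{ next int }

func (c *counter) Next() *Optional[int] {
	v := c.next
	c.next++
	return &Optional[int]{Value: v, Ok: true}
}

// taker forwards at most 'left' items from its input, then reports the end.
type taker[T any] struct {
	input Stream[T]
	left  int
}

func (t *taker[T]) Next() *Optional[T] {
	if t.left <= 0 {
		return &Optional[T]{}
	}
	t.left--
	return t.input.Next()
}

func take[T any](input Stream[T], limit int) Stream[T] {
	return &taker[T]{input: input, left: limit}
}

// forEach pulls items until the Stream reports that it has finished.
func forEach[T any](s Stream[T], op func(T)) {
	for item := s.Next(); item.Ok; item = s.Next() {
		op(item.Value)
	}
}

// firstN collects the first n values of the infinite counter.
func firstN(n int) []int {
	var out []int
	var source Stream[int] = &counter{}
	forEach(take(source, n), func(v int) { out = append(out, v) })
	return out
}

func main() {
	fmt.Println(firstN(5)) // prints [0 1 2 3 4]
}
```

Nothing is computed until forEach asks for a value, which is why an infinite source is safe as long as a later step bounds it.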

Streams are synchronous by default. A segment of a Stream can be made asynchronous by using the Async() adapter. All operations inside this adapter execute in a separate goroutine, and their results are fed back to the consumer through a channel.

The design of streams is heavily influenced by C#'s LINQ and Java's Stream API. The current design is not considered stable yet, so newer versions may change the API significantly.

Functions

func ChunkOut added in v0.0.4

func ChunkOut[T any](input Stream[T], size int) [][]T

ChunkOut drains a Stream and returns its output as a slice of chunks.
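One plausible reading of ChunkOut is sketched below. It is an illustration under assumptions, not the package's code: Optional stands in for cnt.Optional, fromSlice and chunkOut are hypothetical local helpers, size is assumed to be positive, and the final chunk is assumed to be shorter when the item count is not a multiple of size.

```go
package main

import "fmt"

// Optional is a hypothetical stand-in for cnt.Optional.
type Optional[T any] struct {
	Value T
	Ok    bool
}

type Stream[T any] interface {
	Next() *Optional[T]
}

// sliceStream yields the elements of a slice, one at a time.
type sliceStream[T any] struct {
	data []T
	pos  int
}

func (s *sliceStream[T]) Next() *Optional[T] {
	if s.pos >= len(s.data) {
		return &Optional[T]{}
	}
	v := s.data[s.pos]
	s.pos++
	return &Optional[T]{Value: v, Ok: true}
}

func fromSlice[T any](data []T) Stream[T] {
	return &sliceStream[T]{data: data}
}

// chunkOut drains the input and groups its items into chunks of 'size'.
// Assumption: a trailing partial chunk is kept rather than dropped.
func chunkOut[T any](input Stream[T], size int) [][]T {
	var chunks [][]T
	var current []T
	for item := input.Next(); item.Ok; item = input.Next() {
		current = append(current, item.Value)
		if len(current) == size {
			chunks = append(chunks, current)
			current = nil
		}
	}
	if len(current) > 0 {
		chunks = append(chunks, current)
	}
	return chunks
}

func main() {
	fmt.Println(chunkOut(fromSlice([]int{1, 2, 3, 4, 5}), 2)) // prints [[1 2] [3 4] [5]]
}
```

Because the function drains its input, it only returns for finite streams; an infinite source would need a Take step in front of it.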

func ForEach

func ForEach[T any](s Stream[T], op common.UnaryOp[T])

ForEach processes each item in the Stream. It will not stop until the input Stream terminates.

func OneOrNone added in v0.0.7

func OneOrNone[T any](input Stream[T]) *cnt.Optional[T]

OneOrNone reads one item from the stream and returns it or an empty optional.

func ToSlice

func ToSlice[T any](input Stream[T]) []T

ToSlice drains a Stream and returns the result as a slice.

Types

type Chain added in v0.0.7

type Chain[T any] struct {
	Stream Stream[T]
}

func Generator added in v0.0.7

func Generator[T any](g func() *cnt.Optional[T]) Chain[T]

func Monotonic added in v0.0.7

func Monotonic[T constraints.Integer]() Chain[T]

func MonotonicWithStart added in v0.0.7

func MonotonicWithStart[T constraints.Integer](start T) Chain[T]

func Pipe added in v0.0.7

func Pipe[T any](s Stream[T]) Chain[T]

func Slice added in v0.0.7

func Slice[T any](data []T) Chain[T]

func (Chain[T]) Filter added in v0.0.7

func (s Chain[T]) Filter(pred common.Predicate[T]) Chain[T]

func (Chain[T]) ForEach added in v0.0.7

func (s Chain[T]) ForEach(pred common.UnaryOp[T])

func (Chain[T]) MergeSort added in v0.0.7

func (s Chain[T]) MergeSort(o Chain[T], pred common.BinaryPredicate[T]) Chain[T]

func (Chain[T]) OneOrNone added in v0.0.7

func (s Chain[T]) OneOrNone() *cnt.Optional[T]

func (Chain[T]) Skip added in v0.0.7

func (s Chain[T]) Skip(limit int) Chain[T]

func (Chain[T]) Take added in v0.0.7

func (s Chain[T]) Take(limit int) Chain[T]

func (Chain[T]) ToSlice added in v0.0.7

func (s Chain[T]) ToSlice() []T
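Chain has no prose documentation on this page. Judging only from the signatures, it looks like a thin wrapper that lets Stream steps be written as method calls. The sketch below illustrates that reading with local stand-ins: Optional replaces cnt.Optional, common.Predicate[T] is assumed to be func(T) bool, and the method bodies are hypothetical.

```go
package main

import "fmt"

// Optional is a hypothetical stand-in for cnt.Optional.
type Optional[T any] struct {
	Value T
	Ok    bool
}

type Stream[T any] interface {
	Next() *Optional[T]
}

type sliceStream[T any] struct {
	data []T
	pos  int
}

func (s *sliceStream[T]) Next() *Optional[T] {
	if s.pos >= len(s.data) {
		return &Optional[T]{}
	}
	v := s.data[s.pos]
	s.pos++
	return &Optional[T]{Value: v, Ok: true}
}

// funcStream adapts a closure to the Stream interface.
type funcStream[T any] struct{ next func() *Optional[T] }

func (f funcStream[T]) Next() *Optional[T] { return f.next() }

// Chain wraps a Stream so that steps can be chained as methods.
type Chain[T any] struct {
	Stream Stream[T]
}

func Slice[T any](data []T) Chain[T] {
	return Chain[T]{Stream: &sliceStream[T]{data: data}}
}

// Filter forwards only the values for which pred returns true.
func (c Chain[T]) Filter(pred func(T) bool) Chain[T] {
	in := c.Stream
	return Chain[T]{Stream: funcStream[T]{next: func() *Optional[T] {
		for item := in.Next(); item.Ok; item = in.Next() {
			if pred(item.Value) {
				return item
			}
		}
		return &Optional[T]{}
	}}}
}

// Take forwards at most limit values.
func (c Chain[T]) Take(limit int) Chain[T] {
	in, left := c.Stream, limit
	return Chain[T]{Stream: funcStream[T]{next: func() *Optional[T] {
		if left <= 0 {
			return &Optional[T]{}
		}
		left--
		return in.Next()
	}}}
}

// ToSlice drains the chain into a slice.
func (c Chain[T]) ToSlice() []T {
	var out []T
	for item := c.Stream.Next(); item.Ok; item = c.Stream.Next() {
		out = append(out, item.Value)
	}
	return out
}

func main() {
	isEven := func(v int) bool { return v%2 == 0 }
	fmt.Println(Slice([]int{1, 2, 3, 4, 5, 6, 7, 8}).Filter(isEven).Take(3).ToSlice()) // prints [2 4 6]
}
```

Go methods cannot declare their own type parameters, which may be why Transform, which changes the element type, appears on this page only as a free function and not as a Chain method.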

type Stream

type Stream[T any] interface {
	// Next retrieves the next item from the Stream. It returns an Optional[T].
	// If the result is empty it means that the Stream has finished.
	Next() *cnt.Optional[T]
}

Stream is an interface that provides a uni-directional, pull-based data processing facility.

func Async added in v0.0.4

func Async[T any](input Stream[T]) Stream[T]

Async creates a new asynchronous Stream. The input Stream will be read from a goroutine, and its results transferred over a channel. That means whatever work the input Stream needs to do can run concurrently with the work happening in the calling goroutine.
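The goroutine-plus-channel arrangement described above can be sketched as follows. This is an assumption-laden illustration and not the package's implementation: Optional stands in for cnt.Optional, the channel is unbuffered, and chanStream, async and drain are hypothetical names.

```go
package main

import "fmt"

// Optional is a hypothetical stand-in for cnt.Optional.
type Optional[T any] struct {
	Value T
	Ok    bool
}

type Stream[T any] interface {
	Next() *Optional[T]
}

type sliceStream[T any] struct {
	data []T
	pos  int
}

func (s *sliceStream[T]) Next() *Optional[T] {
	if s.pos >= len(s.data) {
		return &Optional[T]{}
	}
	v := s.data[s.pos]
	s.pos++
	return &Optional[T]{Value: v, Ok: true}
}

// chanStream pulls items from a channel; a closed channel ends the Stream.
type chanStream[T any] struct{ ch <-chan T }

func (s *chanStream[T]) Next() *Optional[T] {
	v, ok := <-s.ch
	if !ok {
		return &Optional[T]{}
	}
	return &Optional[T]{Value: v, Ok: true}
}

// async reads the input in its own goroutine and hands values to the
// consumer over a channel, closing it when the input finishes.
func async[T any](input Stream[T]) Stream[T] {
	ch := make(chan T)
	go func() {
		defer close(ch)
		for item := input.Next(); item.Ok; item = input.Next() {
			ch <- item.Value
		}
	}()
	return &chanStream[T]{ch: ch}
}

// drain collects every item of a finite Stream.
func drain[T any](s Stream[T]) []T {
	var out []T
	for item := s.Next(); item.Ok; item = s.Next() {
		out = append(out, item.Value)
	}
	return out
}

func main() {
	var source Stream[int] = &sliceStream[int]{data: []int{1, 2, 3}}
	fmt.Println(drain(async(source))) // prints [1 2 3]
}
```

With a single producer goroutine the item order is preserved. In this sketch the goroutine blocks forever if the consumer stops reading early, so a production version would need some form of cancellation.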

func ChunkIn added in v0.0.4

func ChunkIn[T any](input []T, size int) []Stream[T]

ChunkIn returns a slice of streams, each one draining a chunk of the input slice.
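A minimal sketch of the idea, assuming that size is the maximum number of elements per chunk (the page does not say whether it is a chunk length or a chunk count). Optional stands in for cnt.Optional, and chunkIn and drain are hypothetical local helpers.

```go
package main

import "fmt"

// Optional is a hypothetical stand-in for cnt.Optional.
type Optional[T any] struct {
	Value T
	Ok    bool
}

type Stream[T any] interface {
	Next() *Optional[T]
}

type sliceStream[T any] struct {
	data []T
	pos  int
}

func (s *sliceStream[T]) Next() *Optional[T] {
	if s.pos >= len(s.data) {
		return &Optional[T]{}
	}
	v := s.data[s.pos]
	s.pos++
	return &Optional[T]{Value: v, Ok: true}
}

// chunkIn splits the input into consecutive sub-slices of at most 'size'
// elements and wraps each one in its own Stream.
func chunkIn[T any](input []T, size int) []Stream[T] {
	if size <= 0 {
		return nil // assumption: a non-positive size yields no streams
	}
	var streams []Stream[T]
	for start := 0; start < len(input); start += size {
		end := start + size
		if end > len(input) {
			end = len(input)
		}
		streams = append(streams, &sliceStream[T]{data: input[start:end]})
	}
	return streams
}

// drain collects every item of a finite Stream.
func drain[T any](s Stream[T]) []T {
	var out []T
	for item := s.Next(); item.Ok; item = s.Next() {
		out = append(out, item.Value)
	}
	return out
}

func main() {
	for _, s := range chunkIn([]int{1, 2, 3, 4, 5}, 2) {
		fmt.Println(drain(s))
	}
}
```

Each returned stream is independent of the others, so each could be processed in its own goroutine, for example behind an Async step.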

func Filter

func Filter[T any](input Stream[T], pred common.Predicate[T]) Stream[T]

Filter creates a Stream step that only forwards values that match the predicate.
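As a rough illustration, a filtering step keeps pulling from its input until it finds a matching value or the input ends. The sketch assumes common.Predicate[T] has the shape func(T) bool. Optional is a stand-in for cnt.Optional, and filterStream, filter, fromSlice and drain are hypothetical names.

```go
package main

import "fmt"

// Optional is a hypothetical stand-in for cnt.Optional.
type Optional[T any] struct {
	Value T
	Ok    bool
}

type Stream[T any] interface {
	Next() *Optional[T]
}

type sliceStream[T any] struct {
	data []T
	pos  int
}

func (s *sliceStream[T]) Next() *Optional[T] {
	if s.pos >= len(s.data) {
		return &Optional[T]{}
	}
	v := s.data[s.pos]
	s.pos++
	return &Optional[T]{Value: v, Ok: true}
}

func fromSlice[T any](data []T) Stream[T] {
	return &sliceStream[T]{data: data}
}

// filterStream forwards only the values accepted by pred.
type filterStream[T any] struct {
	input Stream[T]
	pred  func(T) bool
}

func (f *filterStream[T]) Next() *Optional[T] {
	// Skip rejected values; stop at the first match or at the end of input.
	for item := f.input.Next(); item.Ok; item = f.input.Next() {
		if f.pred(item.Value) {
			return item
		}
	}
	return &Optional[T]{}
}

func filter[T any](input Stream[T], pred func(T) bool) Stream[T] {
	return &filterStream[T]{input: input, pred: pred}
}

// drain collects every item of a finite Stream.
func drain[T any](s Stream[T]) []T {
	var out []T
	for item := s.Next(); item.Ok; item = s.Next() {
		out = append(out, item.Value)
	}
	return out
}

func main() {
	odd := func(v int) bool { return v%2 == 1 }
	fmt.Println(drain(filter(fromSlice([]int{1, 2, 3, 4, 5}), odd))) // prints [1 3 5]
}
```

On an infinite input where nothing matches, a single call to Next in this sketch never returns.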

func FromSlice added in v0.0.4

func FromSlice[T any](data []T) Stream[T]

FromSlice creates a Stream that provides all the elements in the slice, one at a time. When it reaches the end, it terminates the Stream.

func Generate added in v0.0.4

func Generate[T any](g func() *cnt.Optional[T]) Stream[T]

Generate creates a Stream that generates user-defined values. When the generator wants to terminate the Stream, it should return an empty Optional.
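A generator is typically a closure that keeps its own state between calls. The sketch below shows a finite countdown. Optional is a local stand-in for cnt.Optional, and generate and countdown are hypothetical names; an Optional with Ok set to false plays the role of the empty result that ends the Stream.

```go
package main

import "fmt"

// Optional is a hypothetical stand-in for cnt.Optional.
type Optional[T any] struct {
	Value T
	Ok    bool
}

type Stream[T any] interface {
	Next() *Optional[T]
}

// generateStream delegates every Next call to the user-supplied function.
type generateStream[T any] struct{ g func() *Optional[T] }

func (s generateStream[T]) Next() *Optional[T] { return s.g() }

func generate[T any](g func() *Optional[T]) Stream[T] {
	return generateStream[T]{g: g}
}

// countdown yields from, from-1, ..., 1 and then ends the Stream.
func countdown(from int) Stream[int] {
	n := from // state captured by the closure
	return generate[int](func() *Optional[int] {
		if n <= 0 {
			return &Optional[int]{} // empty result: the Stream is finished
		}
		v := n
		n--
		return &Optional[int]{Value: v, Ok: true}
	})
}

func main() {
	s := countdown(3)
	for item := s.Next(); item.Ok; item = s.Next() {
		fmt.Println(item.Value) // prints 3, then 2, then 1
	}
}
```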

func Iota

func Iota[T constraints.Integer]() Stream[T]

Iota creates a Stream that provides an infinite number of monotonically increasing integers.

func IotaWithStart

func IotaWithStart[T constraints.Integer](start T) Stream[T]

IotaWithStart creates a Stream just like Iota, but allows you to set the starting point.

func MergeSort added in v0.0.4

func MergeSort[T any](input1, input2 Stream[T], pred common.BinaryPredicate[T]) Stream[T]

MergeSort creates a Stream that merges two sorted streams into a single sorted Stream. The pred function should return true if value1 should come before value2. This Stream step requires that each input stream is already sorted according to pred.
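The merge step of a merge sort can be expressed over two pull-based streams by holding one look-ahead item from each input. The sketch below is one way to do it, not necessarily the package's: common.BinaryPredicate[T] is assumed to be func(T, T) bool, Optional stands in for cnt.Optional, ties are resolved in favor of the first input, and all names are hypothetical.

```go
package main

import "fmt"

// Optional is a hypothetical stand-in for cnt.Optional.
type Optional[T any] struct {
	Value T
	Ok    bool
}

type Stream[T any] interface {
	Next() *Optional[T]
}

type sliceStream[T any] struct {
	data []T
	pos  int
}

func (s *sliceStream[T]) Next() *Optional[T] {
	if s.pos >= len(s.data) {
		return &Optional[T]{}
	}
	v := s.data[s.pos]
	s.pos++
	return &Optional[T]{Value: v, Ok: true}
}

func fromSlice[T any](data []T) Stream[T] {
	return &sliceStream[T]{data: data}
}

// mergeStream keeps the next pending item of each input in headA and headB.
type mergeStream[T any] struct {
	a, b         Stream[T]
	headA, headB *Optional[T]
	before       func(x, y T) bool
}

// mergeSorted reads the first item of each input eagerly (a simplification).
func mergeSorted[T any](a, b Stream[T], before func(x, y T) bool) Stream[T] {
	return &mergeStream[T]{a: a, b: b, headA: a.Next(), headB: b.Next(), before: before}
}

func (m *mergeStream[T]) Next() *Optional[T] {
	switch {
	case !m.headA.Ok && !m.headB.Ok:
		return &Optional[T]{} // both inputs are exhausted
	case !m.headB.Ok || (m.headA.Ok && !m.before(m.headB.Value, m.headA.Value)):
		out := m.headA // take from a; ties go to a
		m.headA = m.a.Next()
		return out
	default:
		out := m.headB
		m.headB = m.b.Next()
		return out
	}
}

// drain collects every item of a finite Stream.
func drain[T any](s Stream[T]) []T {
	var out []T
	for item := s.Next(); item.Ok; item = s.Next() {
		out = append(out, item.Value)
	}
	return out
}

func main() {
	a := fromSlice([]int{1, 4, 6})
	b := fromSlice([]int{2, 3, 7, 8})
	less := func(x, y int) bool { return x < y }
	fmt.Println(drain(mergeSorted(a, b, less))) // prints [1 2 3 4 6 7 8]
}
```

If either input is not sorted, the output is simply an interleaving and not a sorted sequence, which matches the precondition stated above.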

func Skip

func Skip[T any](input Stream[T], limit int) Stream[T]

Skip skips a certain number of items from the input Stream and delivers the remainder.
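A possible shape for such a step: discard items lazily on the first call to Next, then pass everything else through. This is only a sketch. Optional stands in for cnt.Optional, skipStream, skip, fromSlice and drain are hypothetical names, and the sketch assumes that skipping past the end of the input simply yields an empty Stream.

```go
package main

import "fmt"

// Optional is a hypothetical stand-in for cnt.Optional.
type Optional[T any] struct {
	Value T
	Ok    bool
}

type Stream[T any] interface {
	Next() *Optional[T]
}

type sliceStream[T any] struct {
	data []T
	pos  int
}

func (s *sliceStream[T]) Next() *Optional[T] {
	if s.pos >= len(s.data) {
		return &Optional[T]{}
	}
	v := s.data[s.pos]
	s.pos++
	return &Optional[T]{Value: v, Ok: true}
}

func fromSlice[T any](data []T) Stream[T] {
	return &sliceStream[T]{data: data}
}

// skipStream drops the first 'left' items of its input.
type skipStream[T any] struct {
	input Stream[T]
	left  int
}

func (s *skipStream[T]) Next() *Optional[T] {
	// Discard pending items; this loop only does work on the first call.
	for ; s.left > 0; s.left-- {
		if item := s.input.Next(); !item.Ok {
			s.left = 0
			return item // input ended while skipping
		}
	}
	return s.input.Next()
}

func skip[T any](input Stream[T], limit int) Stream[T] {
	return &skipStream[T]{input: input, left: limit}
}

// drain collects every item of a finite Stream.
func drain[T any](s Stream[T]) []T {
	var out []T
	for item := s.Next(); item.Ok; item = s.Next() {
		out = append(out, item.Value)
	}
	return out
}

func main() {
	fmt.Println(drain(skip(fromSlice([]int{10, 20, 30, 40}), 2))) // prints [30 40]
}
```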

func Take

func Take[T any](input Stream[T], limit int) Stream[T]

Take takes a limited number of items from the Stream and then stops.

func Transform

func Transform[T1, T2 any](input Stream[T1], xform common.UnaryTransform[T1, T2]) Stream[T2]

Transform adds a Stream step that transforms a value into another value, optionally of another type.
