Documentation ¶
Overview ¶
Package stream is a library providing lazy generic data streams for Go.
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChanneledInput ¶
type ChanneledInput[T any] interface { ChanneledProducer[T] Writer[T] }
ChanneledInput is a Producer that gets its data from the outside world. The source of the data is a channel that the programmer writes to. It is closed manually, which also closes all attached streams.
Embeds:
- ChanneledProducer
- Writer
Type parameters:
- T - type of the produced values.
func NewChanneledInput ¶
func NewChanneledInput[T any](capacity int) ChanneledInput[T]
NewChanneledInput is a constructor of the channeled input.
Type parameters:
- T - type of the produced values.
Parameters:
- capacity - size of the channel buffer.
Returns:
- pointer to the new channeled input.
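The behaviour described above (write into a buffered channel, close manually, read until drained) can be illustrated with a minimal, self-contained sketch. It does not import the package; `chanInput`, `newChanInput` and `drain` are hypothetical names made up for this illustration, and the real implementation may differ (for example in how it handles concurrent writers).

```go
package main

import (
	"errors"
	"fmt"
)

// chanInput is a hypothetical stand-in for a channel-backed input.
// It is an illustration only, not the package's implementation,
// and it is not safe for concurrent Write and Close calls.
type chanInput[T any] struct {
	ch     chan T
	closed bool
}

func newChanInput[T any](capacity int) *chanInput[T] {
	return &chanInput[T]{ch: make(chan T, capacity)}
}

// Write sends the values into the buffered channel and reports how many were sent.
// It blocks when the buffer is full and nobody is reading.
func (in *chanInput[T]) Write(values ...T) (int, error) {
	if in.closed {
		return 0, errors.New("input is closed")
	}
	for _, v := range values {
		in.ch <- v
	}
	return len(values), nil
}

// Close closes the channel so that readers eventually see the end of the stream.
func (in *chanInput[T]) Close() {
	if !in.closed {
		in.closed = true
		close(in.ch)
	}
}

// Get returns the next value; valid is false once the channel is closed and drained.
func (in *chanInput[T]) Get() (value T, valid bool, err error) {
	value, valid = <-in.ch
	return value, valid, nil
}

// drain reads until the input reports that no more values are available.
func drain[T any](in *chanInput[T]) []T {
	var out []T
	for {
		v, ok, _ := in.Get()
		if !ok {
			return out
		}
		out = append(out, v)
	}
}

func main() {
	in := newChanInput[int](3)
	in.Write(1, 2, 3)
	in.Close()
	fmt.Println(drain(in)) // [1 2 3]
}
```

The capacity matters in this sketch: writing more values than the buffer holds would block until a reader takes some of them.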
type ChanneledProducer ¶
type ChanneledProducer[T any] interface { Producer[T] /* Channel acquires the underlying chan T. Returns: - underlying channel. */ Channel() chan T }
ChanneledProducer is a Producer whose data is drawn from a channel. Values can be read with Get or taken directly from the channel returned by Channel.
Embeds:
- Producer
Type parameters:
- T - type of the produced values.
type Closable ¶
type Closable interface { /* Closed checks whether the entity is closed or not. Returns: - true if the entity is closed, false otherwise. */ Closed() bool /* Close closes the entity. */ Close() }
Closable is an entity which can be closed.
type Collector ¶
type Collector[T any] interface { /* Collect reads all values from the entity and returns them as a slice. Returns: - slice of all data items (its length is not known in advance), - error if any occurred. */ Collect() ([]T, error) }
Collector is an entity from which all its values can be acquired as a slice.
Type parameters:
- T - type of the collected values.
type Consumer ¶
type Consumer[T any] interface { /* SetSource sets some producer as a source of the values for the Consumer. Parameters: - s - Producer to be set as a source. Returns: - error if any occurred. */ SetSource(s Producer[T]) error /* CanSetSource signals if the Consumer is able to accept more Producers. Returns: - true if the source can be set at the moment, false otherwise. */ CanSetSource() bool /* Consume consumes the next value from the Producer. Returns: - value - the value of the item, - valid - true if the value is present, false otherwise, - err - error, if any occurred. */ Consume() (T, bool, error) }
Consumer is a data stream consuming data. A Producer can be attached to it via Pipe.
Type parameters:
- T - type of the consumed values.
type DefaultClosable ¶
type DefaultClosable struct {
// contains filtered or unexported fields
}
DefaultClosable is a default implementation of the closing mechanism. It is a simple closed flag which can be set to true or checked. To use it, embed the struct so that its methods are promoted.
Implements:
- Closable
func (*DefaultClosable) Close ¶
func (ego *DefaultClosable) Close()
func (*DefaultClosable) Closed ¶
func (ego *DefaultClosable) Closed() bool
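As a rough illustration of the embedding pattern described for DefaultClosable, the sketch below defines its own flag type instead of importing the package. `closedFlag` and `counter` are hypothetical names; only the idea (a boolean flag whose Close and Closed methods are promoted to the embedding struct) is taken from the description above.

```go
package main

import "fmt"

// closedFlag is a hypothetical equivalent of a default closing mechanism:
// a single boolean that can be set and checked.
type closedFlag struct {
	closed bool
}

func (c *closedFlag) Close()       { c.closed = true }
func (c *closedFlag) Closed() bool { return c.closed }

// counter is an illustrative stream-like type that gains Close and Closed
// by embedding closedFlag.
type counter struct {
	closedFlag
	next int
}

// Get returns increasing integers until the counter is closed.
func (c *counter) Get() (int, bool) {
	if c.Closed() {
		return 0, false
	}
	c.next++
	return c.next, true
}

func main() {
	c := &counter{}
	v, ok := c.Get()
	fmt.Println(v, ok) // 1 true
	c.Close()
	v, ok = c.Get()
	fmt.Println(v, ok) // 0 false
}
```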
type DefaultConsumer ¶
type DefaultConsumer[T any] struct { // contains filtered or unexported fields }
DefaultConsumer is a default partial implementation of the Consumer.
Implements:
- Consumer
func (*DefaultConsumer[T]) CanSetSource ¶
func (ego *DefaultConsumer[T]) CanSetSource() bool
func (*DefaultConsumer[T]) Consume ¶
func (ego *DefaultConsumer[T]) Consume() (value T, valid bool, err error)
func (*DefaultConsumer[T]) SetSource ¶
func (ego *DefaultConsumer[T]) SetSource(s Producer[T]) error
type DefaultProducer ¶
type DefaultProducer[T any] struct { // contains filtered or unexported fields }
DefaultProducer is a default partial implementation of the Producer. It does not implement the Producer interface by itself: the Get method has to be defined by the full implementation. To use it, embed the struct so that its methods are promoted, and initialize it with the provided constructor. All methods work with a pointer to the embedding struct.
Implements:
- Producer (partially)
func NewDefaultProducer ¶
func NewDefaultProducer[T any](p Producer[T]) *DefaultProducer[T]
NewDefaultProducer is a constructor of the DefaultProducer.
Type parameters:
- T - type of the produced values.
Parameters:
- p - full implementation of the Producer (embedding this struct).
Returns:
- pointer to the new DefaultProducer.
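The constructor takes the full implementation as its argument, which suggests a pattern where the partial implementation keeps a reference back to the embedding struct and builds its helpers on top of Get. The following is a minimal sketch of that pattern under this assumption; `getter`, `partial` and `sliceSource` are hypothetical names and the code does not use the package itself.

```go
package main

import "fmt"

// getter is the one method the full implementation must supply.
type getter[T any] interface {
	Get() (T, bool, error)
}

// partial holds a reference to the full implementation and builds
// helper methods on top of its Get.
type partial[T any] struct {
	self getter[T]
}

func newPartial[T any](self getter[T]) *partial[T] {
	return &partial[T]{self: self}
}

// Collect drains the full implementation through Get.
func (p *partial[T]) Collect() ([]T, error) {
	var out []T
	for {
		v, ok, err := p.self.Get()
		if err != nil {
			return out, err
		}
		if !ok {
			return out, nil
		}
		out = append(out, v)
	}
}

// sliceSource is a full implementation: it embeds *partial and defines Get.
type sliceSource[T any] struct {
	*partial[T]
	data []T
}

func newSliceSource[T any](data []T) *sliceSource[T] {
	s := &sliceSource[T]{data: data}
	s.partial = newPartial[T](s) // hand the embedding struct to the partial part
	return s
}

// Get yields the remaining elements one by one.
func (s *sliceSource[T]) Get() (T, bool, error) {
	var zero T
	if len(s.data) == 0 {
		return zero, false, nil
	}
	v := s.data[0]
	s.data = s.data[1:]
	return v, true, nil
}

func main() {
	s := newSliceSource([]string{"a", "b"})
	vals, err := s.Collect()
	fmt.Println(vals, err) // [a b] <nil>
}
```

Collect is promoted to `sliceSource` through the embedded pointer, while the actual data access stays in the one method the full implementation has to write.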
func (*DefaultProducer[T]) Collect ¶
func (ego *DefaultProducer[T]) Collect() ([]T, error)
func (*DefaultProducer[T]) ForEach ¶
func (ego *DefaultProducer[T]) ForEach(fn func(T) error) error
func (*DefaultProducer[T]) Pipe ¶
func (ego *DefaultProducer[T]) Pipe(c Consumer[T]) Consumer[T]
func (*DefaultProducer[T]) Read ¶
func (ego *DefaultProducer[T]) Read(dst []T) (int, error)
type Filter ¶
Filter is a stream which can consume values of T from some Producer and produce values of the same type. The produced data is a subset of the consumed data.
Embeds:
- Consumer
- Producer
Type parameters:
- T - type of the consumed and produced values.
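No constructor for Filter is listed here, so the sketch below only illustrates the idea of a lazy filter: values are pulled from a source on demand and only those accepted by a predicate are passed on. All names (`source`, `sliceSource`, `filter`, `collect`) are hypothetical and defined locally.

```go
package main

import "fmt"

// source is a minimal stand-in for a producer: it only exposes Get.
type source[T any] interface {
	Get() (T, bool, error)
}

// sliceSource yields the elements of a slice one by one.
type sliceSource[T any] struct{ data []T }

func (s *sliceSource[T]) Get() (T, bool, error) {
	var zero T
	if len(s.data) == 0 {
		return zero, false, nil
	}
	v := s.data[0]
	s.data = s.data[1:]
	return v, true, nil
}

// filter pulls from src on demand and forwards only values accepted by keep.
type filter[T any] struct {
	src  source[T]
	keep func(T) bool
}

func (f *filter[T]) Get() (T, bool, error) {
	for {
		v, ok, err := f.src.Get()
		if err != nil || !ok {
			var zero T
			return zero, false, err
		}
		if f.keep(v) {
			return v, true, nil
		}
	}
}

// collect drains any source into a slice, stopping at the end or on an error.
func collect[T any](s source[T]) []T {
	var out []T
	for {
		v, ok, err := s.Get()
		if err != nil || !ok {
			return out
		}
		out = append(out, v)
	}
}

func main() {
	src := &sliceSource[int]{data: []int{1, 2, 3, 4, 5, 6}}
	even := &filter[int]{src: src, keep: func(n int) bool { return n%2 == 0 }}
	fmt.Println(collect[int](even)) // [2 4 6]
}
```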
type Iterator ¶
type Iterator[T any] interface { /* ForEach executes a given function on every data item. The function has one parameter, the current element, and returns an error if any occurred. If an error occurs, the remaining iterations are skipped. Parameters: - fn - anonymous function to be executed. Returns: - error if any of the iterations returned an error. */ ForEach(fn func(T) error) error }
Iterator is an entity which can be iterated over.
Type parameters:
- T - type of the iterated values.
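The early-exit contract of ForEach (stop at the first error and return it) can be shown on a plain slice. This is a minimal sketch; `forEach` and `errStop` are hypothetical names and the function operates on a slice rather than on a stream.

```go
package main

import (
	"errors"
	"fmt"
)

// errStop is an illustrative error used to abort the iteration.
var errStop = errors.New("stop")

// forEach applies fn to every item and stops at the first error,
// skipping the remaining items.
func forEach[T any](items []T, fn func(T) error) error {
	for _, it := range items {
		if err := fn(it); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	visited := 0
	err := forEach([]int{1, 2, 3, 4}, func(n int) error {
		if n == 3 {
			return errStop
		}
		visited++
		return nil
	})
	fmt.Println(visited, err) // 2 stop
}
```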
type Merger ¶
Merger accepts multiple Producers as a source and combines data from all of them into one stream.
Embeds:
- Consumer
- Producer
Type parameters:
- T - type of the consumed and produced values.
func NewActiveMerger ¶
NewActiveMerger creates a new active merger. An active merger polls its attached sources (producers) in round-robin order inside its Get method. Beware that if an attached source is not channeled, a new goroutine is spawned to push its data through a channel the merger can select on.
Type parameters:
- T - type of the consumed and produced values.
Parameters:
- autoclose - if true, the stream closes automatically when all attached streams close.
Returns:
- pointer to the new merger.
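Round-robin polling over several channels can be sketched with a non-blocking `select`. The function below is a hypothetical helper (`roundRobin`), not the merger's actual code: it returns a slice instead of exposing Get, and it busy-waits when open sources have nothing ready, which a real implementation would likely avoid.

```go
package main

import "fmt"

// roundRobin polls the given channels in turn and returns the merged values.
// A closed channel is dropped from the rotation; the function returns once
// every channel has been closed and drained.
func roundRobin[T any](sources ...chan T) []T {
	var out []T
	active := append([]chan T(nil), sources...)
	for len(active) > 0 {
		remaining := active[:0]
		for _, ch := range active {
			select {
			case v, ok := <-ch:
				if !ok {
					continue // source closed: leave it out of the next round
				}
				out = append(out, v)
			default:
				// nothing ready on this source right now
			}
			remaining = append(remaining, ch)
		}
		active = remaining
	}
	return out
}

func main() {
	a := make(chan int, 2)
	b := make(chan int, 1)
	a <- 1
	a <- 2
	b <- 10
	close(a)
	close(b)
	fmt.Println(roundRobin(a, b)) // [1 10 2]
}
```

Returning only after all sources are closed loosely mirrors the autoclose behaviour described above, where the merged stream closes when all attached streams close.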
func NewChanneledLazyMerger ¶
NewChanneledLazyMerger is a constructor of the channeled merger. Channeled merger is a merger implementation based on ChanneledInput.
Type parameters:
- T - type of the consumed and produced values.
Parameters:
- capacity - size of the channel buffer,
- autoclose - if true, the stream closes automatically when all attached streams close.
Returns:
- pointer to the new merger.
type Multiplexer ¶
type Multiplexer[T any] interface { Consumer[T] /* Out acquires one of the cloned Producers. Parameters: - index - index of the desired Producer. Returns: - the corresponding producer, nil if the index does not exist. */ Out(index int) Producer[T] }
Multiplexer clones the attached producer into several output streams. The cloned streams can be consumed separately.
Embeds:
- Consumer
Type parameters:
- T - type of the consumed and produced values.
func NewMultiplexer ¶
func NewMultiplexer[T any](capacity int, branches int) Multiplexer[T]
NewMultiplexer is a constructor of the multiplexer.
Type parameters:
- T - type of the consumed and produced values.
Parameters:
- capacity - size of the channel buffer,
- branches - number of the output streams.
Returns:
- pointer to the new multiplexer.
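The fan-out idea behind a multiplexer (every input value is copied to each output branch) can be sketched with plain channels. `fanOut` is a hypothetical helper written for this illustration; its `capacity` and `branches` parameters mirror the constructor parameters above, but the code is not the package's implementation.

```go
package main

import "fmt"

// fanOut copies every value from src to each of the output channels.
// Each output is buffered with the given capacity and is closed once
// src is closed and drained.
func fanOut[T any](src <-chan T, capacity, branches int) []chan T {
	outs := make([]chan T, branches)
	for i := range outs {
		outs[i] = make(chan T, capacity)
	}
	go func() {
		for v := range src {
			for _, o := range outs {
				o <- v // blocks if this branch's buffer is full
			}
		}
		for _, o := range outs {
			close(o)
		}
	}()
	return outs
}

func main() {
	src := make(chan int, 3)
	src <- 1
	src <- 2
	src <- 3
	close(src)

	outs := fanOut[int](src, 3, 2)
	for i, o := range outs {
		var got []int
		for v := range o {
			got = append(got, v)
		}
		fmt.Println(i, got)
	}
	// 0 [1 2 3]
	// 1 [1 2 3]
}
```

In this sketch a branch that is not consumed stalls the others once its buffer fills up, which is why the buffer capacity is chosen large enough for the whole input in the example.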
type Producer ¶
type Producer[T any] interface { Closable Reader[T] Collector[T] Iterator[T] /* Get acquires a next item from the stream. Returns: - value - the value of the item, - valid - true if the value is present, false otherwise, - err - error, if any occurred. */ Get() (T, bool, error) /* Pipe attaches the producer to the given consumer. Parameters: - c - the destination consumer. Returns: - the destination consumer. */ Pipe(c Consumer[T]) Consumer[T] }
Producer is a data stream producing data. It can be connected to a Consumer via Pipe.
Embeds:
- Closable
- Reader
- Collector
- Iterator
Type parameters:
- T - type of the produced values.
type Reader ¶
type Reader[T any] interface { /* Read reads values from the entity into the given slice. Parameters: - dst - destination slice. Returns: - count of data items actually read (maximum is the length of dst), - error if any occurred. */ Read(dst []T) (int, error) }
Reader is an entity which can be read from.
Type parameters:
- T - type of the read values.
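A short sketch of the Read contract (fill the destination slice, return how many items were actually read) is shown below. `buffer` is a hypothetical type; returning a count of 0 with a nil error when nothing is left is an assumption of this sketch, since the end-of-data behaviour is not specified here.

```go
package main

import "fmt"

// buffer is an illustrative reader over a slice of pending items.
type buffer[T any] struct{ pending []T }

// Read copies up to len(dst) pending items into dst and returns the count.
// Assumption: an exhausted buffer returns 0 and a nil error.
func (b *buffer[T]) Read(dst []T) (int, error) {
	n := copy(dst, b.pending)
	b.pending = b.pending[n:]
	return n, nil
}

func main() {
	b := &buffer[int]{pending: []int{1, 2, 3, 4, 5}}
	dst := make([]int, 2)
	for {
		n, _ := b.Read(dst)
		if n == 0 {
			break
		}
		fmt.Println(dst[:n])
	}
	// [1 2]
	// [3 4]
	// [5]
}
```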
type Splitter ¶
type Splitter[T any] interface { Consumer[T] /* Cond acquires the Producer corresponding to the given condition. Parameters: - index - index of the desired Producer. Returns: - the corresponding producer, nil if the index does not exist. */ Cond(index int) Producer[T] /* Default acquires the Producer containing data not satisfying any of the conditions. Returns: - default Producer. */ Default() Producer[T] }
Splitter splits the attached producer into multiple branches according to a set of conditions. If a value does not satisfy any of the conditions, it is written to the default branch. Each value from the input Producer goes to exactly one output. The output streams can be consumed separately.
Embeds:
- Consumer
Type parameters:
- T - type of the consumed and produced values.
func NewSplitter ¶
NewSplitter is a constructor of the splitter.
Type parameters:
- T - type of the consumed and produced values.
Parameters:
- capacity - size of the channel buffer,
- fn - any amount of conditional functions.
Returns:
- pointer to the new splitter.
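The routing rule of a splitter (each value goes to exactly one branch, or to the default branch if no condition holds) can be sketched on slices. `split` is a hypothetical helper; sending a value to the first matching condition when several match is an assumption of this sketch, since the tie-breaking rule is not stated here.

```go
package main

import "fmt"

// split routes each value to the bucket of the first condition it satisfies
// (an assumption of this sketch), or to the default bucket when no condition matches.
func split[T any](values []T, conds ...func(T) bool) (branches [][]T, def []T) {
	branches = make([][]T, len(conds))
	for _, v := range values {
		matched := false
		for i, cond := range conds {
			if cond(v) {
				branches[i] = append(branches[i], v)
				matched = true
				break
			}
		}
		if !matched {
			def = append(def, v)
		}
	}
	return branches, def
}

func main() {
	negative := func(n int) bool { return n < 0 }
	large := func(n int) bool { return n > 10 }

	branches, def := split([]int{-2, 0, 5, 12}, negative, large)
	fmt.Println(branches, def) // [[-2] [12]] [0 5]
}
```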
type Transformer ¶
Transformer is a stream which can consume values of T from some Producer and produce values of U.
Embeds:
- Consumer
- Producer
Type parameters:
- T - type of the consumed values,
- U - type of the produced values.
func NewTransformer ¶
func NewTransformer[T, U any](fn func(T) U) Transformer[T, U]
NewTransformer is a constructor of the transformer.
Type parameters:
- T - type of the consumed values,
- U - type of the produced values.
Parameters:
- fn - transform function.
Returns:
- pointer to the new transformer.
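A transformer applies a function of type `func(T) U` to each consumed value. The sketch below shows the idea with locally defined, hypothetical types (`source`, `sliceSource`, `mapper`) rather than the package's own; how the real Transformer is wired to its source (via Pipe and SetSource) is not reproduced here.

```go
package main

import "fmt"

// source is a minimal stand-in for a producer: it only exposes Get.
type source[T any] interface {
	Get() (T, bool, error)
}

// sliceSource yields the elements of a slice one by one.
type sliceSource[T any] struct{ data []T }

func (s *sliceSource[T]) Get() (T, bool, error) {
	var zero T
	if len(s.data) == 0 {
		return zero, false, nil
	}
	v := s.data[0]
	s.data = s.data[1:]
	return v, true, nil
}

// mapper pulls a T from src on demand and returns fn applied to it.
type mapper[T, U any] struct {
	src source[T]
	fn  func(T) U
}

func (m *mapper[T, U]) Get() (U, bool, error) {
	v, ok, err := m.src.Get()
	if err != nil || !ok {
		var zero U
		return zero, false, err
	}
	return m.fn(v), true, nil
}

func main() {
	words := &sliceSource[string]{data: []string{"go", "stream"}}
	lengths := &mapper[string, int]{src: words, fn: func(s string) int { return len(s) }}
	for {
		n, ok, _ := lengths.Get()
		if !ok {
			break
		}
		fmt.Println(n)
	}
	// 2
	// 6
}
```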
type Writer ¶
type Writer[T any] interface { /* Write writes the given values to the entity. Parameters: - values - any amount of data items to write. Returns: - count of data items actually written, - error if any occurred. */ Write(values ...T) (int, error) }
Writer is an entity which can be written to.
Type parameters:
- T - type of the written values.