stream

package module
v1.0.1
Warning

This package is not in the latest version of its module.

Published: Mar 16, 2024 License: LGPL-2.1 Imports: 4 Imported by: 4

README

Stream

This package provides lazy generic data streams. Data are not loaded until they are needed, which allows processing large amounts of data with constant space complexity. The data flow is modelled by a so-called pipeline. Processing is initiated by reading from an end of the pipeline (any such end is called a sink). There may be multiple sinks, as the pipeline can be branched.

Stream is an entity in the pipeline. There are two main kinds of streams:

  • Producer - a stream producing data of a certain type,
  • Consumer - a stream capable of attaching to some producer and consuming data from it.

Some streams are both producers and consumers. These are called two-sided and are used to alter the flow or the data themselves. There are five of them:

  • Transformer - transforms a value of each item,
  • Filter - discards items not satisfying a certain condition,
  • Multiplexer - creates multiple streams identical to the source stream,
  • Splitter - splits one stream into multiple based on certain conditions,
  • Merger - merges multiple streams into one.

They can be connected together arbitrarily, forming the pipeline. Inputs and outputs move the data into and out of the pipeline. An input loads data from the outside (a Go variable, file, remote API, etc.) and serves as the starting producer. An output serves as a final consumer, exporting the data to an external resource. An output is not required for storing the result in a variable, since every producer is readable (see the Usage section).

Example of a pipeline

The flow of the data has to be terminated at some point. Thus each producer can be in one of two states: open or closed. A stream is closed when there is no more data to read. The closed state propagates from the start to the end of the pipeline until the sink is closed, which ends the whole process.

Usage

Inputs

Inputs serve as a source of the data (the first producer in the pipeline). They can be created by implementing the Producer interface. This library contains one pre-implemented input, ChanneledInput, whose source of data is a buffered channel of a given capacity. Data are passed to the stream by the Write method, which can be called multiple times. If the buffer is full, Write blocks until some space is freed. When writing is done, the stream has to be closed manually.

s := stream.NewChanneledInput[int](3)
s.Write(1, 2, 3)
s.Close()
Reading data

Data can be read from any producer. There are four methods for doing this:

  • Get - acquires a single value together with information on whether the value is valid (direct approach),
value, valid, err := s.Get()
if err != nil {
	panic(err)
}
for valid {
	fmt.Println(value)
	if value, valid, err = s.Get(); err != nil {
		panic(err)
	}
}
  • Read - reads up to len(out) values into a given slice and returns how many were read (provided by the Reader interface),
out := make([]int, 3)
if n, err := s.Read(out); err != nil {
	panic(err)
} else {
	for i := 0; i < n; i++ {
		fmt.Println(out[i])
	}
}
  • Collect - collects all data in the pipeline and returns them as a slice (provided by Collector interface),
if out, err := s.Collect(); err != nil {
	panic(err)
} else {
	for _, value := range out {
		fmt.Println(value)
	}
}
  • ForEach - iterates over all data and executes a given function on each value, stopping at the first error (provided by the Iterator interface).
if err := s.ForEach(func(value int) error {
	_, err := fmt.Println(value)
	return err
}); err != nil {
	panic(err)
}

Note: Read, Collect and ForEach are available only if the producer is not attached to any consumer (i.e., it is a sink).

Transformer

The transformer works like the map function found in many programming languages. Each item of the stream is modified by the given transformation function. The output type may differ from the input type.

t := stream.NewTransformer(func(x int) int {
    return x * x
})
s.Pipe(t)
Filter

The filter simply filters the data by dropping all items not satisfying the given predicate (for which the given function returns false).

f := stream.NewFilter(func(x int) bool {
	return x <= 2
})
s.Pipe(f)
Multiplexer

The multiplexer clones its source stream. It contains multiple channeled inputs that are filled automatically once the multiplexer is attached to a source. The outputs are accessed by calling the Out method.

capacity := 3
branches := 2
m := stream.NewMultiplexer[int](capacity, branches)
s.Pipe(m)
b0 := m.Out(0)
b1 := m.Out(1)
Splitter

The splitter is similar to the multiplexer, but each item is written to only one of the nested channeled inputs: the one belonging to the first condition satisfied by the item's value. If the value satisfies none of the conditions, it goes to the default branch. The outputs are accessed by calling the Cond and Default methods.

capacity := 3
ss := stream.NewSplitter[int](capacity, func(x int) bool {
	return x <= 2
})
s.Pipe(ss)
b0 := ss.Cond(0)
b1 := ss.Default()
Merger

The merger merges multiple streams into a single one. It can be configured to close automatically (after closing of all currently attached sources) or manually.

There are two basic implementations: channeledLazyMerger (created by NewChanneledLazyMerger) and activeMerger (created by NewActiveMerger). The channeledLazyMerger contains a buffer which is concurrently filled by every attached source (each has its own goroutine trying to push data into the buffer whenever possible). The activeMerger, on the other hand, cycles through its sources in round-robin style until it hits one with data available (for non-channeled sources a goroutine is still spawned; see the code for details). That is, when the sources have no data to process and the Get method is called, the activeMerger polls its sources indefinitely, while the channeledLazyMerger waits on its buffer (channel).

autoclose := true
m := stream.NewActiveMerger[int](autoclose)
s1.Pipe(m)
s2.Pipe(m)
Outputs

An output is a sink which exports the data to some destination. The Read, Collect and ForEach methods then cannot be used anywhere in the corresponding branch of the pipeline. Outputs have to be implemented by the user; this library does not provide any.

Custom streams

The package provides tools to conveniently create your own implementations of all interfaces mentioned above (for more information, check the code documentation). Inputs and outputs are likely to be the most common.

Inputs

Custom inputs can be created by implementing the Producer interface. For this purpose, the DefaultProducer struct is available. It provides the Read, Collect, ForEach and Pipe methods of the Producer interface; Get, which defines the concrete way of acquiring the data, has to be written by you. A simple closing mechanism (Close and Closed) can be added by embedding the DefaultClosable struct. Example:

type RandomIntInput struct {
	stream.DefaultClosable
	stream.DefaultProducer[int]
}

func NewRandomIntInput() *RandomIntInput {
	ego := new(RandomIntInput)
	ego.DefaultProducer = *stream.NewDefaultProducer[int](ego)
	return ego
}

func (ego *RandomIntInput) Get() (value int, valid bool, err error) {
	if ego.Closed() {
		return
	}
	value = rand.Intn(100)
	valid = true
	return
}
Outputs

Custom outputs must implement the Consumer interface. The DefaultConsumer struct is ready to use for this. Example:

type StdoutOutput struct {
	stream.DefaultConsumer[int]
}

func NewStdoutOutput() *StdoutOutput {
	return new(StdoutOutput)
}

func (ego *StdoutOutput) Run() (err error) {
	value, valid, err := ego.Consume()
	for valid && err == nil {
		fmt.Println(value)
		value, valid, err = ego.Consume()
	}
	return
}

Examples

  1. Computes the squares of three numbers. The result will be [1, 4, 9].
input := stream.NewChanneledInput[int](3)
transformer := stream.NewTransformer(func(x int) int {
    return x * x
})

input.Write(1, 2, 3)
input.Close()
input.Pipe(transformer)

result, err := transformer.Collect()
  2. Creates a million numbers and prints each of them increased by 1. Generating and reading run in parallel, so the numbers are never all held in memory at once: with an unbuffered channel, each Write waits until the value is read.
capacity := 0
input := stream.NewChanneledInput[int](capacity)
transformer := stream.NewTransformer(func(x int) int {
	return x + 1
})

var wg sync.WaitGroup
wg.Add(2)

write := func() {
	defer wg.Done()
	defer input.Close()
	for i := 0; i < 1000000; i++ {
		input.Write(i)
	}
}

input.Pipe(transformer)

read := func() {
	defer wg.Done()
	transformer.ForEach(func(x int) error {
		_, err := fmt.Println(x)
		return err
	})
}

go write()
go read()
wg.Wait()

Documentation

Overview

Package stream is a library providing lazy generic data streams for Golang.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ChanneledInput

type ChanneledInput[T any] interface {
	ChanneledProducer[T]
	Writer[T]
}

ChanneledInput is a Producer getting its data from the outside world. The source of the data is a channel which is written to by the programmer. It is closed manually, which causes all attached streams to close as well.

Embeds:

  • ChanneledProducer
  • Writer

Type parameters:

  • T - type of the produced values.

func NewChanneledInput

func NewChanneledInput[T any](capacity int) ChanneledInput[T]

NewChanneledInput is a constructor of the channeled input.

Type parameters:

  • T - type of the produced values.

Parameters:

  • capacity - size of the channel buffer.

Returns:

  • pointer to the new channeled input.

type ChanneledProducer

type ChanneledProducer[T any] interface {
	Producer[T]

	/*
			Channel acquires the underlying chan T.

			Returns:
		      - underlying channel.
	*/
	Channel() chan T
}

ChanneledProducer is a Producer whose data are drawn from a channel. We may use Get, or work directly with the Channel.

Embeds:

  • Producer

Type parameters:

  • T - type of the produced values.

type Closable

type Closable interface {

	/*
		Closed checks whether the entity is closed or not.

		Returns:
		  - true if the entity is closed, false otherwise.
	*/
	Closed() bool

	/*
		Close closes the entity.
	*/
	Close()
}

Closable is an entity which can be closed.

type Collector

type Collector[T any] interface {
	/*
		Collect reads all values from the entity and returns them as a slice.

		Returns:
		  - slice of all data items (its length is not known in advance),
		  - error if any occurred.
	*/
	Collect() ([]T, error)
}

Collector is an entity from which all its values can be acquired as a slice.

Type parameters:

  • T - type of the collected values.

type Consumer

type Consumer[T any] interface {

	/*
		SetSource sets some producer as a source of the values for the Consumer.

		Parameters:
		  - s - Producer to be set as a source.

		Returns:
		  - error if any occurred.
	*/
	SetSource(s Producer[T]) error

	/*
		CanSetSource signals if the Consumer is able to accept more Producers.

		Returns:
		  - true if the source can be set at the moment, false otherwise.
	*/
	CanSetSource() bool

	/*
		Consume consumes the next value from the Producer.

		Returns:
		  - value - the value of the item,
		  - valid - true if the value is present, false otherwise,
		  - err - error, if any occurred.
	*/
	Consume() (T, bool, error)
}

Consumer is a data stream consuming data. A Producer can be attached to it via Pipe.

Type parameters:

  • T - type of the consumed values.

type DefaultClosable

type DefaultClosable struct {
	// contains filtered or unexported fields
}

DefaultClosable is a default implementation of the closing mechanism. It is a simple closed flag which can be set to true or checked. To use it, just embed the struct (as a promoted field).

Implements:

  • Closable

func (*DefaultClosable) Close

func (ego *DefaultClosable) Close()

func (*DefaultClosable) Closed

func (ego *DefaultClosable) Closed() bool

type DefaultConsumer

type DefaultConsumer[T any] struct {
	// contains filtered or unexported fields
}

DefaultConsumer is a default partial implementation of the Consumer.

Implements:

  • Consumer

func (*DefaultConsumer[T]) CanSetSource

func (ego *DefaultConsumer[T]) CanSetSource() bool

func (*DefaultConsumer[T]) Consume

func (ego *DefaultConsumer[T]) Consume() (value T, valid bool, err error)

func (*DefaultConsumer[T]) SetSource

func (ego *DefaultConsumer[T]) SetSource(s Producer[T]) error

type DefaultProducer

type DefaultProducer[T any] struct {
	// contains filtered or unexported fields
}

DefaultProducer is a default partial implementation of the Producer. It does not itself implement the Producer interface; the Get method, as well as Close and Closed (for example via DefaultClosable), has to be provided by the full implementation. To use it, embed the struct (as a promoted field) and initialize it with the provided constructor. All methods work with a pointer to the embedding struct.

Implements:

  • Producer (partially)

func NewDefaultProducer

func NewDefaultProducer[T any](p Producer[T]) *DefaultProducer[T]

NewDefaultProducer is a constructor of the DefaultProducer.

Type parameters:

  • T - type of the produced values.

Parameters:

  • p - full implementation of the Producer (embedding this struct).

Returns:

  • pointer to the new DefaultProducer.

func (*DefaultProducer[T]) Collect

func (ego *DefaultProducer[T]) Collect() ([]T, error)

func (*DefaultProducer[T]) ForEach

func (ego *DefaultProducer[T]) ForEach(fn func(T) error) error

func (*DefaultProducer[T]) Pipe

func (ego *DefaultProducer[T]) Pipe(c Consumer[T]) Consumer[T]

func (*DefaultProducer[T]) Read

func (ego *DefaultProducer[T]) Read(dst []T) (int, error)

type Filter

type Filter[T any] interface {
	Consumer[T]
	Producer[T]
}

Filter is a stream which can consume values of T from some Producer and produce values of the same type. The produced data is a subset of the consumed data.

Embeds:

  • Consumer
  • Producer

Type parameters:

  • T - type of the consumed and produced values.

func NewFilter

func NewFilter[T any](fn func(T) bool) Filter[T]

NewFilter is a constructor of the filter.

Type parameters:

  • T - type of the consumed and produced values.

Parameters:

  • fn - filter function.

Returns:

  • pointer to the new filter.

type Iterator

type Iterator[T any] interface {
	/*
		ForEach executes a given function on every data item.
		The function has one parameter, the current element, and returns an error if any occurred.
		If an error occurs, the remaining iterations are skipped.

		Parameters:
		  - fn - anonymous function to be executed.

		Returns:
		  - error if any of the iterations returned error.
	*/
	ForEach(fn func(T) error) error
}

Iterator is an entity which can be iterated over.

Type parameters:

  • T - type of the iterated values.

type Merger

type Merger[T any] interface {
	Consumer[T]
	Producer[T]
}

Merger accepts multiple Producers as a source and combines data from all of them into one stream.

Embeds:

  • Consumer
  • Producer

Type parameters:

  • T - type of the consumed and produced values.

func NewActiveMerger

func NewActiveMerger[T any](autoclose bool) Merger[T]

NewActiveMerger creates a new activeMerger. The activeMerger is a merger which actively polls the attached sources (producers) in round-robin style in its Get method. Beware that if an attached source is not channeled, a new goroutine is spawned to push its data through a channel the merger can select on.

Type parameters:

  • T - type of the consumed and produced values.

Parameters:

  • autoclose - if true, the stream closes automatically when all attached streams close.

Returns:

  • pointer to the new merger.

func NewChanneledLazyMerger

func NewChanneledLazyMerger[T any](capacity int, autoclose bool) Merger[T]

NewChanneledLazyMerger is a constructor of the channeled merger. Channeled merger is a merger implementation based on ChanneledInput.

Type parameters:

  • T - type of the consumed and produced values.

Parameters:

  • capacity - size of the channel buffer,
  • autoclose - if true, the stream closes automatically when all attached streams close.

Returns:

  • pointer to the new merger.

type Multiplexer

type Multiplexer[T any] interface {
	Consumer[T]

	/*
		Out acquires one of the cloned Producers.

		Parameters:
		  - index - index of the desired Producer.

		Returns:
		  - the corresponding producer, nil if the index does not exist.
	*/
	Out(index int) Producer[T]
}

Multiplexer clones the attached producer. The cloned streams can be consumed separately.

Embeds:

  • Consumer

Type parameters:

  • T - type of the consumed and produced values.

func NewMultiplexer

func NewMultiplexer[T any](capacity int, branches int) Multiplexer[T]

NewMultiplexer is a constructor of the multiplexer.

Type parameters:

  • T - type of the consumed and produced values.

Parameters:

  • capacity - size of the channel buffer,
  • branches - number of the output streams.

Returns:

  • pointer to the new multiplexer.

type Producer

type Producer[T any] interface {
	Closable
	Reader[T]
	Collector[T]
	Iterator[T]

	/*
		Get acquires a next item from the stream.

		Returns:
			- value - the value of the item,
			- valid - true if the value is present, false otherwise,
			- err - error, if any occurred.
	*/
	Get() (T, bool, error)

	/*
		Pipe attaches the producer to the given consumer.

		Parameters:
		  - c - the destination consumer.

		Returns:
		  - the destination consumer.
	*/
	Pipe(c Consumer[T]) Consumer[T]
}

Producer is a data stream producing data. Can be connected to a Consumer via pipe.

Embeds:

  • Closable
  • Reader
  • Collector
  • Iterator

Type parameters:

  • T - type of the produced values.

type Reader

type Reader[T any] interface {
	/*
		Read reads values from the entity into the given slice.

		Parameters:
		  - dst - destination slice.

		Returns:
		  - count of data items actually read (maximum is the length of dst),
		  - error if any occurred.
	*/
	Read(dst []T) (int, error)
}

Reader is an entity which can be read from.

Type parameters:

  • T - type of the read values.

type Splitter

type Splitter[T any] interface {
	Consumer[T]

	/*
		Cond acquires the Producer corresponding to the condition with the given index.

		Parameters:
		  - index - index of the desired Producer.

		Returns:
		  - the corresponding producer, nil if the index does not exist.
	*/
	Cond(index int) Producer[T]

	/*
		Default acquires the Producer containing data not satisfying any of the conditions.

		Returns:
		  - default Producer.
	*/
	Default() Producer[T]
}

Splitter splits the attached producer into multiple branches according to a set of conditions. If a value does not satisfy any of the conditions, it is written to the default branch. Each value from the input Producer goes to exactly one output. The output streams can be consumed separately.

Embeds:

  • Consumer

Type parameters:

  • T - type of the consumed and produced values.

func NewSplitter

func NewSplitter[T any](capacity int, fn ...func(T) bool) Splitter[T]

NewSplitter is a constructor of the splitter.

Type parameters:

  • T - type of the consumed and produced values.

Parameters:

  • capacity - size of the channel buffer,
  • fn - any amount of conditional functions.

Returns:

  • pointer to the new splitter.

type Transformer

type Transformer[T, U any] interface {
	Consumer[T]
	Producer[U]
}

Transformer is a stream which can consume values of T from some Producer and produce values of U.

Embeds:

  • Consumer
  • Producer

Type parameters:

  • T - type of the consumed values,
  • U - type of the produced values.

func NewTransformer

func NewTransformer[T, U any](fn func(T) U) Transformer[T, U]

NewTransformer is a constructor of the transformer.

Type parameters:

  • T - type of the consumed values,
  • U - type of the produced values.

Parameters:

  • fn - transform function.

Returns:

  • pointer to the new transformer.

type Writer

type Writer[T any] interface {
	/*
		Write writes the given values to the entity.

		Parameters:
		  - values - any amount of data items to write.

		Returns:
		  - count of data items actually written,
		  - error if any occurred.
	*/
	Write(values ...T) (int, error)
}

Writer is an entity which can be written to.

Type parameters:

  • T - type of the written values.
