stream

package
v0.0.0-...-5a8c9f9 Latest
Published: Apr 18, 2016 License: MIT Imports: 11 Imported by: 3

Documentation

Overview

A package for manipulating streams of events, which are basically any pieces of data.

Variables

var DebugLog = true
var EOI error = errors.New("End of iteration")

An error that represents the end of iteration.

Functions

func Drain

func Drain(s Stream) error

Drain a stream.

This function uses a specific, possibly more efficient implementation for streams that define a Drain() method.

func Len

func Len(s Stream) (int, error)

Get the length of a stream.

This function uses a specific, possibly more efficient implementation for streams that define a Len() int method.
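The optional-method dispatch described above can be sketched with a type assertion. This is only an illustration, not the package's implementation: Event, Stream and EOI are local stand-ins mirroring the documented declarations, and sliceStream, opaque and lenOf are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented declarations.
type Event interface{}

type Stream interface {
	Next() (Event, error)
}

var EOI = errors.New("End of iteration")

// sliceStream is a hypothetical stream that knows its remaining length.
type sliceStream struct{ events []Event }

func (s *sliceStream) Next() (Event, error) {
	if len(s.events) == 0 {
		return nil, EOI
	}
	evt := s.events[0]
	s.events = s.events[1:]
	return evt, nil
}

// Len is the optional fast path.
func (s *sliceStream) Len() int { return len(s.events) }

// opaque hides every method except Next, forcing the generic path.
type opaque struct{ Stream }

// lenOf (hypothetical) prefers Len() int and otherwise counts events.
func lenOf(s Stream) (int, error) {
	if l, ok := s.(interface{ Len() int }); ok {
		return l.Len(), nil
	}
	n := 0
	for {
		_, err := s.Next()
		if err == EOI {
			return n, nil
		}
		if err != nil {
			return n, err
		}
		n++
	}
}

func main() {
	fast, _ := lenOf(&sliceStream{events: []Event{1, 2, 3}})
	slow, _ := lenOf(opaque{&sliceStream{events: []Event{1, 2, 3}}})
	fmt.Println(fast, slow) // 3 3
}
```

Note that the generic path consumes the stream while counting, whereas the fast path in this sketch does not.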

func ParseJson

func ParseJson(data []byte) (interface{}, error)

func ParseJsonR

func ParseJsonR(data io.Reader) (interface{}, error)

A better routine for parsing JSON into an interface{} than the standard one: it parses integer numbers as int64, not float64.
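To illustrate the difference, here is a minimal sketch of one way to get int64 values using only the standard encoding/json package. It is not the package's own code; parseInts and convert are hypothetical helpers.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// parseInts (hypothetical) decodes JSON, keeping integers as int64.
func parseInts(data []byte) (interface{}, error) {
	dec := json.NewDecoder(bytes.NewReader(data))
	dec.UseNumber() // keep numbers as json.Number instead of float64
	var v interface{}
	if err := dec.Decode(&v); err != nil {
		return nil, err
	}
	return convert(v), nil
}

// convert walks the decoded value and replaces every json.Number
// with an int64 when it fits, and with a float64 otherwise.
func convert(v interface{}) interface{} {
	switch t := v.(type) {
	case json.Number:
		if i, err := t.Int64(); err == nil {
			return i
		}
		f, _ := t.Float64() // range errors are ignored in this sketch
		return f
	case map[string]interface{}:
		for k, e := range t {
			t[k] = convert(e)
		}
		return t
	case []interface{}:
		for i, e := range t {
			t[i] = convert(e)
		}
		return t
	}
	return v
}

func main() {
	// The standard decoder turns every JSON number into a float64.
	var std interface{}
	_ = json.Unmarshal([]byte(`{"n": 42}`), &std)
	fmt.Printf("%T\n", std.(map[string]interface{})["n"]) // float64

	v, _ := parseInts([]byte(`{"n": 42, "x": 1.5}`))
	m := v.(map[string]interface{})
	fmt.Printf("%T %T\n", m["n"], m["x"]) // int64 float64
}
```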

func Register

func Register(name string, fn Function)

Register a stream function by name so that it can be used when building streams from JSON definitions.

func RegisterDecoder

func RegisterDecoder(name string, d Decoder)

Register a decoder by name so that it can be used when building streams from JSON definitions.

func RegisterDefault

func RegisterDefault()

Register stream functions pre-defined by this library.

func RegisterDefaultDecoders

func RegisterDefaultDecoders()

func RegisterDefaultEncoders

func RegisterDefaultEncoders()

func RegisterEncoder

func RegisterEncoder(name string, e Encoder)

Register an encoder by name so that it can be used when building streams from JSON definitions.

Types

type Context

type Context map[string]*StreamContext

A context to be used by functions for building a stream from a JSON definition.

type Decoder

type Decoder interface {
	Decode(data []byte) (Event, error)
}

Decoder is an interface for decoding an event from a byte slice.

type Encoder

type Encoder interface {
	Encode(evt Event) ([]byte, error)
}

Encoder is an interface for encoding an event to a byte slice.
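As an illustration of the two interfaces, a single type can satisfy both. The sketch below uses local copies of Event, Decoder and Encoder and a hypothetical jsonCodec built on encoding/json; it is not one of the codecs shipped with the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins mirroring the documented declarations.
type Event interface{}

type Decoder interface {
	Decode(data []byte) (Event, error)
}

type Encoder interface {
	Encode(evt Event) ([]byte, error)
}

// jsonCodec is a hypothetical codec that treats each event as JSON.
type jsonCodec struct{}

func (jsonCodec) Decode(data []byte) (Event, error) {
	var evt interface{}
	if err := json.Unmarshal(data, &evt); err != nil {
		return nil, err
	}
	return evt, nil
}

func (jsonCodec) Encode(evt Event) ([]byte, error) {
	return json.Marshal(evt)
}

// Compile-time checks that jsonCodec satisfies both interfaces.
var (
	_ Decoder = jsonCodec{}
	_ Encoder = jsonCodec{}
)

func main() {
	c := jsonCodec{}
	evt, err := c.Decode([]byte(`{"name":"a"}`))
	if err != nil {
		panic(err)
	}
	out, err := c.Encode(evt)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"name":"a"}
}
```

Given the signatures listed above, a value like this could presumably be passed to RegisterDecoder and RegisterEncoder under a name of your choice.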

type Event

type Event interface{}

Event interface represents just about anything you want. It only exists for type safety.

type FArg

type FArg interface{}

An argument to a stream function. Might be anything; it only exists for type safety.

type Function

type Function func(Context, []FArg) (Stream, error)

A function for building a stream from a JSON definition.

type Stream

type Stream interface {
	Next() (Event, error)
}

A Stream is an interface for any stream of data.

It has only one method, which returns the next event or an error. The error might be EOI, in which case you just need to stop iterating, or any other error.

Calling the Next method on a Stream after it already returned an error is unspecified and may be unsafe.

Calling the Next method on a Stream from multiple goroutines at once is unspecified and may be unsafe.

Typical iteration over the stream looks like this:

func iter(s Stream, callback func(Event)) error {
	for {
		evt, err := s.Next()
		if err == EOI {
			break
		}
		if err != nil {
			return err
		}

		callback(evt)
	}
	return nil
}
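Implementing the interface yourself takes a single method. The following sketch uses local copies of Event, Stream and EOI; counter and collect are hypothetical names that are not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented declarations.
type Event interface{}

type Stream interface {
	Next() (Event, error)
}

var EOI = errors.New("End of iteration")

// counter is a hypothetical stream yielding 0, 1, ..., limit-1.
type counter struct{ next, limit int }

func (c *counter) Next() (Event, error) {
	if c.next >= c.limit {
		return nil, EOI // signal the end of iteration
	}
	evt := c.next
	c.next++
	return evt, nil
}

// collect gathers all events, following the iteration pattern above.
func collect(s Stream) ([]Event, error) {
	var out []Event
	for {
		evt, err := s.Next()
		if err == EOI {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, evt)
	}
}

func main() {
	events, err := collect(&counter{limit: 3})
	fmt.Println(events, err) // [0 1 2] <nil>
}
```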

func And

func And(streams ...Stream) Stream

Takes multiple boolean streams and creates a boolean stream, and-ing events from original streams.

func Chan

func Chan(ch chan Event) Stream

Create a stream from a channel.

func Decode

func Decode(s Stream, d Decoder) Stream

From a stream of []byte events create a stream of decoded events.

func Ema

func Ema(stream Stream, alpha float64) Stream

Takes a stream of numbers and produces a stream of their exponential moving averages (EMAs).
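The page does not state the exact formula. A common definition, assumed here, seeds the average with the first value and then applies ema = alpha*x + (1-alpha)*ema. The sketch below implements that assumption with local stand-in types; emaStream, listStream and collect are hypothetical, and the real function may differ, for example in how it seeds the average or which numeric types it accepts.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented declarations.
type Event interface{}

type Stream interface {
	Next() (Event, error)
}

var EOI = errors.New("End of iteration")

// listStream is a hypothetical helper yielding a fixed list of events.
type listStream struct{ events []Event }

func (l *listStream) Next() (Event, error) {
	if len(l.events) == 0 {
		return nil, EOI
	}
	evt := l.events[0]
	l.events = l.events[1:]
	return evt, nil
}

// emaStream applies the assumed EMA recurrence to float64 events.
type emaStream struct {
	src     Stream
	alpha   float64
	value   float64
	started bool
}

func (e *emaStream) Next() (Event, error) {
	evt, err := e.src.Next()
	if err != nil {
		return nil, err // includes EOI
	}
	x, ok := evt.(float64)
	if !ok {
		return nil, errors.New("ema: event is not a float64")
	}
	if !e.started {
		e.value, e.started = x, true // seed with the first value
	} else {
		e.value = e.alpha*x + (1-e.alpha)*e.value
	}
	return e.value, nil
}

func collect(s Stream) ([]Event, error) {
	var out []Event
	for {
		evt, err := s.Next()
		if err == EOI {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, evt)
	}
}

func main() {
	src := &listStream{events: []Event{1.0, 2.0, 3.0}}
	out, err := collect(&emaStream{src: src, alpha: 0.5})
	fmt.Println(out, err) // [1 1.5 2.25] <nil>
}
```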

func Empty

func Empty() Stream

Create an empty stream.

func Encode

func Encode(s Stream, e Encoder) Stream

Create a stream of encoded events of type []byte.

func EqVal

func EqVal(stream Stream, evt Event) Stream

Creates a boolean stream with true events when the event of an original stream is equal to a given value and false events otherwise.

func Filter

func Filter(stream Stream, flags Stream) Stream

Takes a data stream and a flags stream and produces a stream with the events from the data stream for which the corresponding flag is true.
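The pairing of data events with flags can be sketched as follows. This is an illustration with local stand-in types, not the package's implementation; filterStream, listStream and collect are hypothetical, and the sketch assumes flags arrive as Go bool values.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented declarations.
type Event interface{}

type Stream interface {
	Next() (Event, error)
}

var EOI = errors.New("End of iteration")

// listStream is a hypothetical helper yielding a fixed list of events.
type listStream struct{ events []Event }

func (l *listStream) Next() (Event, error) {
	if len(l.events) == 0 {
		return nil, EOI
	}
	evt := l.events[0]
	l.events = l.events[1:]
	return evt, nil
}

// filterStream reads one data event and one flag per step and
// yields the data event only when the flag is true.
type filterStream struct{ data, flags Stream }

func (f *filterStream) Next() (Event, error) {
	for {
		evt, err := f.data.Next()
		if err != nil {
			return nil, err
		}
		flag, err := f.flags.Next()
		if err != nil {
			return nil, err
		}
		keep, ok := flag.(bool)
		if !ok {
			return nil, errors.New("filter: flag is not a bool")
		}
		if keep {
			return evt, nil
		}
	}
}

func collect(s Stream) ([]Event, error) {
	var out []Event
	for {
		evt, err := s.Next()
		if err == EOI {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, evt)
	}
}

func main() {
	data := &listStream{events: []Event{10, 20, 30}}
	flags := &listStream{events: []Event{false, true, true}}
	out, err := collect(&filterStream{data: data, flags: flags})
	fmt.Println(out, err) // [20 30] <nil>
}
```

In practice the flags stream would typically come from one of the boolean-producing functions on this page, such as MoreVal or EqVal.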

func GetField

func GetField(stream Stream, field string) Stream

Creates a stream whose events are the values of a field in the events of the original stream, as in JSON. The resulting stream returns an error if the field doesn't exist in an original event.

The field might be deep inside, as in "object.value.data".
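A dotted path like this can be resolved by walking nested objects one key at a time. The sketch below assumes events are decoded JSON objects of type map[string]interface{}; getField is a hypothetical helper, not the package's code.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// getField (hypothetical) follows a dotted path through nested
// JSON objects and returns the value found at the end of the path.
func getField(evt interface{}, field string) (interface{}, error) {
	cur := evt
	for _, key := range strings.Split(field, ".") {
		obj, ok := cur.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("field %q: %q is not inside an object", field, key)
		}
		cur, ok = obj[key]
		if !ok {
			return nil, fmt.Errorf("field %q: key %q does not exist", field, key)
		}
	}
	return cur, nil
}

func main() {
	var evt interface{}
	raw := []byte(`{"object": {"value": {"data": 7}}}`)
	if err := json.Unmarshal(raw, &evt); err != nil {
		panic(err)
	}
	v, err := getField(evt, "object.value.data")
	fmt.Println(v, err) // 7 <nil>

	_, err = getField(evt, "object.missing")
	fmt.Println(err != nil) // true
}
```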

func Join

func Join(streams ...Stream) Stream

Join multiple streams sequentially in one longer stream.
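Sequential joining can be sketched as draining each stream in turn and moving on when it reports EOI. The types below are local stand-ins; joinStream, listStream and collect are hypothetical names and not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented declarations.
type Event interface{}

type Stream interface {
	Next() (Event, error)
}

var EOI = errors.New("End of iteration")

// listStream is a hypothetical helper yielding a fixed list of events.
type listStream struct{ events []Event }

func (l *listStream) Next() (Event, error) {
	if len(l.events) == 0 {
		return nil, EOI
	}
	evt := l.events[0]
	l.events = l.events[1:]
	return evt, nil
}

// joinStream yields everything from the first stream, then the
// second, and so on, ending once the last stream is exhausted.
type joinStream struct{ streams []Stream }

func (j *joinStream) Next() (Event, error) {
	for len(j.streams) > 0 {
		evt, err := j.streams[0].Next()
		if err == EOI {
			j.streams = j.streams[1:] // move on to the next stream
			continue
		}
		return evt, err
	}
	return nil, EOI
}

func collect(s Stream) ([]Event, error) {
	var out []Event
	for {
		evt, err := s.Next()
		if err == EOI {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, evt)
	}
}

func main() {
	a := &listStream{events: []Event{1, 2}}
	b := &listStream{}
	c := &listStream{events: []Event{3}}
	out, err := collect(&joinStream{streams: []Stream{a, b, c}})
	fmt.Println(out, err) // [1 2 3] <nil>
}
```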

func LessEqVal

func LessEqVal(stream Stream, val float64) Stream

Creates a boolean stream with true events when the event of an original stream is less than or equal to a given value and false events otherwise.

Original stream must consist of numbers.

func LessVal

func LessVal(stream Stream, val float64) Stream

Creates a boolean stream with true events when the event of an original stream is less than a given value and false events otherwise.

Original stream must consist of numbers.

func List

func List(events []Event) Stream

Create a stream from a slice of events.

func Map

func Map(stream Stream, fn func(Event) (Event, error)) Stream

Map a function over a stream.

func MaxBy

func MaxBy(datas Stream, vals Stream) Stream

Takes a data stream and a numbers stream and produces a stream with the event from the data stream for which the corresponding value is maximal.

func MinBy

func MinBy(datas Stream, vals Stream) Stream

Takes a data stream and a numbers stream and produces a stream with the event from the data stream for which the corresponding value is minimal.

func MoreEqVal

func MoreEqVal(stream Stream, val float64) Stream

Creates a boolean stream with true events when the event of an original stream is greater than or equal to a given value and false events otherwise.

Original stream must consist of numbers.

func MoreVal

func MoreVal(stream Stream, val float64) Stream

Creates a boolean stream with true events when the event of an original stream is greater than a given value and false events otherwise.

Original stream must consist of numbers.

func NeqVal

func NeqVal(stream Stream, evt Event) Stream

Creates a boolean stream with false events when the event of an original stream is equal to a given value and true events otherwise.

func Or

func Or(streams ...Stream) Stream

Takes multiple boolean streams and creates a boolean stream, or-ing events from original streams.

func Repeat

func Repeat(stream Stream) Stream

Create a stream of one event repeated infinitely.

func RollingMaxBy

func RollingMaxBy(datas Stream, vals Stream) Stream

Takes a data stream and a numbers stream and produces a stream with the events from the data stream at which the maximum of the corresponding values seen so far changes.

func RollingMaxByAll

func RollingMaxByAll(datas Stream, vals Stream) Stream

Takes a data stream and a numbers stream and produces a stream with the events from the data stream for which the current value is maximal.

func RollingMinBy

func RollingMinBy(datas Stream, vals Stream) Stream

Takes a data stream and a numbers stream and produces a stream with the events from the data stream at which the minimum of the corresponding values seen so far changes.

func RollingMinByAll

func RollingMinByAll(datas Stream, vals Stream) Stream

Takes a data stream and a numbers stream and produces a stream with the events from the data stream for which the current value is minimal.

func Run

func Run(stream Stream, defs []string) (Stream, error)

Build a stream from a JSON definition.

func SetField

func SetField(datas Stream, vals Stream, field string) Stream

Creates a stream whose events are the events of the first stream with the given field set to the value of the corresponding event of the second stream, as in JSON.

The field might be deep inside, as in "object.value.data".

func Sprintf

func Sprintf(stream Stream, sfmt string, fields []string) Stream

Format fields of events into a stream of strings.

func StringAppend

func StringAppend(stream Stream, suf string) Stream

Takes a stream of strings and appends a given string to all of them.

func StringPrepend

func StringPrepend(stream Stream, pref string) Stream

Takes a stream of strings and prepends a given string to all of them.

func Zip

func Zip(streams ...Stream) Stream

Zip multiple streams into one stream that yields arrays of events, one from each of these streams.

The resulting stream ends as soon as the first of these base streams ends.
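The termination rule can be illustrated with a small sketch built on local stand-in types. zipStream, listStream and collect are hypothetical, and representing each zipped event as a []Event is an assumption about what "arrays of events" means here.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented declarations.
type Event interface{}

type Stream interface {
	Next() (Event, error)
}

var EOI = errors.New("End of iteration")

// listStream is a hypothetical helper yielding a fixed list of events.
type listStream struct{ events []Event }

func (l *listStream) Next() (Event, error) {
	if len(l.events) == 0 {
		return nil, EOI
	}
	evt := l.events[0]
	l.events = l.events[1:]
	return evt, nil
}

// zipStream pulls one event from every base stream per step and
// stops as soon as any of them returns an error, including EOI.
type zipStream struct{ streams []Stream }

func (z *zipStream) Next() (Event, error) {
	row := make([]Event, len(z.streams))
	for i, s := range z.streams {
		evt, err := s.Next()
		if err != nil {
			return nil, err
		}
		row[i] = evt
	}
	return row, nil
}

func collect(s Stream) ([]Event, error) {
	var out []Event
	for {
		evt, err := s.Next()
		if err == EOI {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, evt)
	}
}

func main() {
	nums := &listStream{events: []Event{1, 2, 3}}
	names := &listStream{events: []Event{"a", "b"}}
	rows, err := collect(&zipStream{streams: []Stream{nums, names}})
	fmt.Println(rows, err) // [[1 a] [2 b]] <nil>
}
```

In this sketch the third number is read and then discarded, because the shorter stream ends during the same step.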

type StreamContext

type StreamContext struct {
	// contains filtered or unexported fields
}

Data associated with a stream for building streams from JSON definitions.

Currently, just the multiplexer for stream variables to be used multiple times.

type StreamMultiplexer

type StreamMultiplexer struct {
	// contains filtered or unexported fields
}

func Multiplexer

func Multiplexer(stream Stream) *StreamMultiplexer

Create a multiplexer from a stream.

func (*StreamMultiplexer) New

func (self *StreamMultiplexer) New() Stream

Create a stream that pulls from a base stream.
