ops

package
v0.10.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 11, 2023 License: Apache-2.0 Imports: 10 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// IntSum is a package-level reduce Op over ints. It is built by NewReduce
	// with an initial value of 0 and a step function that combines its two int
	// arguments with functions.Sum and always reports a nil error.
	IntSum Op = NewReduce[int, int](0, func(acc, item int) (int, error) {
		sum := functions.Sum(acc, item)
		return sum, nil
	})
)

Functions

This section is empty.

Types

type AbstractOp

type AbstractOp struct {
	// contains filtered or unexported fields
}

func (*AbstractOp) Accept

func (a *AbstractOp) Accept(a2 any) error

func (*AbstractOp) Begin

func (a *AbstractOp) Begin(i int64)

func (*AbstractOp) End

func (a *AbstractOp) End() (any, error)

func (*AbstractOp) Handle

func (a *AbstractOp) Handle(consumer generic.Consumer) (any, error)
func (a *AbstractOp) Link(next Op)

type Comparator

// Comparator is a binary predicate over two values of type T.
// NOTE(review): presumably an ordering test with the same shape as the
// func(T, T) bool accepted by NewSort — confirm against the implementation.
type Comparator[T any] func(T, T) bool

type Concat added in v0.10.3

// Concat builds an output value of type O from a value of type L and a value
// of type R. It is the type of the concat argument of InnerJoin and LeftJoin.
type Concat[L, R, O any] func(L, R) O

type FilterFunction

// FilterFunction is the callback type accepted by Filter. It receives a value
// of type T and returns a bool together with an error.
// NOTE(review): presumably true means the element is kept — confirm.
type FilterFunction[T any] func(T) (bool, error)

type HashCodeFunc

// HashCodeFunc derives a comparable value of type R from a value of type T,
// or returns an error. It is the type of the hashcode argument of NewDistinct.
type HashCodeFunc[T any, R comparable] func(T) (R, error)

type IteratorFunc

// IteratorFunc is a callback that receives a value of type T and returns an
// error. It is the callback type accepted by Foreach and Peek.
type IteratorFunc[T any] func(T) error

type JoinOp added in v0.10.3

// JoinOp is an Op that can additionally be given a join stream. It is the
// type returned by InnerJoin and LeftJoin.
type JoinOp interface {
	Op
	// SetJoinStream supplies the join.Sinkable used by the join.
	// NOTE(review): presumably this is the other side of the join — confirm.
	SetJoinStream(sinkable join.Sinkable)
}

func InnerJoin added in v0.10.3

func InnerJoin[L, R, O any](on On[L, R], concat Concat[L, R, O]) JoinOp

func LeftJoin added in v0.10.3

func LeftJoin[L, R, O any](on On[L, R], concat Concat[L, R, O], miss Miss[L, O]) JoinOp

type KeyFunc added in v0.0.2

// KeyFunc derives a comparable key of type R from a value of type T, or
// returns an error. It is the type of the keyFunc argument of NewGroup.
type KeyFunc[T any, R comparable] func(T) (R, error)

type MapFunction

// MapFunction converts a value of type T into a value of type R, or returns
// an error. It is the callback type accepted by Map.
type MapFunction[T, R any] func(T) (R, error)

type Miss added in v0.10.3

// Miss builds an output value of type O from a single input of type I. It is
// the type of the miss argument of LeftJoin, where I is the left-hand type.
// NOTE(review): presumably called for left elements that have no match — confirm.
type Miss[I, O any] func(I) O

type On added in v0.10.3

// On is a predicate over a value of type L and a value of type R. It is the
// type of the on argument of InnerJoin and LeftJoin.
// NOTE(review): presumably true means the pair should be joined — confirm.
type On[L, R any] func(L, R) bool

type Op

// Op is the interface returned by the operator constructors in this package
// (Filter, Map, NewReduce, Parallel, and so on).
type Op interface {
	// Begin takes an int64.
	// NOTE(review): presumably a size hint announced before elements arrive — confirm.
	Begin(int64)
	// End returns a result value and an error.
	// NOTE(review): presumably called once after the last element — confirm.
	End() (any, error)
	// Accept receives one value and returns an error.
	Accept(any) error
	// Handle takes a generic.Consumer and returns a result value and an error.
	Handle(consumer generic.Consumer) (any, error)
	// Link receives another Op as next.
	// NOTE(review): presumably the downstream operator in the chain — confirm.
	Link(next Op)
}

func Filter

func Filter[T any](fn FilterFunction[T]) Op

func Foreach

func Foreach[T any](fn IteratorFunc[T]) Op

func Map

func Map[T, R any](fn MapFunction[T, R]) Op

func NewCollect added in v0.0.2

func NewCollect[T any](c generic.Collect[T]) Op

func NewComparableDistinct

func NewComparableDistinct() Op

func NewDistinct

func NewDistinct[T any, R comparable](hashcode HashCodeFunc[T, R]) Op

func NewGroup added in v0.0.2

func NewGroup[T any, R comparable](keyFunc KeyFunc[T, R]) Op

func NewHead

func NewHead(spl generic.Splittable) Op

func NewLimit

func NewLimit(count int64) Op

func NewReduce

func NewReduce[T, R any](init R, fn ReduceFunction[T, R]) Op

func NewSkip

func NewSkip(offset int64) Op

func NewSort

func NewSort[T any](fn func(T, T) bool) Op

func NewTail added in v0.10.3

func NewTail() Op

func Parallel

func Parallel(options ...ParallelOption) Op

func Peek added in v0.10.3

func Peek[T any](fn IteratorFunc[T]) Op

func UseFlipWindow added in v0.0.8

func UseFlipWindow[T any](size int) Op

type ParallelOption

// ParallelOption is a function that receives a pointer to the package's
// unexported parallelConfig. Options are passed to Parallel; WithFixedPool
// returns one.
type ParallelOption func(config *parallelConfig)

func WithFixedPool

func WithFixedPool(size int) ParallelOption

type ReduceFunction

// ReduceFunction takes a value of type R and a value of type T and returns a
// value of type R together with an error. It is the type of the fn argument
// of NewReduce, whose init argument is also of type R.
// NOTE(review): presumably the R argument is the running result — confirm.
type ReduceFunction[T, R any] func(R, T) (R, error)

type WorkerStrategy

type WorkerStrategy int

WorkerStrategy defines the concurrency model.

const (
	// BufferPoolStrategy triggers one goroutine for all the incoming messages from upstream.
	BufferPoolStrategy WorkerStrategy = 1
	// FixedPoolStrategy sets an upper bound on the size of the worker pool.
	// It uses a producer-consumer model to process the message stream and needs a hasher to keep the stream data consistent;
	// alternatively, a work-stealing method could be used to speed up the consumers.
	// TODO: propagate the parallel flag downstream and transparently enable parallel processing in all following nodes.
	FixedPoolStrategy WorkerStrategy = 2
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL