ttask

v0.0.0-...-2a7f7c3

This package is not in the latest version of its module.
Published: Feb 25, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

README

TTask

A stream processing library for Go, heavily inspired by alyxstream.

I'm writing this module for learning purposes.

This module is not idiomatic Go. Because Go methods cannot declare their own type parameters (as of Go 1.21.3; see this issue), I was forced to use an unusual pattern to replicate a sort of fluent syntax while maintaining full type safety, hence the name of the module. If a future version of Go supports this feature, a new version of this module may be written.
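A minimal sketch of the limitation and the workaround, using hypothetical names (Stage, Start, Then) that are not part of ttask's API: the "append an operator" step has to be a free generic function instead of a method, because only free functions may introduce a new output type parameter.

```go
package main

// Stage is a hypothetical stand-in for a task: it turns an input of
// type I into an output of type O.
type Stage[I, O any] struct {
	run func(I) O
}

// Start creates an empty pipeline that returns its input unchanged.
func Start[I any]() Stage[I, I] {
	return Stage[I, I]{run: func(x I) I { return x }}
}

// Then appends an operation and returns a pipeline with a new output type.
// It must be a free function: a method such as
//
//	func (s Stage[I, O]) Then[R any](f func(O) R) Stage[I, R]
//
// is rejected by the compiler because methods cannot declare their own
// type parameters.
func Then[I, O, R any](s Stage[I, O], f func(O) R) Stage[I, R] {
	return Stage[I, R]{run: func(x I) R { return f(s.run(x)) }}
}

// Run feeds a single value through the whole pipeline.
func (s Stage[I, O]) Run(x I) O {
	return s.run(x)
}
```

Nesting the calls, as in Then(Then(Start[int](), double), isLarge), reads inside-out, which is the same shape as the T(T(T(...))) chains used in this README.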

Working examples are in the examples folder. Run them with:

git clone https://github.com/twoojoo/ttask.git
cd ttask
go run examples/<example_path>/main.go

#e.g.
# go run examples/array/main.go
# go run examples/window/tumbling/main.go


Importing

Even though it's not idiomatic (golint will complain), I suggest importing the ttask package with a dot import; otherwise your code will end up being quite verbose.

import (
	. "github.com/twoojoo/ttask"
)

Task

The main abstraction of the library is the Task, which is a set of ordered operations on a stream of data. Tasks can also be chained and branched in custom ways.

There are two types of task:

  • Injectable: messages are pushed programmatically with the Inject method.
  • Source-based: messages come from an autonomous Source, e.g. a Kafka consumer, a file, etc. (calling the Inject method on such a task returns an error).
t := T(T(T(
	Injectable[int]("t1"),
	Delay[int](time.Second)),
	Map(func(x int) string {
		return strconv.Itoa(x)
	})),
	Print[string](">"),
).Catch(func(i *Inner, e error) {
	log.Fatal(e)
}).Lock()

if err := t.Inject(context.Background(), 123); err != nil {
	log.Fatal(err)
}

NOTE: when an operator can't infer the message type from its callback's signature (Map can, since its callback states both the input and the output type), the type must be passed to the operator explicitly as a type argument (as with Print and Delay).
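The rule above follows from how Go infers type arguments: only from the types of the arguments actually passed. A small sketch with hypothetical helpers (passThrough, mapWith) that mimic the shapes of Delay/Print and Map:

```go
package main

// passThrough cannot have T inferred at the call site: none of its
// parameters mention T, so callers must write passThrough[int]().
// Delay[int](time.Second) and Print[string](">") are in the same situation.
func passThrough[T any]() func(T) T {
	return func(x T) T { return x }
}

// mapWith gets T and R inferred from the callback's own signature,
// which is why Map(func(x int) string { ... }) needs no type arguments.
func mapWith[T, R any](f func(T) R) func(T) R {
	return f
}
```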

The Lock method prevents the task from being extended with further operators. Trying to extend a locked task causes the application to panic.

The Catch method allows graceful error handling.

Sources

TBD

Sinks

TBD

Operators

The sequential operations applied to the messages that pass through a task are defined by that task's operators.

Base operators
// Set a custom message key, possibly from the message itself.
func WithCustomKey[T any](extractor func(x T) string) 

// Set a custom message event time, possibly from the message itself.
func WithEventTime[T any](extractor func(x T) time.Time) 

// Print message value with a given prefix.
func Print[T any](prefix ...string) 

// Map the message value.
func Map[T, R any](mapper func(x T) R) 

// Filter messages.
func Filter[T any](filter func(x T) bool) 

// Perform an action for the message.
func Tap[T any](action func(x T)) 

// Delay the next task step.
func Delay[T any](d time.Duration) 
Array operators

For array operators, the generic type refers to the type of the array's elements.

func MapArray[T, R any](mapper func(x T) R) 

// Execute an action for each element of the array
func ForEach[T any](action func(x T)) 

func FilterArray[T any](filter func(x T) bool)

// JS-like array reducer
func ReduceArray[T, R any](base R, reducer func(acc *R, x T) R)

// Flattens an array of type [][]T
func FlatArray[T any]()
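As a rough illustration of what ReduceArray and FlatArray compute for a single message, here are plain-slice equivalents. The helper names are hypothetical, and the accumulator handling (start at base, replace with the reducer's return value after each element, as in JavaScript's Array.prototype.reduce) is an assumption based on the "JS-like" description:

```go
package main

// reduceSlice mirrors the ReduceArray signature on a plain slice.
// Assumption: acc starts at base and is replaced by the reducer's
// return value after each element.
func reduceSlice[T, R any](xs []T, base R, reducer func(acc *R, x T) R) R {
	acc := base
	for _, x := range xs {
		acc = reducer(&acc, x)
	}
	return acc
}

// flatten concatenates the inner slices of a [][]T into one []T,
// which is what FlatArray is documented to do.
func flatten[T any](xss [][]T) []T {
	out := make([]T, 0)
	for _, xs := range xss {
		out = append(out, xs...)
	}
	return out
}
```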
Context operators
// Cache a key/value record in the Task context. Use an extractor function to pull the value from the processed item.
func WithContextValue[T any](k any, ext func(x T) any) 
Kafka operators
// Perform a commit on the current kafka message.
func KafkaCommit[T any](consumer *kafka.Consumer, logger bool) Operator[KafkaMessage[T], KafkaMessage[T]]

Windowing

TBD

Tumbling window
Hopping window
Session window
Counting window

Task flow

Tasks are chained and branched with special operators that split the task flow in different ways.

// Chain another task to the current one synchronously.
// Chaining a locked task will cause the application to panic.
// The act of chaining locks the child task as if Lock() method was called.
// The child task must be an injectable task, otherwise the process will panic.
func Chain[O, T any](t *TTask[O, T]) 

// Create an asynchronous branch from the current task using another task.
// When branching, the parent task and the child task will continue their flow concurrently.
// Branching a task will cause the child task to be locked as if Lock() method was called.
// An already locked task can be used as child task when branching.
// The child task must be an injectable task, otherwise the process will panic.
func Branch[T any](t *TTask[T, T]) 

// Similar to the Branch operator, but redirects to the new task only messages that pass the provided filter
func BranchWhere[T any](t *TTask[T, T], filter func(x T) bool) 

// Similar to the BranchWhere operator, but messages will either pass to the new branch or continue in the current one
func BranchSwitch[T any](t *TTask[T, T], filter func(x T) bool) 

// Process n messages in parallel using an in-memory buffer
func Parallelize[T any](n int)

// Continue the task execution for each element of the array synchronously
func IterateArray[T any]()

// Continue the task execution for each element of the array asynchronously
func ParallelizeArray[T any]()
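Parallelize is described as processing n messages in parallel using an in-memory buffer. The sketch below shows the common Go idiom for that kind of bounded concurrency, a buffered channel used as a semaphore. It is an assumption that ttask works along these lines, and processBounded and peakConcurrency are hypothetical helpers:

```go
package main

import (
	"sync"
	"sync/atomic"
)

// processBounded calls handle for every item with at most n calls running at
// once (n must be >= 1). A buffered channel of capacity n is the in-memory
// buffer: sending blocks while n handlers are busy.
func processBounded[T any](items []T, n int, handle func(T)) {
	sem := make(chan struct{}, n)
	var wg sync.WaitGroup
	for _, it := range items {
		sem <- struct{}{} // take a slot, waiting if all n are in use
		wg.Add(1)
		go func(x T) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			handle(x)
		}(it)
	}
	wg.Wait()
}

// peakConcurrency reports the largest number of handle calls observed
// running at the same time while processing count items.
func peakConcurrency(count, n int) int64 {
	var running, peak int64
	processBounded(make([]struct{}, count), n, func(struct{}) {
		cur := atomic.AddInt64(&running, 1)
		for {
			old := atomic.LoadInt64(&peak)
			if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
				break
			}
		}
		atomic.AddInt64(&running, -1)
	})
	return peak
}
```

Unlike Chain, which continues synchronously, work handed to goroutines like this completes in no guaranteed order.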

Raw operators

Most operators have a so-called raw version, which gives access to lower-level task resources: the task inner methods and properties, and the message metadata. For example, the Tap operator has its own raw version:

func TapRaw[T any](action func(i *Inner, x *Message[T])) 
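To show how the two callback shapes relate, this sketch lifts a plain Tap-style callback into a raw-style one that receives the whole message. Message here is a hypothetical, trimmed copy of ttask's Message[T] (only Key and Value), liftTap is a hypothetical helper, and the Inner argument of the real TapRaw is left out:

```go
package main

// Message is a trimmed, hypothetical copy of ttask's Message[T],
// keeping only the fields used here.
type Message[T any] struct {
	Key   string
	Value T
}

// liftTap adapts func(x T) into func(m *Message[T]) by forwarding only the
// value. A raw callback could also read metadata such as m.Key.
func liftTap[T any](action func(x T)) func(m *Message[T]) {
	return func(m *Message[T]) { action(m.Value) }
}
```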
Task inner

TBD

Error handling

When your operator's callback logic can fail, it's highly recommended to use the raw version of that operator, since it gives access to the built-in error handling through the Task inner API.

t := T(T(T(
	Injectable[string]("t1"),
	Delay[string](time.Second)),
	MapRaw(func(i *Inner, x *Message[string]) int {
		num, err := strconv.Atoi(x.Value)
		if err != nil {
			i.Error(err)
			return 0
		}

		return num
	})),
	Print[int](">"),
).Catch(func(i *Inner, e error) {
	log.Fatal(e)
}).Lock()

t.Inject(context.Background(), "abc")

Here's what's happening:

  • since the injected message isn't a numeric string, the Atoi function will return an error
  • the error is passed to the Error function of the task inner API
  • the callback returns 0 (any int value would do, since it is discarded)
  • the error is caught by the catcher callback passed to the task's Catch method
  • the task won't continue to the following operators

If no catcher callback is set, the error is still raised but silently ignored.
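The control flow described above (stop at the first error, hand it to the catcher, drop it if there is no catcher) can be modelled with plain functions. This is a simplified sketch under that reading, not ttask's implementation; step, rejectNegative, errNegative and runSteps are hypothetical:

```go
package main

import "errors"

// step is a hypothetical task stage that may fail.
type step func(x int) (int, error)

// errNegative is a sample error returned by rejectNegative.
var errNegative = errors.New("negative value")

// rejectNegative is a sample stage that fails on negative input.
func rejectNegative(x int) (int, error) {
	if x < 0 {
		return 0, errNegative
	}
	return x, nil
}

// runSteps executes stages in order. On the first error it hands the error
// to catcher (when set) and stops, so later stages never run and the failed
// stage's return value is discarded. With a nil catcher the error is dropped.
func runSteps(x int, steps []step, catcher func(error)) (int, bool) {
	for _, s := range steps {
		out, err := s(x)
		if err != nil {
			if catcher != nil {
				catcher(err)
			}
			return 0, false
		}
		x = out
	}
	return x, true
}
```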

Custom operators

Custom operators (as well as sources and sinks) can be built starting from the MapRaw operator.

You can find an example here

Next steps

  • fill in the missing README sections (windows, storage, etc.)
  • improve the Memory storage with mutexes
  • write a Redis storage for windows and checkpoints
  • write a RocksDB storage for windows and checkpoints
  • write a Cassandra storage for windows and checkpoints
  • add WithTimeout and WithDeadline context operators
  • add more sources and sinks (e.g. CSV, RabbitMQ, Apache Pulsar, etc.)

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

func IsEven

func IsEven[T constraints.Integer]() func(x T) bool

func IsOdd

func IsOdd[T constraints.Integer](x int) bool

Types

type CWOptions

type CWOptions[T any] struct {
	Size          int
	MaxInactivity time.Duration
}

Defaults:

  • Size: 1 (min: 1)
  • MaxInactivity: 0 (no inactivity check)

type HWOptions

type HWOptions[T any] struct {
	Size          time.Duration
	Hop           time.Duration
	Watermark     time.Duration
	WindowingTime WindowingTime
}

Defaults:

  • Size: 1 second
  • Hop: 2 seconds
  • Watermark: 0
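To make Size and Hop concrete, this sketch computes which hopping windows contain an event. It assumes windows [start, start+Size) whose starts are multiples of Hop from a common origin and measures events as offsets from that origin; the real alignment in ttask is not documented, and hoppingStarts is a hypothetical helper. Under this model, the documented defaults (Hop larger than Size) leave gaps in which an event belongs to no window.

```go
package main

import "time"

// hoppingStarts returns the start offsets (latest first) of every window
// [start, start+size) that contains an event at offset t >= 0, with window
// starts aligned to multiples of hop. Windows may overlap when hop < size.
func hoppingStarts(t, size, hop time.Duration) []time.Duration {
	var starts []time.Duration
	for start := t - t%hop; start > t-size && start >= 0; start -= hop {
		starts = append(starts, start)
	}
	return starts
}
```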

type Inner

type Inner struct {
	Context context.Context
	// contains filtered or unexported fields
}

Task metadata and methods to be used inside operators.

func (*Inner) Error

func (inner *Inner) Error(e error, decorators ...any)

Calling this function will cause the Task flow to be interrupted before the next operator. Use decorators to generate a more detailed error: "[dec1] [dec2] ... err.Error()".

Returning immediately after calling this function is highly recommended in order to avoid unwanted code execution (the returned value doesn't matter). If the Catch method of the Task isn't used, the error will be lost.
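A sketch of how the documented "[dec1] [dec2] ... err.Error()" message shape can be produced; decorate and errSyntax are hypothetical, and wrapping with %w (so the original error stays reachable via errors.Is) is a design choice of this sketch, not something the docs promise:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errSyntax is a sample error used to exercise decorate.
var errSyntax = errors.New("invalid syntax")

// decorate prefixes e with one "[d] " tag per decorator, producing the
// "[dec1] [dec2] ... err.Error()" shape. The result wraps e with %w.
func decorate(e error, decorators ...any) error {
	var b strings.Builder
	for _, d := range decorators {
		fmt.Fprintf(&b, "[%v] ", d)
	}
	return fmt.Errorf("%s%w", b.String(), e)
}
```

For example, decorate(errSyntax, "t1", 3) yields an error whose message is "[t1] [3] invalid syntax".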

func (*Inner) ExecNext

func (inner *Inner) ExecNext(x any, next *Step)

Trigger the next Task step. Use this in a raw Operator to handle the Task flow in a custom way (NOT TYPE SAFE)

func (*Inner) TaskID

func (m *Inner) TaskID() string

Return the id of the task

type KafkaMessage

type KafkaMessage[T any] struct {
	Key            string
	TopicPartition kafka.TopicPartition
	Value          T
}

type KafkaSinkOpts

type KafkaSinkOpts struct {
	ContinueOnError    bool
	SkipErrorReporting bool
	Logger             bool
}

type MemoryStorage

type MemoryStorage struct {
	// contains filtered or unexported fields
}

func NewMemoryStorage

func NewMemoryStorage() *MemoryStorage

type Message

type Message[T any] struct {
	Id            string
	EventTime     time.Time
	InjestionTime time.Time
	Key           string
	Value         T
}

Wraps the value that is being processed inside the task. Accessible using the Raw version of operators.

func NewMessage

func NewMessage[T any](value T) *Message[T]

func (Message[T]) GetID

func (m Message[T]) GetID() string

func (Message[T]) GetInjestionTime

func (m Message[T]) GetInjestionTime() time.Time

type Number

type Number interface {
	constraints.Integer | constraints.Float
}

type Operator

type Operator[T, R any] func(t *Inner, x *Message[T], next *Step)

func Array

func Array[T, R any](operator Operator[T, R]) Operator[[]T, []R]

func Average

func Average[X Number]() Operator[[]X, X]

func Branch

func Branch[T any](t *TTask[T, T]) Operator[T, T]

Create an asynchronous branch from the current task using another task. When branching, the parent task and the child task will continue their flow concurrently. Branching a task will cause the child task to be locked as if Lock() method was called. An already locked task can be used as child task when branching. The child task must be an injectable task, otherwise the process will panic.

func BranchSwitch

func BranchSwitch[T any](t *TTask[T, T], filter func(x T) bool) Operator[T, T]

Similar to the BranchWhere operator, but messages will either pass to the new branch or continue in the current one

func BranchWhere

func BranchWhere[T any](t *TTask[T, T], filter func(x T) bool) Operator[T, T]

Similar to the Branch operator, but redirects to the new task only messages that pass the provided filter

func Chain

func Chain[O, T any](t *TTask[O, T]) Operator[O, T]

Chain another task to the current one synchronously. Chaining a locked task will cause the application to panic. The act of chaining locks the chained task as if Lock() method was called. The child task must be an injectable task, otherwise the process will panic.

func Checkpoint

func Checkpoint[T, R any](id string, operator Operator[T, R]) Operator[T, R]

Turns an operator into a checkpoint.

If the processing of a message is interrupted before it reaches the next checkpoint (windows act as checkpoints) or before the last task operation is executed, the next task execution will recover the checkpointed messages.

The recovery procedure starts when a new message reaches the checkpoint.

DON'T USE if you have more than one replica of the same task and you're using a remote storage like Redis, unless you find a way to set a different task id for each replica and you're able to retain it on each restart.

USELESS if you're using the in-memory storage, or a local storage while running the task on a volatile resource (e.g. K8s pods).

func Concat

func Concat(separator ...string) Operator[[]string, string]

func CountingWindow

func CountingWindow[T any](id string, options CWOptions[T]) Operator[T, []T]

Counting Window:

...1....2.........3...........4...5......6........7....8....

..[----------------].........[------------]......[----------
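A minimal sketch of count-based windowing on a finite slice: consecutive windows of exactly Size elements. countWindows is hypothetical; in a live stream the leftover elements would wait for more messages, and presumably be flushed once MaxInactivity elapses when that option is set (an assumption based on the CWOptions fields).

```go
package main

// countWindows splits xs into consecutive windows of exactly size elements
// and returns the incomplete remainder separately as pending. A size below
// 1 is treated as 1, matching the documented minimum.
func countWindows[T any](xs []T, size int) (windows [][]T, pending []T) {
	if size < 1 {
		size = 1
	}
	for len(xs) >= size {
		windows = append(windows, xs[:size:size]) // cap the window's capacity
		xs = xs[size:]
	}
	return windows, xs
}
```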

func Delay

func Delay[T any](d time.Duration) Operator[T, T]

Delay the next task step.

func Distinct

func Distinct[T comparable]() Operator[[]T, []T]

Remove duplicates from a slice of comparable elements
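A sketch of deduplicating a slice of comparable elements with a set; distinct is hypothetical. Keeping the first occurrence and preserving input order is an assumption, since the docs don't say whether Distinct preserves order:

```go
package main

// distinct returns xs without duplicates, keeping the first occurrence of
// each value in input order.
func distinct[T comparable](xs []T) []T {
	seen := make(map[T]struct{}, len(xs))
	out := make([]T, 0, len(xs))
	for _, x := range xs {
		if _, ok := seen[x]; ok {
			continue
		}
		seen[x] = struct{}{}
		out = append(out, x)
	}
	return out
}
```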

func EachRaw

func EachRaw[T, R any](cb func(inner *Inner, x *Message[T]) R) Operator[[]T, []R]

func Filter

func Filter[T any](cb func(x T) bool) Operator[T, T]

Filter messages.

func FilterArray

func FilterArray[T any](cb func(x T) bool) Operator[[]T, []T]

func FilterArrayRaw

func FilterArrayRaw[T any](cb func(inner *Inner, x *Message[T]) bool) Operator[[]T, []T]

func FilterRaw

func FilterRaw[T, R any](cb func(inner *Inner, x *Message[T]) bool) Operator[T, T]

Filter messages (with access to task metadata and message metadata).

func Find

func Find[T any](cb func(x T) bool) Operator[[]T, T]

func FindRaw

func FindRaw[T any](cb func(inner *Inner, x *Message[T]) bool) Operator[[]T, T]

func FlatArray

func FlatArray[T any]() Operator[[][]T, []T]

func ForEach

func ForEach[T any](cb func(x T)) Operator[[]T, []T]

func HoppingWindow

func HoppingWindow[T any](id string, options HWOptions[T]) Operator[T, []T]

func IterateArray

func IterateArray[T any]() Operator[[]T, T]

Continue the task execution for each element of the array synchronously

func KafkaCommit

func KafkaCommit[T any](consumer *kafka.Consumer, logger bool) Operator[KafkaMessage[T], KafkaMessage[T]]

Perform a commit on the current kafka message.

func Length

func Length() Operator[string, int]

func Map

func Map[T, R any](cb func(x T) R) Operator[T, R]

Map the message value.

func MapArray

func MapArray[T, R any](cb func(x T) R) Operator[[]T, []R]

func MapArrayRaw

func MapArrayRaw[T, R any](cb func(inner *Inner, x *Message[T]) R) Operator[[]T, []R]

func MapRaw

func MapRaw[T, R any](cb func(inner *Inner, x *Message[T]) R) Operator[T, R]

Map the message value (with access to task metadata and message metadata). Also allows to create custom operators.

func Max

func Max[T Number]() Operator[[]T, T]

func Min

func Min[T Number]() Operator[[]T, T]

func Multiply

func Multiply[T Number]() Operator[[]T, T]

func Parallelize

func Parallelize[T any](n int) Operator[T, T]

Process n messages in parallel using an in-memory buffer

func ParallelizeArray

func ParallelizeArray[T any]() Operator[[]T, T]

Continue the task execution for each element of the array asynchronously

func Print

func Print[T any](prefix ...string) Operator[T, T]

Print message value with a given prefix.

func PrintRaw

func PrintRaw[T any](prefix ...string) Operator[T, T]

Print message metadata and value with a given prefix.

func ReduceArray

func ReduceArray[T, R any](base R, reducer func(acc *R, x T) R) Operator[[]T, R]

JS-like array reducer

func ReduceArrayRaw

func ReduceArrayRaw[T, R any](base R, reducer func(acc *R, inner *Inner, x *Message[T]) R) Operator[[]T, R]

JS-like array reducer [raw version]

func SessionWindow

func SessionWindow[T any](id string, options SWOptions[T]) Operator[T, []T]

func Sum

func Sum[T Number]() Operator[[]T, T]

func Tap

func Tap[T any](cb func(x T)) Operator[T, T]

Perform an action for the message.

func TapRaw

func TapRaw[T any](cb func(inner *Inner, x *Message[T])) Operator[T, T]

Perform an action for the message (with access to task metadata and message metadata).

func ToFile

func ToFile(path string, separator ...string) Operator[string, string]

Sink: write each Task result to a file using a separator (default: \n)

func ToFloat32

func ToFloat32[T Number]() Operator[T, float32]

func ToFloat64

func ToFloat64[T Number]() Operator[T, float64]

func ToInt

func ToInt[T Number]() Operator[T, int]

func ToInt16

func ToInt16[T Number]() Operator[T, int16]

func ToInt32

func ToInt32[T Number]() Operator[T, int32]

func ToInt64

func ToInt64[T Number]() Operator[T, int32]

func ToInt8

func ToInt8[T Number]() Operator[T, int8]

func ToKafka

func ToKafka[T any](producer *kafka.Producer, topic string, toBytes func(x T) []byte, options KafkaSinkOpts) Operator[T, T]

Sink: send the message to a kafka topic

func ToStderr

func ToStderr[T any](toString func(x T) string) Operator[T, T]

Sink: print to the standard error. Use toString to temporarily transform the message into a string.

func ToStderrln

func ToStderrln[T any](toString func(x T) string) Operator[T, T]

Sink: print to the standard error, appending a newline character. Use toString to temporarily transform the message into a string.

func ToStdout

func ToStdout[T any](toString func(x T) string) Operator[T, T]

Sink: print to the standard output. Use toString to temporarily transform the message into a string.

func ToStdoutln

func ToStdoutln[T any](toString func(x T) string) Operator[T, T]

Sink: print to the standard output, appending a newline character. Use toString to temporarily transform the message into a string.

func ToWriter

func ToWriter[T any](w io.Writer, toBytes func(x T) []byte) Operator[T, T]

Sink: write bytes to a writer. Use toBytes to temporarily transform the message into bytes.

func ToWriterCount

func ToWriterCount[T any](w io.Writer, toBytes func(x T) []byte) Operator[T, int]

Sink: write message to a writer. Next message value will be the number of written bytes. Use toBytes to transform the message into bytes.

func TumblingWindow

func TumblingWindow[T any](id string, options TWOptions[T]) Operator[T, []T]

TumblingWindow:

..0....1....2....3.........4.........5....6....7...

[-------------][-------------][-------------][-----
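A minimal sketch of tumbling-window assignment: fixed-size, non-overlapping windows, so each event lands in exactly one. It assumes windows aligned to multiples of Size from a common origin, with events given as offsets from that origin; tumblingStart and groupTumbling are hypothetical, and ttask may align windows differently.

```go
package main

import "time"

// tumblingStart returns the start of the window [start, start+size) that
// contains an event at offset t >= 0.
func tumblingStart(t, size time.Duration) time.Duration {
	return t.Truncate(size) // round down to a multiple of size
}

// groupTumbling buckets event offsets by the start of their window.
func groupTumbling(size time.Duration, offsets ...time.Duration) map[time.Duration][]time.Duration {
	out := make(map[time.Duration][]time.Duration)
	for _, t := range offsets {
		start := tumblingStart(t, size)
		out[start] = append(out[start], t)
	}
	return out
}
```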

func WithContextValue

func WithContextValue[T any](k any, ext func(x T) any) Operator[T, T]

Cache a key/value record in the Task context. Use an extractor function to pull the value from the processed item.

func WithCustomKey

func WithCustomKey[T any](extractor func(x T) string) Operator[T, T]

Set a custom message key from the message itself.

func WithEventTime

func WithEventTime[T any](extractor func(x T) time.Time) Operator[T, T]

Set a custom message event time from the message itself.

type RediStorage

type RediStorage struct {
	// contains filtered or unexported fields
}

func NewRedisStorage

func NewRedisStorage(client *redis.Client) *RediStorage

type SWOptions

type SWOptions[T any] struct {
	MaxSize       time.Duration
	MaxInactivity time.Duration
	Watermark     time.Duration
	WindowingTime WindowingTime
}

Defaults:

  • Storage: memory (no persistence)
  • Id: random uuid
  • MaxInactivity: 1 second
  • MaxSize: MaxInactivity * 2
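A sketch of session windowing over sorted event offsets: a session closes when the gap between consecutive events exceeds MaxInactivity, or when an event would stretch the session beyond MaxSize from its first event. sessions is hypothetical, and using strict ">" comparisons is an assumption; the docs don't state the exact closing rule.

```go
package main

import "time"

// sessions splits sorted event offsets into session windows.
func sessions(maxInactivity, maxSize time.Duration, offsets ...time.Duration) [][]time.Duration {
	var out [][]time.Duration
	var cur []time.Duration
	for _, t := range offsets {
		if len(cur) > 0 {
			gap := t - cur[len(cur)-1] // time since the previous event
			span := t - cur[0]         // time since the session started
			if gap > maxInactivity || span > maxSize {
				out = append(out, cur)
				cur = nil
			}
		}
		cur = append(cur, t)
	}
	if len(cur) > 0 {
		out = append(out, cur)
	}
	return out
}
```

With MaxInactivity = 1s and MaxSize = 2s (the documented default ratio), events at 0, 0.5, 1.2, 3.0, 3.5, 4.4, 5.2 and 5.8 seconds form three sessions: the 1.8s gap closes the first, and the event at 5.2s would stretch the second past 2s.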

type Step

type Step struct {
	// contains filtered or unexported fields
}

type Storage

type Storage interface {
	// contains filtered or unexported methods
}

type TTask

type TTask[O, T any] struct {
	// contains filtered or unexported fields
}

func FromArray

func FromArray[T any](taskId string, array []T) *TTask[any, T]

Source: trigger a Task execution for each element of the array.

func FromFile

func FromFile(taskId string, path string) *TTask[any, string]

Source: read a file and trigger a Task execution for each line.

func FromInterval

func FromInterval[T any](taskId string, size time.Duration, max int, generator func(count int) T) *TTask[any, T]

Source: trigger a task execution at a given interval. The generator function produces the message, optionally using the interval counter. A max of 0 generates an endless interval.
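A sketch of the interval loop described above, built on time.Ticker. emitInterval is hypothetical; the counter starting at 0 and the done channel used for early cancellation are assumptions of this sketch, and the parameter is named limit to avoid shadowing Go's built-in max.

```go
package main

import "time"

// emitInterval calls emit(generator(count)) once per tick of length size.
// A limit of 0 means no limit; closing done stops the loop early.
// size must be > 0, since time.NewTicker panics otherwise.
func emitInterval[T any](done <-chan struct{}, size time.Duration, limit int, generator func(count int) T, emit func(T)) {
	ticker := time.NewTicker(size)
	defer ticker.Stop()
	for count := 0; limit == 0 || count < limit; count++ {
		select {
		case <-done:
			return
		case <-ticker.C:
			emit(generator(count))
		}
	}
}
```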

func FromItem

func FromItem[T any](taskId string, item T) *TTask[any, T]

Source: trigger a task once with the given item.

func FromKafka

func FromKafka(taskId string, consumer *kafka.Consumer, logger bool, timeout ...time.Duration) *TTask[any, KafkaMessage[[]byte]]

Source: trigger a Task execution for each received message.

func FromReadChar

func FromReadChar(taskId string, prompt string) *TTask[any, rune]

func FromReader

func FromReader(taskId string, r io.Reader, bufSize int) *TTask[any, []byte]

func FromReadline

func FromReadline(taskId string, prompt string) *TTask[any, string]

func FromStdin

func FromStdin(taskId string) *TTask[any, string]

func FromString

func FromString(taskId string, string string, step ...int) *TTask[any, string]

Source: trigger a Task execution for each char of a string (or for each substring with a given step).
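A sketch of the splitting that FromString's description implies: one piece per character, or consecutive substrings of step characters. chunks is hypothetical, and counting runes rather than bytes is an assumption:

```go
package main

// chunks splits s into consecutive substrings of step runes (the last one
// may be shorter). A step below 1 is treated as 1, i.e. one piece per char.
func chunks(s string, step int) []string {
	if step < 1 {
		step = 1
	}
	runes := []rune(s)
	var out []string
	for i := 0; i < len(runes); i += step {
		end := i + step
		if end > len(runes) {
			end = len(runes)
		}
		out = append(out, string(runes[i:end]))
	}
	return out
}
```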

func FromStringSplit

func FromStringSplit(taskId string, string string, delimiter string) *TTask[any, string]

Source: trigger a Task execution for each substring, given a certain delimiter.

func Injectable

func Injectable[T any](id string) *TTask[T, T]

Initialize an injectable Task with the first step message type as generic. To push messages to this Task use the Inject method.

func T

func T[O, T, R any](t *TTask[O, T], operator Operator[T, R]) *TTask[O, R]

Add an operator to the Task. Returns the updated Task.

func Task

func Task[T any](id string) *TTask[T, T]

Use this to build custom sources only. Not an injectable task.

func Via

func Via[O, T, R any](t *TTask[O, T], operator Operator[T, R]) *TTask[O, R]

Add an operator to the Task. Returns the updated Task.

func (*TTask[O, T]) Catch

func (t *TTask[O, T]) Catch(catcher func(i *Inner, e error)) *TTask[O, T]

Catch any error that was raised in the Task with the Inner.Error method.

func (*TTask[O, T]) Inject

func (t *TTask[O, T]) Inject(c context.Context, x O) error

Push an item to the Task. Use this when not using a task source.

func (*TTask[O, T]) InjectRaw

func (t *TTask[O, T]) InjectRaw(c context.Context, m *Message[O]) error

func (TTask[O, T]) IsInjectable

func (t TTask[O, T]) IsInjectable() bool

func (*TTask[O, T]) Lock

func (t *TTask[O, T]) Lock() *TTask[O, T]

Lock the task to prevent it from being further extended with more operators.

func (*TTask[O, T]) Run

func (t *TTask[O, T]) Run(c context.Context) error

Executes the first step of a Task (and the following steps in cascade). Use this when not manually injecting items into the Task. This method also locks the task.

func (*TTask[O, T]) WithStorage

func (t *TTask[O, T]) WithStorage(s Storage) *TTask[O, T]

type TWOptions

type TWOptions[T any] struct {
	Size          time.Duration
	Watermark     time.Duration
	WindowingTime WindowingTime
}

Defaults:

  • Storage: memory (no persistence)
  • Id: random uuid
  • Size: 1 second
  • Hop: 2 seconds

type WindowingTime

type WindowingTime string
const (
	EventTime      WindowingTime = "event-time"
	InjestionTime  WindowingTime = "injestion-time"
	ProcessingTime WindowingTime = "processing-time"
)
