Documentation ¶
Index ¶
- func IsEven[T constraints.Integer]() func(x T) bool
- func IsOdd[T constraints.Integer](x int) bool
- type CWOptions
- type HWOptions
- type Inner
- type KafkaMessage
- type KafkaSinkOpts
- type MemoryStorage
- type Message
- type Number
- type Operator
- func Array[T, R any](operator Operator[T, R]) Operator[[]T, []R]
- func Average[X Number]() Operator[[]X, X]
- func Branch[T any](t *TTask[T, T]) Operator[T, T]
- func BranchSwitch[T any](t *TTask[T, T], filter func(x T) bool) Operator[T, T]
- func BranchWhere[T any](t *TTask[T, T], filter func(x T) bool) Operator[T, T]
- func Chain[O, T any](t *TTask[O, T]) Operator[O, T]
- func Checkpoint[T, R any](id string, operator Operator[T, R]) Operator[T, R]
- func Concat(separator ...string) Operator[[]string, string]
- func CountingWindow[T any](id string, options CWOptions[T]) Operator[T, []T]
- func Delay[T any](d time.Duration) Operator[T, T]
- func Distinct[T comparable]() Operator[[]T, []T]
- func EachRaw[T, R any](cb func(inner *Inner, x *Message[T]) R) Operator[[]T, []R]
- func Filter[T any](cb func(x T) bool) Operator[T, T]
- func FilterArray[T any](cb func(x T) bool) Operator[[]T, []T]
- func FilterArrayRaw[T any](cb func(inner *Inner, x *Message[T]) bool) Operator[[]T, []T]
- func FilterRaw[T, R any](cb func(inner *Inner, x *Message[T]) bool) Operator[T, T]
- func Find[T any](cb func(x T) bool) Operator[[]T, T]
- func FindRaw[T any](cb func(inner *Inner, x *Message[T]) bool) Operator[[]T, T]
- func FlatArray[T any]() Operator[[][]T, []T]
- func ForEach[T any](cb func(x T)) Operator[[]T, []T]
- func HoppingWindow[T any](id string, options HWOptions[T]) Operator[T, []T]
- func IterateArray[T any]() Operator[[]T, T]
- func KafkaCommit[T any](consumer *kafka.Consumer, logger bool) Operator[KafkaMessage[T], KafkaMessage[T]]
- func Length() Operator[string, int]
- func Map[T, R any](cb func(x T) R) Operator[T, R]
- func MapArray[T, R any](cb func(x T) R) Operator[[]T, []R]
- func MapArrayRaw[T, R any](cb func(inner *Inner, x *Message[T]) R) Operator[[]T, []R]
- func MapRaw[T, R any](cb func(inner *Inner, x *Message[T]) R) Operator[T, R]
- func Max[T Number]() Operator[[]T, T]
- func Min[T Number]() Operator[[]T, T]
- func Multiply[T Number]() Operator[[]T, T]
- func Parallelize[T any](n int) Operator[T, T]
- func ParallelizeArray[T any]() Operator[[]T, T]
- func Print[T any](prefix ...string) Operator[T, T]
- func PrintRaw[T any](prefix ...string) Operator[T, T]
- func ReduceArray[T, R any](base R, reducer func(acc *R, x T) R) Operator[[]T, R]
- func ReduceArrayRaw[T, R any](base R, reducer func(acc *R, inner *Inner, x *Message[T]) R) Operator[[]T, R]
- func SessionWindow[T any](id string, options SWOptions[T]) Operator[T, []T]
- func Sum[T Number]() Operator[[]T, T]
- func Tap[T any](cb func(x T)) Operator[T, T]
- func TapRaw[T any](cb func(inner *Inner, x *Message[T])) Operator[T, T]
- func ToFile(path string, separator ...string) Operator[string, string]
- func ToFloat32[T Number]() Operator[T, float32]
- func ToFloat64[T Number]() Operator[T, float64]
- func ToInt[T Number]() Operator[T, int]
- func ToInt16[T Number]() Operator[T, int16]
- func ToInt32[T Number]() Operator[T, int32]
- func ToInt64[T Number]() Operator[T, int64]
- func ToInt8[T Number]() Operator[T, int8]
- func ToKafka[T any](producer *kafka.Producer, topic string, toBytes func(x T) []byte, ...) Operator[T, T]
- func ToStderr[T any](toString func(x T) string) Operator[T, T]
- func ToStderrln[T any](toString func(x T) string) Operator[T, T]
- func ToStdout[T any](toString func(x T) string) Operator[T, T]
- func ToStdoutln[T any](toString func(x T) string) Operator[T, T]
- func ToWriter[T any](w io.Writer, toBytes func(x T) []byte) Operator[T, T]
- func ToWriterCount[T any](w io.Writer, toBytes func(x T) []byte) Operator[T, int]
- func TumblingWindow[T any](id string, options TWOptions[T]) Operator[T, []T]
- func WithContextValue[T any](k any, ext func(x T) any) Operator[T, T]
- func WithCustomKey[T any](extractor func(x T) string) Operator[T, T]
- func WithEventTime[T any](extractor func(x T) time.Time) Operator[T, T]
- type RediStorage
- type SWOptions
- type Step
- type Storage
- type TTask
- func FromArray[T any](taskId string, array []T) *TTask[any, T]
- func FromFile(taskId string, path string) *TTask[any, string]
- func FromInterval[T any](taskId string, size time.Duration, max int, generator func(count int) T) *TTask[any, T]
- func FromItem[T any](taskId string, item T) *TTask[any, T]
- func FromKafka(taskId string, consumer *kafka.Consumer, logger bool, timeout ...time.Duration) *TTask[any, KafkaMessage[[]byte]]
- func FromReadChar(taskId string, prompt string) *TTask[any, rune]
- func FromReader(taskId string, r io.Reader, bufSize int) *TTask[any, []byte]
- func FromReadline(taskId string, prompt string) *TTask[any, string]
- func FromStdin(taskId string) *TTask[any, string]
- func FromString(taskId string, string string, step ...int) *TTask[any, string]
- func FromStringSplit(taskId string, string string, delimiter string) *TTask[any, string]
- func Injectable[T any](id string) *TTask[T, T]
- func T[O, T, R any](t *TTask[O, T], operator Operator[T, R]) *TTask[O, R]
- func Task[T any](id string) *TTask[T, T]
- func Via[O, T, R any](t *TTask[O, T], operator Operator[T, R]) *TTask[O, R]
- func (t *TTask[O, T]) Catch(catcher func(i *Inner, e error)) *TTask[O, T]
- func (t *TTask[O, T]) Inject(c context.Context, x O) error
- func (t *TTask[O, T]) InjectRaw(c context.Context, m *Message[O]) error
- func (t TTask[O, T]) IsInjectable() bool
- func (t *TTask[O, T]) Lock() *TTask[O, T]
- func (t *TTask[O, T]) Run(c context.Context) error
- func (t *TTask[O, T]) WithStorage(s Storage) *TTask[O, T]
- type TWOptions
- type WindowingTime
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsEven ¶
func IsEven[T constraints.Integer]() func(x T) bool
Types ¶
type HWOptions ¶
type HWOptions[T any] struct { Size time.Duration Hop time.Duration Watermark time.Duration WindowingTime WindowingTime }
Defaults:
- Size: 1 second
- Hop: 2 seconds
- Watermark: 0
type Inner ¶
Task metadata and methods to be used inside operators.
func (*Inner) Error ¶
Calling this function will cause the Task flow to be interrupted before the next operator. Use decorator to generate a more detailed error: "[dec1] [dec2] ... err.Error()".
Returning immediately after calling this function is highly suggested in order to avoid unwanted code executions (the returned value doesn't matter). If the Catch method of the Task isn't used, the error will be lost.
type KafkaMessage ¶
type KafkaMessage[T any] struct { Key string TopicPartition kafka.TopicPartition Value T }
type KafkaSinkOpts ¶
type MemoryStorage ¶
type MemoryStorage struct {
// contains filtered or unexported fields
}
func NewMemoryStorage ¶
func NewMemoryStorage() *MemoryStorage
type Message ¶
type Message[T any] struct { Id string EventTime time.Time InjestionTime time.Time Key string Value T }
Wraps the value that is being processed inside the task. Accessible using the Raw version of operators.
func NewMessage ¶
func (Message[T]) GetInjestionTime ¶
type Number ¶
type Number interface { constraints.Integer | constraints.Float }
type Operator ¶
func Branch ¶
Create an asynchronous branch from the current task using another task. When branching, the parent task and the child task will continue their flow concurrently. Branching a task will cause the child task to be locked as if the Lock() method was called. An already locked task can be used as a child task when branching. The child task must be an injectable task, otherwise the process will panic.
func BranchSwitch ¶
Similar to the BranchWhere operator, but messages will either pass to the new branch or continue in the current one
func BranchWhere ¶
Similar to the Branch operator, but redirects to the new task only messages that pass the provided filter
func Chain ¶
Chain another task to the current one synchronously. Chaining a locked task will cause the application to panic. The act of chaining locks the chained task as if the Lock() method was called. The child task must be an injectable task, otherwise the process will panic.
func Checkpoint ¶
Turns an operator into a checkpoint.
If the processing of a message is interrupted before reaching the next checkpoint (windows act as checkpoints) or before the last task operation is executed, the next task execution will recover the checkpointed messages.
The recovery procedure will start when a new message reaches the checkpoint.
DON'T USE if you have more than one replica of the same task and you're using a remote storage like Redis, unless you find a way to set a different task id for each replica and you're able to retain it on each restart.
USELESS if you're using the in-memory storage, or if you're using a local storage while running the task in a volatile resource (e.g. K8s pods)
func CountingWindow ¶
Counting Window:
...1....2.........3...........4...5......6........7....8....
..[----------------].........[------------]......[----------
func Distinct ¶
func Distinct[T comparable]() Operator[[]T, []T]
Remove duplicates from a slice of comparable elements
func FilterArray ¶
func FilterArrayRaw ¶
func IterateArray ¶
Continue the task execution for each element of the array synchronously
func KafkaCommit ¶
func KafkaCommit[T any](consumer *kafka.Consumer, logger bool) Operator[KafkaMessage[T], KafkaMessage[T]]
Perform a commit on the current kafka message.
func MapArrayRaw ¶
func MapRaw ¶
Map the message value (with access to task metadata and message metadata). Also allows to create custom operators.
func Parallelize ¶
Process n messages in parallel using an in-memory buffer
func ParallelizeArray ¶
Continue the task execution for each element of the array asynchronously
func ReduceArray ¶
JS-like array reducer
func ReduceArrayRaw ¶
func ReduceArrayRaw[T, R any](base R, reducer func(acc *R, inner *Inner, x *Message[T]) R) Operator[[]T, R]
JS-like array reducer [raw version]
func TapRaw ¶
Perform an action for the message (with access to task metadata and message metadata).
func ToKafka ¶
func ToKafka[T any](producer *kafka.Producer, topic string, toBytes func(x T) []byte, options KafkaSinkOpts) Operator[T, T]
Sink: send the message to a kafka topic
func ToStderr ¶
Sink: Print to the standard error. Use toString to temporarily transform the message into a string.
func ToStderrln ¶
Sink: Print to the standard error, appending a newline char. Use toString to temporarily transform the message into a string.
func ToStdout ¶
Sink: Print to the standard output. Use toString to temporarily transform the message into a string.
func ToStdoutln ¶
Sink: Print to the standard output, appending a newline char. Use toString to temporarily transform the message into a string.
func ToWriter ¶
Sink: write bytes to a writer. Use toBytes to temporarily transform the message into bytes.
func ToWriterCount ¶
Sink: write message to a writer. Next message value will be the number of written bytes. Use toBytes to transform the message into bytes.
func TumblingWindow ¶
TumblingWindow:
..0....1....2....3.........4.........5....6....7...
[-------------][-------------][-------------][-----
func WithContextValue ¶
Cache a key/value record in the Task context. Use an extractor function to pull the value from the processed item.
func WithCustomKey ¶
Set a custom message key from the message itself.
type RediStorage ¶
type RediStorage struct {
// contains filtered or unexported fields
}
func NewRedisStorage ¶
func NewRedisStorage(client *redis.Client) *RediStorage
type SWOptions ¶
type SWOptions[T any] struct { MaxSize time.Duration MaxInactivity time.Duration Watermark time.Duration WindowingTime WindowingTime }
Defaults:
- Storage: memory (no persistence)
- Id: random uuid
- MaxInactivity: 1 second
- MaxSize: MaxInactivity * 2
type TTask ¶
type TTask[O, T any] struct { // contains filtered or unexported fields }
func FromInterval ¶
func FromInterval[T any](taskId string, size time.Duration, max int, generator func(count int) T) *TTask[any, T]
Source: trigger a task execution at a given interval. The generator function will produce the message, optionally using the interval counter. A max of 0 will generate an endless interval.
func FromKafka ¶
func FromKafka(taskId string, consumer *kafka.Consumer, logger bool, timeout ...time.Duration) *TTask[any, KafkaMessage[[]byte]]
Source: trigger a Task execution for each received message.
func FromString ¶
Source: trigger a Task execution for each char of a string (or for each substring with a given step).
func FromStringSplit ¶
Source: trigger a Task execution for each substring, given a certain delimiter.
func Injectable ¶
Initialize an injectable Task with the first step message type as generic. To push messages to this Task use the Inject method.
func (TTask[O, T]) IsInjectable ¶
func (*TTask[O, T]) Lock ¶
Lock the task to prevent it from being further extended with more operators.
func (*TTask[O, T]) Run ¶
Exec the first step of a Task (and other steps cascading). Use this when not manually injecting items in the Task. This method also locks the task.
func (*TTask[O, T]) WithStorage ¶
type TWOptions ¶
type TWOptions[T any] struct { Size time.Duration Watermark time.Duration WindowingTime WindowingTime }
Defaults:
- Storage: memory (no persistence)
- Id: random uuid
- Size: 1 second
- Hop: 2 seconds
type WindowingTime ¶
type WindowingTime string
const ( EventTime WindowingTime = "event-time" InjestionTime WindowingTime = "injestion-time" ProcessingTime WindowingTime = "processing-time" )
Source Files ¶
- checkpoint.go
- operators_array.go
- operators_base.go
- operators_casting.go
- operators_context.go
- operators_flow.go
- operators_kafka.go
- operators_numbers.go
- operators_string.go
- sinks_file.go
- sinks_kafka.go
- sinks_writer.go
- sources_file.go
- sources_kafka.go
- sources_reader.go
- sources_static.go
- sources_user_input.go
- storage.go
- storage_memory.go
- storage_redis.go
- storage_wrapper.go
- task.go
- task_inner.go
- task_message.go
- task_runners.go
- types.go
- utils_file.go
- window_counting.go
- window_hopping.go
- window_inactivity_check.go
- window_session.go
- window_tumbling.go
- windows_options.go
- windows_utils.go