eventual2go

Version: v0.1.0
Published: Apr 12, 2023 License: LGPL-3.0 Imports: 6 Imported by: 19

README

eventual2go

Overview

A package for event-driven programming in Go.

Features:

  • Streams
  • Futures
  • Reactor
  • code generation for typed events

Installation

Simply run

go get github.com/joernweissenborn/eventual2go

Getting Started

See https://godoc.org/github.com/joernweissenborn/eventual2go

Documentation

Overview

Package eventual2go is a library for event-driven development. It provides implementations of futures and streams, as found in many modern languages.

eventual2go targets event-driven environments in which you want to react to something without knowing when it will happen.

Events can either be unique or not. For example, the result of reading in a large file is a unique event, whereas a GET request on a webserver is not. eventual2go provides Futures for the former and Streams for the latter case.

Constants

This section is empty.

Variables

var ErrTimeout = errors.New("Timeout")

ErrTimeout represents a timeout error

Functions

This section is empty.

Types

type Actor

type Actor interface {
	Init() error
	OnMessage(d Data)
}

Actor is a simple actor.

type ActorMessageStream

type ActorMessageStream struct {
	// contains filtered or unexported fields
}

ActorMessageStream is used to send messages to an actor.

func SpawnActor

func SpawnActor(a Actor) (messages ActorMessageStream, err error)

SpawnActor creates an actor and returns a message stream to it.

func (ActorMessageStream) Send

func (ams ActorMessageStream) Send(data Data)

Send sends a message to an actor.

func (ActorMessageStream) Shutdown

func (ams ActorMessageStream) Shutdown(data Data) (err error)

Shutdown sends a shutdown signal to the actor. Messages sent before the shutdown signal are guaranteed to be handled.

type Collector

type Collector struct {
	// contains filtered or unexported fields
}

Collector is a data sink. Use it to collect events for later retrieval. All events are stored in historical order.

func NewCollector

func NewCollector() (c *Collector)

NewCollector creates a new Collector.

func (*Collector) Add

func (c *Collector) Add(d Data)

Add adds data to the collector.

func (*Collector) AddFuture

func (c *Collector) AddFuture(f *Future)

AddFuture collects the result of a `Future`

func (*Collector) AddFutureError

func (c *Collector) AddFutureError(f *Future)

AddFutureError collects the error of a `Future`

func (*Collector) AddObservable

func (c *Collector) AddObservable(o *Observable)

AddObservable collects all changes of an `Observable`

func (*Collector) AddStream

func (c *Collector) AddStream(s *Stream)

AddStream collects all events on `Stream`

func (*Collector) Empty

func (c *Collector) Empty() (e bool)

Empty returns true if no data element is stored in the collector.

func (*Collector) Get

func (c *Collector) Get() (d Data)

Get retrieves the oldest data from the collector and removes it.

func (*Collector) Preview

func (c *Collector) Preview() (d Data)

Preview retrieves the oldest data from the collector without removing it.

func (*Collector) Size

func (c *Collector) Size() (n int)

Size returns the number of events stored in the collector.

func (*Collector) Stop

func (c *Collector) Stop()

Stop stops the collection of events.

func (*Collector) Stopped

func (c *Collector) Stopped() *Future

Stopped returns a future which completes when the reactor collected all events before the call to `Stop`.

type Completer

type Completer struct {
	// contains filtered or unexported fields
}

Completer is a thread-safe struct that can be completed with arbitrary data or failed with an error. Handler functions can be registered for both events and get invoked after completion.

func NewCompleter

func NewCompleter() (c *Completer)

NewCompleter creates a new Completer.

func NewTimeoutCompleter

func NewTimeoutCompleter(d time.Duration) (c *Completer)

NewTimeoutCompleter creates a new Completer which completes with an error after the specified duration if it has not been completed otherwise.

func (*Completer) Complete

func (c *Completer) Complete(d Data)

Complete completes the Completer with the given data and triggers all registered completion handlers. Panics if the Completer is already complete.

func (*Completer) CompleteError

func (c *Completer) CompleteError(err error)

CompleteError completes the Completer with the given error and triggers all registered error handlers. Panics if the Completer is already complete.

func (*Completer) CompleteOn

func (c *Completer) CompleteOn(f CompletionFunc)

CompleteOn invokes a CompletionFunc in a goroutine and completes with either the result or, if it is not nil, the error. Don't invoke this function more than once, to avoid multiple-completion panics.

func (*Completer) CompleteOnFuture

func (c *Completer) CompleteOnFuture(f *Future)

CompleteOnFuture completes the completer with the result or the error of a `Future`.

func (*Completer) Completed

func (c *Completer) Completed() bool

Completed returns the completion state.

func (*Completer) Future

func (c *Completer) Future() (f *Future)

Future returns the associated Future.

type CompletionFunc

type CompletionFunc func() (Data, error)

A CompletionFunc is the argument for Completer.CompleteOn.

type CompletionHandler

type CompletionHandler func(Data) Data

A CompletionHandler gets invoked when a Future is completed. Returned value gets propagated when chaining futures.

type Data

type Data interface{}

Data represents a generic data type.

type DeriveSubscriber

type DeriveSubscriber func(*StreamController, Data)

A DeriveSubscriber gets invoked every time data is added on the source stream and is responsible for adding the (transformed) data on the sink stream controller.

type ErrorHandler

type ErrorHandler func(error) (Data, error)

An ErrorHandler gets invoked when a Future fails. Returned value and error are propagated when chaining futures, if the error is nil, the chained future will be completed with the data, otherwise it fails.

type Event

type Event struct {
	Classifier interface{}
	Data       Data
}

Event represents a generic classifier associated with generic data.

type Filter

type Filter func(Data) bool

A Filter gets invoked when data is added to the consumed stream. The data is added to the filtered stream conditionally, depending on whether the Filter was registered with Where or WhereNot.

type Future

type Future struct {
	// contains filtered or unexported fields
}

Future is a thread-safe struct that can be completed with arbitrary data or failed with an error. Handler functions can be registered for both events and get invoked after completion.

Example

Demonstrates the basic usage of futures

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/joernweissenborn/eventual2go"
)

func main() {

	// create the completers: one we will complete with an error, the other normally.
	completerNor := eventual2go.NewCompleter()
	completerErr := eventual2go.NewCompleter()

	// set up success handler
	var onsuccess eventual2go.CompletionHandler = func(d eventual2go.Data) eventual2go.Data {
		fmt.Println("SUCCESS:", d)
		return "Hello Future Chaining"
	}

	// set up error handler
	var onerror eventual2go.ErrorHandler = func(e error) (eventual2go.Data, error) {
		fmt.Println("ERROR:", e)
		return nil, nil
	}

	// our long running async func
	mylongrunning := func(do_err bool, c *eventual2go.Completer) {
		time.Sleep(1 * time.Second)

		if do_err {
			c.CompleteError(errors.New("Hello Future Error"))
		} else {
			c.Complete("Hello Future")
		}
	}

	// get the futures
	fNor := completerNor.Future()
	fErr := completerErr.Future()

	// register the handlers

	// we chain the success handler
	fNor.Then(onsuccess).Then(onsuccess)
	fNor.Err(onerror)
	fErr.Then(onsuccess)
	fErr.Err(onerror)

	// execute the functions
	go mylongrunning(false, completerNor)
	go mylongrunning(true, completerErr)

	// wait for futures to complete
	fNor.WaitUntilComplete()
	fErr.WaitUntilComplete()

	// everything is async: the futures may be complete while their handlers have not run yet, so we wait 10 ms
	time.Sleep(10 * time.Millisecond)
}

func (*Future) AsChan

func (f *Future) AsChan() chan Data

AsChan returns a channel which either will receive the result on completion or gets closed on error completion of the Future.

func (*Future) AsErrChan

func (f *Future) AsErrChan() chan error

AsErrChan returns a channel which either will receive the error on error completion or gets closed on completion of the Future.
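
The two channel accessors are complementary: on completion one channel delivers a value while the other is closed. The following is a minimal stdlib-only sketch of how a caller might consume such a pair. It does not use the library; `resultOrError` is a hypothetical helper, and the plain Go channels in `main` merely stand in for what AsChan and AsErrChan are described as returning.

```go
package main

import (
	"errors"
	"fmt"
)

// resultOrError waits on a result channel and an error channel that follow
// the contract described above: one of them delivers a single value, the
// other one is closed without delivering anything.
func resultOrError(res <-chan interface{}, errs <-chan error) (interface{}, error) {
	for res != nil || errs != nil {
		select {
		case v, ok := <-res:
			if ok {
				return v, nil
			}
			res = nil // closed: a nil channel is never selected again
		case err, ok := <-errs:
			if ok {
				return nil, err
			}
			errs = nil // closed: keep waiting on the result channel
		}
	}
	return nil, errors.New("both channels closed without a value")
}

func main() {
	// Simulate a successful completion: a result arrives, the error channel closes.
	res := make(chan interface{}, 1)
	errs := make(chan error, 1)
	res <- "Hello Future"
	close(errs)

	v, err := resultOrError(res, errs)
	fmt.Println(v, err)
}
```

Setting a closed channel to nil inside the loop is what keeps the select from spinning on the closed side.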

func (*Future) Completed

func (f *Future) Completed() bool

Completed returns the completion state.

func (*Future) Err

func (f *Future) Err(eh ErrorHandler) (nf *Future)

Err registers an error handler. If the future is already completed with an error, the handler gets executed immediately. Returns a future that gets completed with the result of the handler or, if the handler returns a non-nil error, fails with that error.

func (*Future) ErrResult

func (f *Future) ErrResult() error

ErrResult returns the resulting error of the future, nil if called before completion or after non-error completion.

func (*Future) Result

func (f *Future) Result() Data

Result returns the result of the future, nil if called before completion or after error completion.

func (*Future) Then

func (f *Future) Then(ch CompletionHandler) (nf *Future)

Then registers a completion handler. If the future is already complete, the handler gets executed immediately. Returns a future that gets completed with result of the handler.
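
To illustrate how the return value of one handler can feed the next one in a chain, here is a small stdlib-only sketch. `miniFuture`, `complete` and `then` are hypothetical names, not the library's types, and handlers run synchronously here for brevity, whereas the library may invoke them asynchronously.

```go
package main

import (
	"fmt"
	"sync"
)

// miniFuture is a hypothetical, simplified stand-in for a future.
type miniFuture struct {
	mu       sync.Mutex
	done     bool
	value    interface{}
	handlers []func(interface{})
}

// complete stores the value and runs every handler registered so far.
func (f *miniFuture) complete(v interface{}) {
	f.mu.Lock()
	if f.done {
		f.mu.Unlock()
		panic("future already completed")
	}
	f.done, f.value = true, v
	hs := f.handlers
	f.handlers = nil
	f.mu.Unlock()
	for _, h := range hs {
		h(v)
	}
}

// then registers h and returns a new future that is completed with the
// value h returns, which is what makes chaining possible.
func (f *miniFuture) then(h func(interface{}) interface{}) *miniFuture {
	nf := &miniFuture{}
	run := func(v interface{}) { nf.complete(h(v)) }
	f.mu.Lock()
	if f.done {
		v := f.value
		f.mu.Unlock()
		run(v) // already complete: run the handler right away
		return nf
	}
	f.handlers = append(f.handlers, run)
	f.mu.Unlock()
	return nf
}

func main() {
	f := &miniFuture{}
	f.then(func(v interface{}) interface{} {
		return fmt.Sprintf("%v, chained", v)
	}).then(func(v interface{}) interface{} {
		fmt.Println(v) // prints "Hello Future, chained"
		return nil
	})
	f.complete("Hello Future")
}
```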

func (*Future) WaitUntilComplete

func (f *Future) WaitUntilComplete()

WaitUntilComplete blocks until the future is complete.

func (*Future) WaitUntilTimeout

func (f *Future) WaitUntilTimeout(timeout time.Duration) (complete bool)

WaitUntilTimeout blocks until the future is complete or the timeout is reached.
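
A bounded wait of this kind is commonly built from a select over a completion signal and a timer. The sketch below uses only the standard library; `waitUntilTimeout`, `shortTimeout` and the `done` channel are hypothetical, and the assumption that the boolean reports whether completion happened before the timeout is inferred from the result name `complete`.

```go
package main

import (
	"fmt"
	"time"
)

// shortTimeout is an arbitrary duration chosen for this example.
const shortTimeout = 50 * time.Millisecond

// waitUntilTimeout blocks until done is closed or the timeout elapses.
// It returns true if done was closed first.
func waitUntilTimeout(done <-chan struct{}, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop() // release the timer if done wins the race
	select {
	case <-done:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	// A signal that is already closed counts as completed.
	finished := make(chan struct{})
	close(finished)
	fmt.Println("finished:", waitUntilTimeout(finished, shortTimeout))

	// A signal that never fires runs into the timeout.
	never := make(chan struct{})
	fmt.Println("never:", waitUntilTimeout(never, shortTimeout))
}
```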

type FutureCache

type FutureCache struct {
	// contains filtered or unexported fields
}

FutureCache is a thread-safe cache for storing futures. It stores data under a user-defined index. This is useful, for example, when the same data must be retrieved from a slow location for multiple requests. The cache has a fixed size and is implemented as a ring buffer.
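
The documentation does not spell out the eviction behaviour, so the following is only a sketch of one way a fixed-size, index-keyed cache backed by a ring buffer can work: once the ring is full, the oldest entry is overwritten. `ringCache` and its methods are hypothetical names, values are plain `interface{}` instead of futures, and the size is assumed to be greater than zero.

```go
package main

import (
	"fmt"
	"sync"
)

// ringCache is a hypothetical fixed-size cache keyed by an int index.
type ringCache struct {
	mu      sync.Mutex
	keys    []int               // ring of indices in insertion order
	used    []bool              // whether a ring slot holds a live index
	next    int                 // ring slot that will be overwritten next
	entries map[int]interface{} // index -> cached value
}

// newRingCache creates a cache holding at most size entries (size > 0).
func newRingCache(size int) *ringCache {
	return &ringCache{
		keys:    make([]int, size),
		used:    make([]bool, size),
		entries: make(map[int]interface{}, size),
	}
}

// Cache stores v under index, evicting the oldest entry when the ring is full.
func (c *ringCache) Cache(index int, v interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.entries[index]; ok {
		c.entries[index] = v // refresh in place, the slot stays the same
		return
	}
	if c.used[c.next] {
		delete(c.entries, c.keys[c.next]) // drop the oldest entry
	}
	c.keys[c.next] = index
	c.used[c.next] = true
	c.entries[index] = v
	c.next = (c.next + 1) % len(c.keys)
}

// Get returns the value stored under index and whether it is still cached.
func (c *ringCache) Get(index int) (interface{}, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	v, ok := c.entries[index]
	return v, ok
}

func main() {
	c := newRingCache(2)
	c.Cache(1, "first")
	c.Cache(2, "second")
	c.Cache(3, "third") // ring is full: index 1 is evicted

	_, ok := c.Get(1)
	fmt.Println("index 1 cached:", ok)
	v, ok := c.Get(3)
	fmt.Println("index 3 cached:", ok, v)
}
```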

func NewCache

func NewCache(size int) (fc *FutureCache)

NewCache creates a new FutureCache of the given size

func (*FutureCache) Cache

func (fc *FutureCache) Cache(Index int, f *Future)

Cache stores a future with given index.

func (*FutureCache) Cached

func (fc *FutureCache) Cached(index int) (is bool)

Cached indicates if there has already been a Future cached at given index.

func (*FutureCache) Get

func (fc *FutureCache) Get(index int) (f *Future)

Get retrieves the future with a given index.

type FutureWaitGroup

type FutureWaitGroup struct {
	// contains filtered or unexported fields
}

func NewFutureWaitGroup

func NewFutureWaitGroup() (fwg *FutureWaitGroup)

func (*FutureWaitGroup) Add

func (fwg *FutureWaitGroup) Add(f *Future)

func (*FutureWaitGroup) Wait

func (fwg *FutureWaitGroup) Wait()

type LoopActor

type LoopActor interface {
	Actor
	Loop() (cont bool)
}

LoopActor is an actor with a loop method which is called repeatedly. Messages are handled in between loop repetitions.
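
One way to interleave message handling with a repeated loop method is to drain pending messages without blocking before each iteration. The sketch below shows that pattern with the standard library only; `runLoopActor` is a hypothetical driver and makes no claim about how the library schedules its actors.

```go
package main

import "fmt"

// runLoopActor handles all pending messages, then runs one loop iteration,
// and repeats until loop returns false or the message channel is closed.
func runLoopActor(msgs <-chan interface{}, onMessage func(interface{}), loop func() bool) {
	for {
	drain:
		for {
			select {
			case m, ok := <-msgs:
				if !ok {
					return // channel closed: stop the actor
				}
				onMessage(m)
			default:
				break drain // nothing pending: run the next iteration
			}
		}
		if !loop() {
			return
		}
	}
}

func main() {
	msgs := make(chan interface{}, 2)
	msgs <- "a"
	msgs <- "b"

	iterations := 0
	runLoopActor(msgs,
		func(m interface{}) { fmt.Println("message:", m) },
		func() bool {
			iterations++
			return iterations < 3 // stop after three iterations
		},
	)
	fmt.Println("iterations:", iterations)
}
```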

type Observable

type Observable struct {
	// contains filtered or unexported fields
}

Observable represents a value which can be updated in a thread-safe manner that preserves the order of changes.

Subscribers can be informed of changes through either callbacks or channels.

func NewObservable

func NewObservable(initial Data) (o *Observable)

NewObservable creates a new Observable with an initial value.

func (*Observable) AsChan

func (o *Observable) AsChan() (c chan Data, cancel *Completer)

AsChan returns a channel on which changes are sent.

func (*Observable) Change

func (o *Observable) Change(value Data)

Change changes the value of the observable

func (*Observable) Derive

func (o *Observable) Derive(t Transformer) (do *Observable, cancel *Completer)

Derive returns a new Observable whose value is set by the transform function every time the source gets updated.

func (*Observable) NextChange

func (o *Observable) NextChange() (f *Future)

NextChange returns a Future which gets completed with the next change.

func (*Observable) OnChange

func (o *Observable) OnChange(subscriber Subscriber) (cancel *Completer)

OnChange registers a subscriber for change events.

func (*Observable) Stream

func (o *Observable) Stream() (stream *Stream)

Stream returns a stream of change events.

func (*Observable) Value

func (o *Observable) Value() (value Data)

Value returns the current value of the observable. Threadsafe.

type Reactor

type Reactor struct {
	*sync.Mutex
	// contains filtered or unexported fields
}

Reactor is a thread-safe event handler.

func NewReactor

func NewReactor() (r *Reactor)

NewReactor creates a new Reactor.

func (*Reactor) AddFuture

func (r *Reactor) AddFuture(classifier interface{}, f *Future)

AddFuture creates an event with the given classifier, which will be fired when the given future completes. The event will not be triggered on error completion.

func (*Reactor) AddFutureError

func (r *Reactor) AddFutureError(classifier interface{}, f *Future)

AddFutureError acts the same as AddFuture, but registers a handler for the future error.

func (*Reactor) AddObservable

func (r *Reactor) AddObservable(classifier interface{}, o *Observable)

AddObservable fires an event with the given classifier whenever the observable is changed.

func (*Reactor) AddStream

func (r *Reactor) AddStream(classifier interface{}, s *Stream)

AddStream subscribes to a Stream, firing an event with the given classifier for every new element in the stream.

func (*Reactor) CatchCtrlC

func (r *Reactor) CatchCtrlC()

CatchCtrlC starts a goroutine which initiates reactor shutdown when os.Interrupt is received.

func (*Reactor) CollectEvent

func (r *Reactor) CollectEvent(classifier interface{}, c *Collector)

CollectEvent registers the given Collector's Add method as the event handler for the given classifier.

func (*Reactor) Fire

func (r *Reactor) Fire(classifier interface{}, data Data)

Fire triggers an event, asynchronously invoking the registered subscriber, if any. Events are guaranteed to be handled in the order of arrival.

func (*Reactor) FireEvery

func (r *Reactor) FireEvery(classifier interface{}, data Data, interval time.Duration)

FireEvery fires the given event repeatedly. FireEvery can not be canceled and will run until the reactor is shut down.

func (*Reactor) FireIn

func (r *Reactor) FireIn(classifier interface{}, data Data, duration time.Duration)

FireIn fires the given event after the given duration. FireIn can not be canceled.

func (*Reactor) OnShutdown

func (r *Reactor) OnShutdown(s Subscriber)

OnShutdown registers a custom handler for the shutdown event.

func (*Reactor) React

func (r *Reactor) React(classifier interface{}, handler Subscriber)

React registers a Subscriber as handler for a given event classifier. Previously registered handlers for the given classifier will be overwritten!

func (*Reactor) Shutdown

func (r *Reactor) Shutdown(d Data) (err error)

Shutdown shuts down the reactor, cancelling all goroutines and stream subscriptions. The error return exists to fulfill the `Shutdowner` interface and will always be nil.

func (*Reactor) ShutdownFuture

func (r *Reactor) ShutdownFuture() *Future

ShutdownFuture returns a future which gets completed after the reactor shut down.

type Shutdown

type Shutdown struct {
	// contains filtered or unexported fields
}

Shutdown is a register for `Shutdowner` and is used to orchestrate a concurrent shutdown.

func NewShutdown

func NewShutdown() (sd *Shutdown)

NewShutdown creates a new `Shutdown`.

func (*Shutdown) Do

func (sd *Shutdown) Do(d Data) (errs []error)

Do initiates the shutdown by concurrently calling the Shutdown method on all registered `Shutdowner`s. Blocks until all shutdowns have finished.
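
A concurrent shutdown that blocks until every participant has finished is typically a fan-out over goroutines joined by a sync.WaitGroup. The sketch below shows that pattern with the standard library only. `shutdowner`, `service` and `shutdownAll` are hypothetical names, and returning only the non-nil errors is an assumption, since the documentation does not say what the returned slice contains.

```go
package main

import (
	"fmt"
	"sync"
)

// shutdowner mirrors the shape of the Shutdowner interface for this sketch.
type shutdowner interface {
	Shutdown(d interface{}) error
}

// service is a hypothetical participant that can be told to fail.
type service struct {
	name string
	fail bool
}

func (s service) Shutdown(d interface{}) error {
	if s.fail {
		return fmt.Errorf("%s: shutdown failed (%v)", s.name, d)
	}
	return nil
}

// shutdownAll calls Shutdown on all targets concurrently, waits for all of
// them to return, and collects the non-nil errors.
func shutdownAll(d interface{}, targets []shutdowner) []error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex // guards errs
		errs []error
	)
	for _, t := range targets {
		wg.Add(1)
		go func(t shutdowner) {
			defer wg.Done()
			if err := t.Shutdown(d); err != nil {
				mu.Lock()
				errs = append(errs, err)
				mu.Unlock()
			}
		}(t)
	}
	wg.Wait() // block until every shutdown has finished
	return errs
}

func main() {
	errs := shutdownAll("maintenance", []shutdowner{
		service{name: "db"},
		service{name: "cache", fail: true},
	})
	fmt.Println("failed shutdowns:", len(errs))
}
```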

func (*Shutdown) Register

func (sd *Shutdown) Register(s Shutdowner)

Register registers a `Shutdowner`.

type ShutdownActor

type ShutdownActor interface {
	Actor
	Shutdown(Data) error
}

ShutdownActor is an actor with a Shutdown method, which is called upon actor shutdown.

type ShutdownEvent

type ShutdownEvent struct{}

ShutdownEvent triggers the reactor's Shutdown when fired.

type Shutdowner

type Shutdowner interface {
	Shutdown(d Data) error
}

Shutdowner represents the eventual2go shutdown interface.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

A Stream can be consumed or new streams be derived by registering handler functions.

func (*Stream) AsChan

func (s *Stream) AsChan() (c chan Data, stop *Completer)

AsChan returns a channel to which all items will be pushed. Note that items will be queued in a FIFO, since the stream must not block.

func (*Stream) Close

func (s *Stream) Close()

Close closes the Stream and its assigned StreamController.

func (*Stream) CloseOnFuture

func (s *Stream) CloseOnFuture(f *Future)

CloseOnFuture closes the Stream upon completion of Future.

func (*Stream) Closed

func (s *Stream) Closed() (f *Future)

Closed returns a Future which completes upon closing of the Stream.

func (*Stream) Derive

func (s *Stream) Derive(dsr DeriveSubscriber) (ds *Stream)

Derive creates a derived stream from a DeriveSubscriber. Mainly used internally.

func (*Stream) First

func (s *Stream) First() (f *Future)

First returns a future that will be completed with the first element added to the stream.

func (*Stream) FirstWhere

func (s *Stream) FirstWhere(f ...Filter) (fw *Future)

FirstWhere returns a future that will be completed with the first element added to the stream where filter returns TRUE.

func (*Stream) FirstWhereNot

func (s *Stream) FirstWhereNot(f ...Filter) (fw *Future)

FirstWhereNot returns a future that will be completed with the first element added to the stream where filter returns FALSE.

func (*Stream) Listen

func (s *Stream) Listen(sr Subscriber) (stop *Completer)

Listen registers a subscriber. Returns a Completer, which can be used to terminate the subscription.

func (*Stream) ListenNonBlocking

func (s *Stream) ListenNonBlocking(sr Subscriber) (stop *Completer)

ListenNonBlocking is the same as Listen, but the subscriber does not block the subscription.

func (*Stream) Split

func (s *Stream) Split(f Filter) (ts *Stream, fs *Stream)

Split returns a stream with all elements where the filter returns TRUE and one where the filter returns FALSE.
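
The partitioning rule can be shown on a plain slice: every element goes to exactly one of the two outputs, depending on the filter result. This sketch uses slices instead of streams and only the standard library; `filter` and `split` are hypothetical stand-ins for the types and method documented here.

```go
package main

import "fmt"

// filter has the same shape as the Filter type: it decides per element.
type filter func(interface{}) bool

// split partitions items into those for which f returns true (ts) and
// those for which it returns false (fs), preserving the original order.
func split(items []interface{}, f filter) (ts, fs []interface{}) {
	for _, v := range items {
		if f(v) {
			ts = append(ts, v)
		} else {
			fs = append(fs, v)
		}
	}
	return ts, fs
}

func main() {
	isEven := func(v interface{}) bool { return v.(int)%2 == 0 }
	evens, odds := split([]interface{}{1, 2, 3, 4}, isEven)
	fmt.Println(evens, odds) // prints "[2 4] [1 3]"
}
```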

func (*Stream) Transform

func (s *Stream) Transform(t Transformer) (ts *Stream)

Transform registers a Transformer function and returns the transformed stream.

func (*Stream) TransformConditional

func (s *Stream) TransformConditional(t TransformerConditional) (ts *Stream)

TransformConditional registers a TransformerConditional function and returns the transformed stream.

func (*Stream) TransformWhere

func (s *Stream) TransformWhere(t Transformer, f ...Filter) (tws *Stream)

TransformWhere transforms only filtered elements.

func (*Stream) Where

func (s *Stream) Where(f ...Filter) (fs *Stream)

Where registers a Filter function and returns the filtered stream. Elements will be added if the Filter returns TRUE.

func (*Stream) WhereNot

func (s *Stream) WhereNot(f ...Filter) (fs *Stream)

WhereNot registers a Filter function and returns the filtered stream. Elements will be added if the Filter returns FALSE.

type StreamController

type StreamController struct {
	// contains filtered or unexported fields
}

A StreamController is a Stream to which elements can be added manually or into which other Streams can be joined.

func NewStreamController

func NewStreamController() (sc *StreamController)

NewStreamController creates a new StreamController.

func (*StreamController) Add

func (sc *StreamController) Add(d Data)

Add adds an element to the stream.

func (*StreamController) Join

func (sc *StreamController) Join(source *Stream)

Join joins a stream. All elements from the source will be added to the stream.

func (*StreamController) JoinFuture

func (sc *StreamController) JoinFuture(f *Future)

JoinFuture joins a future completion event. The result will be added to the stream.

func (*StreamController) Stream

func (sc *StreamController) Stream() *Stream

Stream returns the underlying stream.

type Subscriber

type Subscriber func(Data)

A Subscriber gets invoked whenever data is added to the consumed stream.

type Transformer

type Transformer func(Data) Data

A Transformer gets invoked when data is added to the consumed stream. The output gets added to the transformed stream.

type TransformerConditional

type TransformerConditional func(Data) (Data, bool)

A TransformerConditional is like a Transformer, but can filter the data.
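
The difference between the two transformer shapes can be sketched on a slice. The code below uses only the standard library; `transformer`, `transformerConditional` and the two helper functions are hypothetical stand-ins, and treating a returned `true` as "keep the element" is an assumption, because the documentation does not state which boolean value drops the data.

```go
package main

import "fmt"

// Local stand-ins with the same shapes as the documented function types.
type transformer func(interface{}) interface{}
type transformerConditional func(interface{}) (interface{}, bool)

// transform maps every element through t.
func transform(items []interface{}, t transformer) []interface{} {
	out := make([]interface{}, 0, len(items))
	for _, v := range items {
		out = append(out, t(v))
	}
	return out
}

// transformConditional maps every element through t and keeps only the
// results for which t reports true (assumed meaning of the bool).
func transformConditional(items []interface{}, t transformerConditional) []interface{} {
	out := make([]interface{}, 0, len(items))
	for _, v := range items {
		if r, keep := t(v); keep {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	words := []interface{}{"go", "", "stream"}

	// Plain transformer: every word becomes its length.
	lengths := transform(words, func(v interface{}) interface{} {
		return len(v.(string))
	})
	fmt.Println(lengths)

	// Conditional transformer: empty words are dropped, the rest are measured.
	nonEmpty := transformConditional(words, func(v interface{}) (interface{}, bool) {
		s := v.(string)
		return len(s), s != ""
	})
	fmt.Println(nonEmpty)
}
```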
