conc

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 23, 2024 License: Apache-2.0 Imports: 6 Imported by: 0

README

panyam/goutils/conc

The conc package within panyam/goutils implements a few common concurrency patterns with more customizable behavior. This was inspired by and formalized from Rob Pike's original lecture - https://golang.ir/talks/2012/concurrency.slide.

Examples

Reader

A basic goroutine wrapper over a "reader" function that continuosly calls the reader function and sends read results into a channel.

Writer

Just like the reader but for serializing writes using a writer callback method.

Mapper

A goroutine reads from an input channel, transforms (and/or filters) it and writes the result to an output channel.

Reducer

A goroutine collects and reduces N values from an input channel, transforms it and writes the result to an output channel.

Pipe

A goroutine that connects a reader and a writer channel - a Mapper with the identity transform.

Fan-In

Implementation of the fan-in pattern where multiple receiver channels can be fed into a target channel.

Fan-Out

Implementation of the fan-out pattern where output from a single channel is fanned-out to multiple channels.

Documentation

Overview

The conc package contains a few utilities for basic concurrency patterns that have wide uses.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func IDFunc added in v0.0.19

func IDFunc[T any](input T) T

Types

type FanIn

type FanIn[T any] struct {
	RunnerBase[fanInCmd[T]]
	// Called when a channel is removed so the caller can
	// perform other cleanups etc based on this
	OnChannelRemoved func(fi *FanIn[T], inchan <-chan T)
	// contains filtered or unexported fields
}
Example
// Create 5 input channels and send 5 numbers into them
// the collector channel
fanin := NewFanIn[int](nil)
defer fanin.Stop()

NUM_CHANS := 2
NUM_MSGS := 3

var inchans []chan int
for i := 0; i < NUM_CHANS; i++ {
	inchan := make(chan int)
	inchans = append(inchans, inchan)
	fanin.Add(inchan)
}

for i := 0; i < NUM_CHANS; i++ {
	go func(inchan chan int) {
		// send some  numbers into this fanin
		for j := 0; j < NUM_MSGS; j++ {
			inchan <- j
		}
	}(inchans[i])
}

// collect the fanned values
var vals []int
for i := 0; i < NUM_CHANS*NUM_MSGS; i++ {
	val := <-fanin.RecvChan()
	vals = append(vals, val)
}

// sort and print them for testing
sort.Ints(vals)

for _, v := range vals {
	fmt.Println(v)
}
Output:

0
0
1
1
2
2

func NewFanIn added in v0.0.19

func NewFanIn[T any](outChan chan T) *FanIn[T]

func (*FanIn[T]) Add added in v0.0.19

func (fi *FanIn[T]) Add(inputs ...<-chan T)

func (*FanIn[T]) Count added in v0.0.37

func (fi *FanIn[T]) Count() int

func (*FanIn[T]) RecvChan added in v0.0.37

func (fi *FanIn[T]) RecvChan() chan T

func (*FanIn[T]) Remove added in v0.0.19

func (fi *FanIn[T]) Remove(target <-chan T)

Remove an input channel from our monitor list.

type FanOut

type FanOut[T any] struct {
	RunnerBase[fanOutCmd[T]]

	// In the default mode, the Send method simply writes to an input channel that is read
	// by the runner loop of this FanOut.  As soon as an event is read, it by default sequentially
	// writes to all output channels.  If the output channels are not being drained by the reader
	// goroutine (in 2 above) then the Send method will block.
	// In other words, if the reader goroutine is NOT running before the Send method is invoked
	// OR if the reader goroutine is blocked for some reason, then the Send method will block.
	// To prevent this set the async flag to true in the Send method to true so that writes to
	// the reader goroutines are themselves asynchronous and non blocking.
	//
	// By setting this flag to true, writes to th output channels will happen synchronously without
	// invoking a new goroutine.  This will help reduce number of goroutines kicked off during dispatch
	// and is is an optimization if callers/owners of this FanOut want to exercise fine control over the
	// reader channels and goroutines.  For example the caller might create buffered output channels so
	// writes are blocked, or the caller themselves may be running the readers in seperate goroutines
	// to prevent any blocking behavior.
	SendSync bool
	// contains filtered or unexported fields
}

FanOut takes a message from one chanel, applies a mapper function and fans it out to N output channels.

The general pattern is to:

  1. Create a FanOut[T] with the NewFanOut method
  2. Start a reader goroutine that reads values from fanout channels (note this SHOULD be started by any values are sent on the input channel)
  3. Start sending values through the input channel via the Send method.
Example
// Create a fanout wiht 5 output channels and see that
// numbers sent into the output are read from all of these
fanout := NewFanOut[int](nil)
defer fanout.Stop()

NUM_CHANS := 2
NUM_MSGS := 3

// Add some receiver channels
var outchans []chan int
for i := 0; i < NUM_CHANS; i++ {
	outchan := fanout.New(nil)
	outchans = append(outchans, outchan)
}

var vals []int

for i := 0; i < NUM_MSGS; i++ {
	fanout.Send(i)
}

// wait till all fanouts have been collected
for j := 0; j < NUM_MSGS; j++ {
	for i := 0; i < NUM_CHANS; i++ {
		val := <-outchans[i]
		vals = append(vals, val)
	}
}

// sort and print them for testing
sort.Ints(vals)

for _, v := range vals {
	fmt.Println(v)
}
Output:

0
0
1
1
2
2

func NewFanOut

func NewFanOut[T any](inputChan chan T) *FanOut[T]

Creates a new typed FanOut runner. Every FanOut needs an inputChan from which messages can be read to fan out to listening channels. Ths inputChan can be owned by this FanOut or can be provided by the caller. If the input channel is provided then it is not closed when the FanOut runner terminates (or is stoopped).

func (*FanOut[T]) Add added in v0.0.37

func (fo *FanOut[T]) Add(output chan<- T, filter FilterFunc[T], wait bool) (callbackChan chan error)

Adds a new channel to which incoming messages will be fanned out to. These output channels can be either added by the caller or created by this runner. If the output channel was passed, then it wont be closed when this runner finishes (or is stopped). A filter function can also be passed on a per output channel basis that can either transform or filter messages specific to this output channel. For example filters can be used to check permissions for an incoming message wrt to an output channel.

Output channels are added to our list of listeners asynchronously. The wait parameter if set to true will return a channel that can be read from to ensure that this output channel registration is synchronous.

func (*FanOut[T]) Count added in v0.0.37

func (fo *FanOut[T]) Count() int

Returns the number of listening channels currently running.

func (*FanOut[T]) DebugInfo added in v0.0.80

func (fo *FanOut[T]) DebugInfo() any

func (*FanOut[T]) New added in v0.0.19

func (fo *FanOut[T]) New(filter FilterFunc[T]) chan T

Adds a new output channel with an optional filter function that will be managed by this runner.

func (*FanOut[T]) Remove added in v0.0.19

func (fo *FanOut[T]) Remove(output chan<- T, wait bool) (callbackChan chan error)

Removes an output channel from our list of listeners. If the channel was managed/owned by this runner then it will also be closed. Just like the Add method, Removals are asynchronous. This can be made synchronized by passing wait=true.

func (*FanOut[T]) Send

func (fo *FanOut[T]) Send(value T)

Sends a value which will be fanned out. This is a wrapper over sending messages over the input channel returned by SendChan.

func (*FanOut[T]) SendChan added in v0.0.37

func (fo *FanOut[T]) SendChan() <-chan T

Returns the channel on which messages can be sent to this runner to be fanned-out.

type FilterFunc added in v0.0.33

type FilterFunc[T any] func(*T) *T

FanOuts lets a message to be fanned-out to multiple channels. Optionally the message can also be transformed (or filtered) before fanning out to the listeners.

type Map added in v0.0.61

type Map[K comparable, V any] struct {
	// contains filtered or unexported fields
}

A synchronized map with read and update lock capabilities

func NewMap added in v0.0.61

func NewMap[K comparable, V any]() (out *Map[K, V])

Creates a new lockable map

func (*Map[K, V]) Delete added in v0.0.61

func (m *Map[K, V]) Delete(k K)

Locks the map to delete a given key.

func (*Map[K, V]) Get added in v0.0.61

func (m *Map[K, V]) Get(k K) (V, bool)

Locks the map to get the value of a given key.

func (*Map[K, V]) Has added in v0.0.61

func (m *Map[K, V]) Has(k K) bool

Locks the map to check for key membership

func (*Map[K, V]) LDelete added in v0.0.93

func (m *Map[K, V]) LDelete(k K, lock bool)

Deletes the value by a given key. Optionally obtains a write lock if requested. The lock can be skipped if a lock was already obtained over a larger transaction.

func (*Map[K, V]) LGet added in v0.0.93

func (m *Map[K, V]) LGet(k K, lock bool) (V, bool)

Gets the value by a given key. Optionally obtains a read lock if requested. A lock can be skipped if a lock was already obtained over a larger transaction.

func (*Map[K, V]) LHas added in v0.0.93

func (m *Map[K, V]) LHas(k K, lock bool) bool

Check if the map contains an entry by key. Optionally obtains a read lock if requested. A lock can be skipped if a lock was already obtained over a larger transaction.

func (*Map[K, V]) LRange added in v0.0.93

func (m *Map[K, V]) LRange(lock bool, meth func(K, V) bool)

Iterates over the items in this map. Optionally obtains a read lock if requested. The lock can be skipped if a lock was already obtained over a larger transaction.

func (*Map[K, V]) LSet added in v0.0.93

func (m *Map[K, V]) LSet(k K, v V, lock bool)

Sets the value by a given key. Optionally obtains a write lock if requested. The lock can be skipped if a lock was already obtained over a larger transaction.

func (*Map[K, V]) Lock added in v0.0.61

func (m *Map[K, V]) Lock()

Obtains a write lock on the map

func (*Map[K, V]) RLock added in v0.0.61

func (m *Map[K, V]) RLock()

Obtains a read lock on the map

func (*Map[K, V]) RUnlock added in v0.0.61

func (m *Map[K, V]) RUnlock()

Relinquishes a read lock on the map

func (*Map[K, V]) Range added in v0.0.61

func (m *Map[K, V]) Range(meth func(K, V) bool)

Locks the map to range of keys/values in this map

func (*Map[K, V]) Set added in v0.0.61

func (m *Map[K, V]) Set(k K, v V)

Locks the map to set the value of a given key.

func (*Map[K, V]) Unlock added in v0.0.61

func (m *Map[K, V]) Unlock()

Relinquishes a write lock on the map

func (*Map[K, V]) Update added in v0.0.61

func (m *Map[K, V]) Update(actions func(items map[K]V))

Obtains a write lock over the map to perform a list of udpate actions

func (*Map[K, V]) View added in v0.0.61

func (m *Map[K, V]) View(actions func())

Obtains a read lock over the map to perform a list of read actions

type Mapper added in v0.0.99

type Mapper[I any, O any] struct {
	RunnerBase[string]

	// MapFunc is applied to each value in the input channel
	// and returns a tuple of 3 things - outval, skip, stop
	// if skip is false, outval is sent to the output channel
	// if stop is true, then the entire mapper stops processing any further elements.
	// This mechanism can be used inaddition to the Stop method if sequencing this
	// within the elements of input channel is required
	MapFunc func(I) (O, bool, bool)
	OnDone  func(p *Mapper[I, O])
	// contains filtered or unexported fields
}

Mappers connect an input and output channel applying transforms between them

func NewMapper added in v0.0.99

func NewMapper[T any, U any](input <-chan T, output chan<- U, mapper func(T) (U, bool, bool)) *Mapper[T, U]

Creates a new mapper between an input and output channel. The ownership of the channels is by the caller and not the Mapper. Hence they will nto be called when the mapper stops.

func NewPipe added in v0.0.19

func NewPipe[T any](input <-chan T, output chan<- T) *Mapper[T, T]

type Message added in v0.0.37

type Message[T any] struct {
	Value  T
	Error  error
	Source interface{}
}

type Reader added in v0.0.37

type Reader[R any] struct {
	RunnerBase[string]

	Read   ReaderFunc[R]
	OnDone func(r *Reader[R])
	// contains filtered or unexported fields
}

The typed Reader goroutine which calls a Read method to return data over a channel.

func NewReader

func NewReader[R any](read ReaderFunc[R]) *Reader[R]

Creates a new reader instance. Just like time.Ticker, this initializer also starts the Reader loop. It is upto the caller to Stop this reader when done with. Not doing so can risk the reader to run indefinitely.

func (*Reader[R]) DebugInfo added in v0.0.80

func (r *Reader[R]) DebugInfo() any

func (*Reader[R]) RecvChan added in v0.0.37

func (rc *Reader[R]) RecvChan() <-chan Message[R]

Returns the channel onwhich messages can be received.

type ReaderFunc added in v0.0.37

type ReaderFunc[R any] func() (msg R, err error)

Type of the reader method used by the Reader goroutine primitive.

type Reducer added in v0.0.98

type Reducer[T any, U any] struct {
	FlushPeriod time.Duration
	ReduceFunc  func(inputs []T) (outputs U)
	// contains filtered or unexported fields
}

Reducer is a way to collect messages of type T in some kind of window and reduce them to type U. For example this could be used to batch messages into a list every 10 seconds. Alternatively if a time based window is not used a reduction can be invokved manually.

func NewIDReducer added in v0.0.98

func NewIDReducer[T any](inputChan chan T, outputChan chan []T) *Reducer[T, []T]

A Reducer that simply collects events of type T into a list (of type []T)

func NewReducer added in v0.0.98

func NewReducer[T any, U any](inputChan chan T, outputChan chan U) *Reducer[T, U]

The reducer over generic input and output types. The input channel can be provided on which the reducer will read messages. If an input channel is not provided then the reducer will create one (and own its lifecycle). Just like other runners, the Reducer starts as soon as it is created.

func (*Reducer[T, U]) Flush added in v0.0.98

func (fo *Reducer[T, U]) Flush()

func (*Reducer[T, U]) Send added in v0.0.98

func (fo *Reducer[T, U]) Send(value T)

Send a mesasge/value onto this reducer for (eventual) reduction.

func (*Reducer[T, U]) SendChan added in v0.0.98

func (fo *Reducer[T, U]) SendChan() chan<- T

The channel onto which messages can be sent (to be reduced)

func (*Reducer[T, U]) Stop added in v0.0.98

func (fo *Reducer[T, U]) Stop()

Stops the reducer and closes all channels it owns.

type RunnerBase added in v0.0.37

type RunnerBase[C any] struct {
	// contains filtered or unexported fields
}

Base of the Reader and Writer primitives

func NewRunnerBase added in v0.0.37

func NewRunnerBase[C any](stopVal C) RunnerBase[C]

Creates a new base runner - called by the Reader and Writer primitives

func (*RunnerBase[R]) DebugInfo added in v0.0.80

func (r *RunnerBase[R]) DebugInfo() any

Used for returning any debug information.

func (*RunnerBase[C]) IsRunning added in v0.0.37

func (r *RunnerBase[C]) IsRunning() bool

Returns true if currently running otherwise false

func (*RunnerBase[C]) Stop added in v0.0.37

func (r *RunnerBase[C]) Stop() error

This method is called to stop the runner. It is upto the child classes to listen to messages on the control channel and initiate the wind-down and cleanup process.

type Writer added in v0.0.37

type Writer[W any] struct {
	RunnerBase[string]

	Write WriterFunc[W]
	// contains filtered or unexported fields
}

The typed Writer goroutine type which calls the Write method when it serializes it writes.

func NewWriter

func NewWriter[W any](write WriterFunc[W]) *Writer[W]

Creates a new writer instance. Just like time.Ticker, this initializer also starts the Writer loop. It is upto the caller to Stop this writer when done with. Not doing so can risk the writer to run indefinitely.

func (*Writer[W]) DebugInfo added in v0.0.80

func (w *Writer[W]) DebugInfo() any

func (*Writer[W]) Send added in v0.0.37

func (wc *Writer[W]) Send(req W) bool

Sends a message to the Writer. This is a shortcut for sending a message to the underlying channel.

func (*Writer[W]) SendChan added in v0.0.37

func (wc *Writer[W]) SendChan() chan W

Returns the channel on which messages can be sent to the Writer.

type WriterFunc added in v0.0.37

type WriterFunc[W any] func(W) error

Type of the writer method used by the writer goroutine primitive to serialize its writes.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL