bigbuff

package module
v1.9.0
Published: Jun 18, 2019 License: Apache-2.0 Imports: 8 Imported by: 1

README

go-bigbuff

test coverage: 93.7%

Package bigbuff provides an in-memory Buffer implementation designed to support a producer-consumer (one-to-many) pattern, where each consumer reads the full data set, has its own offsets, and commits or rolls back atomically. It also provides a channel-backed Consumer implementation offering the same kind of feature set, but geared towards fan-in use cases; it provides the same benefits on the consumer end as the Buffer implementation, and also solves channel draining and "zero reads" on producer channel close (at the cost of some performance versus pure channels, of course). Some utility helper functions are also exposed as part of this package.

Godoc here: github.com/joeycumines/go-bigbuff

Features:

  • Queue implementations with strong support for context.Context, aiming to guarantee as much data consistency as possible
  • Well-defined interfaces, designed in the style of the core Go libraries
  • Handy Range method for iterating, with catch / throw pattern for panics
  • Exposes async util functions to combine contexts and safely perform a conditional wait via sync.Cond (no leaks)
  • Configurable cleanup behavior - control retention based on the buffer length and consumer offsets, plus the minimum cleanup cycle wait
  • The whole reason I implemented this in the first place: a producer implementation that is 100% safe to use within a consumer processing loop (unbounded buffer size). Imagine ranging over a buffered channel while also sending to it from the same goroutine - with a single consumer, a send will deadlock as soon as the buffer fills, and adding another consumer would break guaranteed in-order processing
  • Single-channel implementation of the bigbuff.Consumer interface that uses reflection + polling to provide the same commit and rollback functionality (retains order, supports any readable channel)
  • Breaking from the mould a little, bigbuff.Exclusive allows key-based locking and de-bouncing for operations that may return values like (interface{}, error), operating synchronously to keep things simple
  • bigbuff.Workers, useful for limiting concurrent executions, is compatible with bigbuff.Exclusive

Documentation

Overview

Package bigbuff implements a one-many ordered queue producer-consumer pattern + utilities.

Index

Examples

Constants

View Source
const (
	// DefaultCleanerCooldown is how long the cleaner will wait between checks of the Buffer by default.
	DefaultCleanerCooldown = time.Millisecond * 10

	// DefaultChannelPollRate is how frequently each waiting Channel.Get should try to receive from the channel (from
	// the first failure/non-receive).
	DefaultChannelPollRate = time.Millisecond
)

Variables

This section is empty.

Functions

func CombineContext

func CombineContext(ctx context.Context, others ...context.Context) context.Context

CombineContext returns a context based on ctx (the first parameter) that will cancel when ANY of the other provided context values cancel. CAUTION: this spawns one or more blocking goroutines; if you call this with contexts that don't cancel within the reasonable lifetime of your application, you will have a leak.
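
A minimal usage sketch (written in the same in-package style as the examples further down; the timeout and variable names are illustrative only):

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
other, otherCancel := context.WithTimeout(context.Background(), time.Millisecond*100)
defer otherCancel()
combined := CombineContext(ctx, other)
// combined is canceled when either ctx or other is canceled (here, the timeout)
<-combined.Done()
fmt.Println(`combined context is done`)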

func DefaultCleaner

func DefaultCleaner(size int, offsets []int) int

DefaultCleaner is the Buffer's default cleaner. If there is at least one "active" consumer it returns the lowest offset (defaulting to 0), effectively removing values from the buffer that all consumers have read. Note that the return value is limited to >= 0 and <= size, and active consumers are defined as those registered with offsets >= 0.

func ExponentialRetry added in v1.6.0

func ExponentialRetry(ctx context.Context, rate time.Duration, value func() (interface{}, error)) func() (interface{}, error)

ExponentialRetry implements a simple exponential back-off and retry via a closure wrapper, as described on Wikipedia (https://en.wikipedia.org/wiki/Exponential_backoff), supporting context canceling (while waiting / before starting), a configurable base rate / slot time which will default to 300ms if rate is <= 0 (SUBJECT TO CHANGE), and the ability to support fatal errors via the FatalError error wrapper function provided by this package. Notes: 1. This function will panic if value is nil, but NOT if ctx is nil (the latter is not recommended, but the existing implementations already have this behavior). 2. The exit case triggered via use of the FatalError wrapper will include any accompanying result, as well as the unpacked error value (which will always be non-nil, since FatalError panics otherwise). 3. Before each call to value the context error will be checked, and if non-nil it will be propagated as-is with a nil result. 4. This implementation uses the math/rand package.
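
A sketch of how the closure wrapper might be used, including the FatalError escape hatch (in-package example style; error messages and timings are illustrative):

ctx := context.Background()
attempts := 0
value, err := ExponentialRetry(ctx, time.Millisecond*50, func() (interface{}, error) {
	attempts++
	if attempts < 3 {
		return nil, errors.New(`transient failure`) // retried with exponential back-off
	}
	return `ok`, nil
})()
fmt.Println(value, err, attempts) // expect: ok <nil> 3

// wrapping with FatalError stops retrying immediately, returning the unpacked error
_, err = ExponentialRetry(ctx, time.Millisecond*50, func() (interface{}, error) {
	return nil, FatalError(errors.New(`permanent failure`))
})()
fmt.Println(err) // expect: permanent failure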

func FatalError added in v1.6.0

func FatalError(err error) error

FatalError wraps a given error to indicate to functions or methods that receive a closure that they should no longer continue to operate (applies to: ExponentialRetry). Note that the error type will be transparently and recursively unpacked, for any return values, from said methods or functions, so DO NOT attempt to chain such calls without explicit handling at the top level for each fatal-able operation. NOTE: calls to this function with a nil err will trigger a panic.

func Range

func Range(ctx context.Context, consumer Consumer, fn func(index int, value interface{}) bool) error

Range iterates over the consumer, encapsulating automatic commits and rollbacks, including rollbacks caused by panics. Note that the index will be the index within THIS range, starting at 0 and incrementing by one with each call to fn. NOTE: the ctx value will be passed into consumer.Get as-is.
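
A small usage sketch (in-package example style; the values and the stop condition are illustrative):

ctx := context.Background()
buffer := new(Buffer)
defer buffer.Close()
consumer, _ := buffer.NewConsumer()
defer consumer.Close()
_ = buffer.Put(ctx, `a`, `b`, `c`)
_ = Range(ctx, consumer, func(index int, value interface{}) bool {
	fmt.Println(index, value)
	// returning false stops iteration; commits / rollbacks are handled by Range
	return index < 2
})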

func WaitCond

func WaitCond(ctx context.Context, cond *sync.Cond, fn func() bool) error

WaitCond performs a conditional wait against a *sync.Cond, waiting until fn returns true, with an inbuilt escape hatch for context cancel. Note that the relevant locker must be locked before this is called. It should also be noted that cond.L.Lock will be acquired before a context-triggered broadcast, in order to avoid a race condition (i.e. if the context is canceled while fn is being evaluated).
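
A minimal sketch, assuming the usual sync.Cond setup where cond.L guards the predicate state (names and timings are illustrative):

var (
	mu    sync.Mutex
	cond  = sync.NewCond(&mu)
	ready bool
)
go func() {
	time.Sleep(time.Millisecond * 50)
	mu.Lock()
	ready = true
	mu.Unlock()
	cond.Broadcast()
}()
mu.Lock() // the locker must be held before calling WaitCond
defer mu.Unlock()
if err := WaitCond(context.Background(), cond, func() bool { return ready }); err == nil {
	fmt.Println(`condition met`)
}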

Types

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

Buffer is the core implementation. It implements the Producer interface and provides auxiliary methods for configuration, as well as NewConsumer to instance a new consumer. Note that though it is safe to instance via new(bigbuff.Buffer), it must not be copied after first use. Its behavior regarding message retention may be configured via SetCleanerConfig; by default it will un-buffer only messages that have been read by all "active and valid" consumers, given at least one exists, otherwise it will retain messages indefinitely. NOTE: the buffer itself will not be cleared even after close, so the data can still be accessed.
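
A basic producer-consumer sketch using the zero value (in-package example style; values are illustrative):

ctx := context.Background()
buffer := new(Buffer) // the zero value is ready to use, but must not be copied after first use
defer buffer.Close()
consumer, _ := buffer.NewConsumer()
defer consumer.Close()
_ = buffer.Put(ctx, 1, 2, 3)
for i := 0; i < 3; i++ {
	value, _ := consumer.Get(ctx)
	fmt.Println(value)
	_ = consumer.Commit() // commit the offset so the cleaner may release the value
}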

func (*Buffer) CleanerConfig

func (b *Buffer) CleanerConfig() CleanerConfig

CleanerConfig returns the current cleaner config (which has defaults)

func (*Buffer) Close

func (b *Buffer) Close() (err error)

func (*Buffer) Diff added in v1.1.0

func (b *Buffer) Diff(c Consumer) (int, bool)

Diff is provided to facilitate ranging over a buffer via a consumer, and returns the number of items remaining in the buffer (including uncommitted items). Be aware that the value CAN be negative, in the event that the consumer fell behind (the cleaner cleared item(s) from the buffer that the consumer hadn't read yet, which by default will never happen, as the default mode is an unbounded buffer). Note that it will return (0, false) for any invalid consumer, or any consumer not registered on the receiver buffer.

func (*Buffer) Done

func (b *Buffer) Done() <-chan struct{}

func (*Buffer) NewConsumer

func (b *Buffer) NewConsumer() (Consumer, error)

NewConsumer constructs a new consumer instance.

func (*Buffer) Put

func (b *Buffer) Put(ctx context.Context, values ...interface{}) error

func (*Buffer) Range added in v1.1.0

func (b *Buffer) Range(ctx context.Context, c Consumer, fn func(index int, value interface{}) bool) error

Range provides a way to iterate from the start to the end of the buffer. Note that it will exit as soon as it reaches the end of the buffer (unlike ranging over a channel); it simply utilizes the package-level Range + Buffer.Diff.
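
A short sketch contrasting this with the package-level Range: fn can always return true, since iteration ends once the consumer catches up to the end of the buffer (in-package example style; values are illustrative):

ctx := context.Background()
buffer := new(Buffer)
defer buffer.Close()
consumer, _ := buffer.NewConsumer()
defer consumer.Close()
_ = buffer.Put(ctx, `x`, `y`)
_ = buffer.Range(ctx, consumer, func(index int, value interface{}) bool {
	fmt.Println(index, value)
	return true // keep going; Buffer.Range returns once the end of the buffer is reached
})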

func (*Buffer) SetCleanerConfig

func (b *Buffer) SetCleanerConfig(config CleanerConfig) error

SetCleanerConfig updates the cleaner config, returning an error if the config was invalid.

func (*Buffer) Size

func (b *Buffer) Size() int

Size returns the length of the buffer

func (*Buffer) Slice

func (b *Buffer) Slice() []interface{}

Slice returns a copy of the internal message buffer (all currently stored in memory). NOTE: this will read-lock the buffer itself, and is accessible even after the buffer is closed.

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel implements Consumer based on data from a channel. Note that because it uses reflection and polling internally, it is actually safe to close the input channel without invalid zero-value reads (though the Channel itself still needs to be closed, and any Get calls will still block until then).

func NewChannel

func NewChannel(ctx context.Context, pollRate time.Duration, source interface{}) (*Channel, error)

NewChannel constructs a new Channel that implements Consumer but receives its data from a channel; reflection is used to support any readable channel. Note that a poll rate of zero will use the default, and < 0 is an error.
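
A minimal sketch showing a closed, buffered source channel being drained safely (in-package example style; a poll rate of 0 falls back to DefaultChannelPollRate):

ctx := context.Background()
source := make(chan int, 2)
source <- 1
source <- 2
close(source) // safe: buffered values are still delivered, with no zero-value reads
channel, _ := NewChannel(ctx, 0, source)
defer channel.Close()
for i := 0; i < 2; i++ {
	value, _ := channel.Get(ctx)
	fmt.Println(value)
	_ = channel.Commit()
}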

func (*Channel) Buffer

func (c *Channel) Buffer() []interface{}

Buffer returns any values that were drained from the source but not yet committed, in a new copy of the internal buffer. Note that if you are trying to ensure no messages get lost in the void, block on Channel.Done before calling this.

func (*Channel) Close

func (c *Channel) Close() error

Close closes the consumer. NOTE that it doesn't close the source channel.

func (*Channel) Commit

func (c *Channel) Commit() error

Commit resets the read parts of the buffer, or returns an error; note that it will always error after context cancel.

func (*Channel) Done

func (c *Channel) Done() <-chan struct{}

func (*Channel) Get

func (c *Channel) Get(ctx context.Context) (value interface{}, err error)

func (*Channel) Rollback

func (c *Channel) Rollback() error

Rollback will cause subsequent Get calls to read from the start of the buffer, or it will return an error if there is nothing to roll back (nothing is pending).

type Cleaner

type Cleaner func(size int, offsets []int) int

Cleaner is a callback used to manage the size of a bigbuff.Buffer instance. It will be called when relevant to do so, with the size of the buffer and the consumer offsets (relative to the buffer), and should return the number of elements that should be attempted to be shifted from the buffer.

func FixedBufferCleaner

func FixedBufferCleaner(
	max int,
	target int,
	callback func(notification FixedBufferCleanerNotification),
) Cleaner

FixedBufferCleaner builds a cleaner that gives a buffer a fixed threshold size, which will trigger forced reduction back to a fixed target size. Note that if callback is supplied it will be called with the details of the cleanup, in the event that cleanup is forced past the default behavior. This has the effect of causing any consumers that were running behind the target size (in terms of their read position in the buffer) to fail on any further Get calls.
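
A configuration sketch combining FixedBufferCleaner with SetCleanerConfig (the max/target sizes and cooldown are illustrative):

var buffer Buffer
defer buffer.Close()
if err := buffer.SetCleanerConfig(CleanerConfig{
	Cooldown: time.Millisecond * 10,
	Cleaner: FixedBufferCleaner(
		1000, // max: forced cleanup triggers once the buffer exceeds this size
		500,  // target: size the buffer is reduced back down to
		func(notification FixedBufferCleanerNotification) {
			log.Printf("forced cleanup: %+v", notification) // optional callback
		},
	),
}); err != nil {
	log.Fatal(err)
}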

type CleanerConfig

type CleanerConfig struct {
	// Cleaner is used to determine if items are removed from the buffer
	Cleaner Cleaner

	// Cooldown is the minimum time between cleanup cycles
	Cooldown time.Duration
}

CleanerConfig is a configuration for a bigbuff.Buffer that defines how its size is managed.

type Consumer

type Consumer interface {
	io.Closer

	// Done should return a channel that will be closed after internal resources have been freed, after a `Close`
	// call, which may not be explicit. This *may* mean that it blocks on any pending changes, and it *may* also
	// be possible that the consumer will be closed due to external reasons, e.g. connection closing.
	Done() <-chan struct{}

	// Get will get a message from the message buffer, at the current offset, blocking if none are available, or
	// an error if it fails.
	Get(ctx context.Context) (interface{}, error)

	// Commit will save any offset changes, and will return an error if it fails, or if the offset saved is the
	// latest.
	Commit() error

	// Rollback will undo any offset changes, and will return an error if it fails, or if the offset saved is the
	// latest.
	Rollback() error
}

Consumer models a consumer in a producer-consumer pattern, where the resource will be closed at most once.

type Exclusive added in v1.2.1

type Exclusive struct {
	// contains filtered or unexported fields
}

Exclusive provides synchronous de-bouncing of operations that may also return a result or error, with consistent or controlled input supported via provided closures, keyed on any comparable value. It provides a guarantee that the actual call that returns a given value will be started AFTER the Call method, so keep that in mind when implementing something using it. You may also use the CallAfter method to delay execution after initialising the key, e.g. to allow the first of many costly operations on a given key a grace period in which to be grouped with the remaining calls.

func (*Exclusive) Call added in v1.2.1

func (e *Exclusive) Call(key interface{}, value func() (interface{}, error)) (interface{}, error)

Call uses a given key to ensure that the operation that the value callback represents will not be performed concurrently, and in the event that one or more operations are attempted while a given operation is still being performed, these operations will be grouped such that they are debounced to a single call, sharing the output.

Note that this method will panic if the receiver is nil, or the value is nil, but a nil key is allowed.
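
A rough sketch of the de-bouncing behavior under concurrent calls (in-package example style; the key, sleep, and expected execution count are illustrative and timing dependent):

var (
	exclusive Exclusive
	wg        sync.WaitGroup
	calls     int32
)
for i := 0; i < 5; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		// concurrent calls with the same key are grouped, sharing a single result
		result, _ := exclusive.Call(`some-key`, func() (interface{}, error) {
			atomic.AddInt32(&calls, 1)
			time.Sleep(time.Millisecond * 50)
			return `shared result`, nil
		})
		_ = result
	}()
}
wg.Wait()
fmt.Println(`executions:`, atomic.LoadInt32(&calls)) // typically 2: the first call plus one grouped call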

func (*Exclusive) CallAfter added in v1.2.1

func (e *Exclusive) CallAfter(key interface{}, value func() (interface{}, error), wait time.Duration) (interface{}, error)

CallAfter performs exactly the same operation as the Exclusive.Call method, but with an added wait, to allow operations sent through in close succession to be grouped together. Note that if wait is <= 0 it will be ignored.

func (*Exclusive) CallAfterAsync added in v1.3.0

func (e *Exclusive) CallAfterAsync(key interface{}, value func() (interface{}, error), wait time.Duration) <-chan *ExclusiveOutcome

CallAfterAsync behaves exactly the same as CallAfter but guarantees order (the value func) for synchronous calls.

Note that the return value will always be closed after being sent the result, and therefore any additional reads will always receive nil.

func (*Exclusive) CallAsync added in v1.3.0

func (e *Exclusive) CallAsync(key interface{}, value func() (interface{}, error)) <-chan *ExclusiveOutcome

CallAsync behaves exactly the same as Call but guarantees order (the value func) for synchronous calls.

Note that the return value will always be closed after being sent the result, and therefore any additional reads will always receive nil.
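
A brief sketch of consuming the outcome channels (in-package example style; the key and value func are illustrative):

var exclusive Exclusive
var outcomes []<-chan *ExclusiveOutcome
for i := 0; i < 3; i++ {
	outcomes = append(outcomes, exclusive.CallAsync(`some-key`, func() (interface{}, error) {
		return `result`, nil
	}))
}
for _, outcome := range outcomes {
	// each channel receives exactly one *ExclusiveOutcome, then is closed
	if o := <-outcome; o != nil {
		fmt.Println(o.Result, o.Error)
	}
}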

func (*Exclusive) Start added in v1.9.0

func (e *Exclusive) Start(key interface{}, value func() (interface{}, error))

Start is synonymous with a CallAsync that avoids spawning an unnecessary goroutine to wait for results.

func (*Exclusive) StartAfter added in v1.9.0

func (e *Exclusive) StartAfter(key interface{}, value func() (interface{}, error), wait time.Duration)

StartAfter is synonymous with a CallAfterAsync that avoids spawning an unnecessary goroutine to wait for results.

type ExclusiveOutcome added in v1.3.0

type ExclusiveOutcome struct {
	Result interface{}
	Error  error
}

ExclusiveOutcome is the return value from an async bigbuff.Exclusive call

type FixedBufferCleanerNotification

type FixedBufferCleanerNotification struct {
	Max     int   // Max size before forced cleanup is triggered.
	Target  int   // Target size when force cleanup is triggered.
	Size    int   // Size when cleanup was triggered.
	Offsets []int // Offsets when cleanup was triggered.
	Trim    int   // Trim number returned.
}

FixedBufferCleanerNotification is the context provided to the optional callback provided to the FixedBufferCleaner function.

type Notifier added in v1.7.0

type Notifier struct {
	// contains filtered or unexported fields
}

Notifier is a tool which may be used to facilitate event handling using a fan-out pattern, modeling a pattern that is better described as publish-subscribe than produce-consume, and it tries to be semantically equivalent to implementations using channels guarded by context cancels within select statements.

It sits between the Exclusive and Buffer implementations in terms of behavior, yet its use case is still distinct. Where Buffer shines when providing multiplexing or fanning out of serializable streams of messages, and Exclusive is explicitly designed to be attached to existing expensive tasks which need to occur as a result of multiple triggers, Notifier targets reactive behavior based on asynchronous operations. It provides basic event handling without any buffering or queuing between the producer and subscriber present in the layer before the actual target channels.

Note that it uses reflect internally, to avoid clients needing to rely on generic interface values.

Example (ContextCancelSubscribe)
var (
	nf          Notifier
	k           = 0
	c           = make(chan string)
	d           = make(chan struct{})
	ctx, cancel = context.WithCancel(context.Background())
)
defer cancel()

nf.SubscribeContext(ctx, k, c)

fmt.Println(`starting blocking publish then waiting a bit...`)
go func() {
	defer close(d)
	fmt.Println(`publish start`)
	nf.Publish(k, `one`)
	fmt.Println(`publish finish`)
}()
time.Sleep(time.Millisecond * 100)

fmt.Println(`canceling context then blocking for publish exit...`)
cancel()
<-d

fmt.Println(`closing publish channel...`)
close(c)
time.Sleep(time.Millisecond * 50)

fmt.Println(`success!`)
Output:

starting blocking publish then waiting a bit...
publish start
canceling context then blocking for publish exit...
publish finish
closing publish channel...
success!
Example (PubSubKeys)
var (
	k1 = `some-key`
	k2 = 100
	c1 = make(chan string)
	c2 = make(chan string)
	c3 = make(chan string)
	wg sync.WaitGroup
	nf Notifier
)
wg.Add(3)
go func() {
	defer wg.Done()
	for v := range c1 {
		fmt.Println("c1 recv:", v)
	}
}()
go func() {
	defer wg.Done()
	for v := range c2 {
		fmt.Println("c2 recv:", v)
	}
}()
go func() {
	defer wg.Done()
	for v := range c3 {
		time.Sleep(time.Millisecond * 100)
		fmt.Println("c3 recv:", v)
	}
}()
nf.Subscribe(k1, c1)
nf.Subscribe(k2, c2)
nf.Subscribe(k1, c3)
nf.Subscribe(k2, c3)

nf.Publish(k1, `one`)
time.Sleep(time.Millisecond * 200)
nf.Publish(k2, `two`)

close(c1)
close(c2)
close(c3)
wg.Wait()
Output:

c1 recv: one
c3 recv: one
c2 recv: two
c3 recv: two

func (*Notifier) Publish added in v1.7.0

func (n *Notifier) Publish(key interface{}, value interface{})

Publish is the equivalent of PublishContext(nil, key, value).

func (*Notifier) PublishContext added in v1.7.0

func (n *Notifier) PublishContext(ctx context.Context, key interface{}, value interface{})

PublishContext will send value to the targets of all active subscribers for a given key for which value is assignable, blocking until ctx is canceled (if non-nil), or until each relevant subscriber is either sent value or cancels its context.

func (*Notifier) Subscribe added in v1.7.0

func (n *Notifier) Subscribe(key interface{}, target interface{})

Subscribe is the equivalent of SubscribeContext(nil, key, target).

func (*Notifier) SubscribeCancel added in v1.8.0

func (n *Notifier) SubscribeCancel(ctx context.Context, key interface{}, target interface{}) context.CancelFunc

SubscribeCancel wraps SubscribeContext and Unsubscribe, as well as the initialisation of a sub-context, so that the result can be used as a one-liner in defer statements. It is the most fool-proof way to implement a subscriber, at the cost of less direct management of resources (including some which are potentially unnecessary, as it uses a sub-context, and the returned cancel obeys the contract of context.CancelFunc and so does not perform Unsubscribe inline).

Example
defer func() func() {
	startGoroutines := runtime.NumGoroutine()
	return func() {
		time.Sleep(time.Millisecond * 200)
		endGoroutines := runtime.NumGoroutine()
		if endGoroutines <= startGoroutines {
			fmt.Println(`our resources were freed`)
		}
	}
}()()
var (
	nf          Notifier
	ping        = make(chan float64)
	pong        = make(chan float64)
	ctx, cancel = context.WithCancel(context.Background())
)
defer cancel()
defer nf.SubscribeCancel(nil, `ping`, ping)()
defer nf.SubscribeCancel(nil, `pong`, pong)()
go func() {
	// worker which will receive all values and respond with that value x2
	// but first... sleep, to demonstrate it's not racey
	time.Sleep(time.Millisecond * 100)
	for {
		select {
		case <-ctx.Done():
			fmt.Println(`worker exiting`)
			return
		case value := <-ping:
			nf.PublishContext(ctx, `pong`, value*2)
		}
	}
}()
fmt.Println(`PING 5 x 2 = ...`)
nf.PublishContext(ctx, `ping`, 5.0)
fmt.Println(`PONG`, <-pong)
fmt.Println(`PING -23 x 2 = ...`)
nf.PublishContext(ctx, `ping`, -23.0)
fmt.Println(`PONG`, <-pong)
Output:

PING 5 x 2 = ...
PONG 10
PING -23 x 2 = ...
PONG -46
worker exiting
our resources were freed

func (*Notifier) SubscribeContext added in v1.7.0

func (n *Notifier) SubscribeContext(ctx context.Context, key interface{}, target interface{})

SubscribeContext registers a given target channel as a subscriber for a given key. Publishes to the key will block unless the target is received from appropriately, or until context cancel (if a non-nil context was provided); be sure to unsubscribe exactly once to free references to ctx and target. A panic will occur if target is not a channel to which the notifier can send, or if there already exists a subscription for the given key and target combination. The key may be any comparable value.

func (*Notifier) Unsubscribe added in v1.7.0

func (n *Notifier) Unsubscribe(key interface{}, target interface{})

Unsubscribe deregisters a given key and target from the notifier, an action that may be performed exactly once after each subscription (for the combination of key and target), preventing further messages from being published to the target and allowing the associated resources to be freed. WARNING: the subscribe context should always be canceled before calling this, or it may deadlock (especially under load).

type Producer

type Producer interface {
	io.Closer

	// Done should return a channel that will be closed after internal resources have been freed, after a `Close`
	// call, which may not be explicit.  This *may* mean that it blocks on any pending changes, and it *may* also
	// be possible that the consumer will be closed due to external reasons, e.g. connection closing.
	Done() <-chan struct{}

	// Put will send the provided values in-order to the message buffer, or return an error.
	// It MUST NOT block in such a way that it will be possible to cause a deadlock locally.
	Put(ctx context.Context, values ...interface{}) error
}

Producer models a producer in a producer-consumer pattern, where the resource will be closed at most once.

type Workers added in v1.4.0

type Workers struct {
	// contains filtered or unexported fields
}

Workers represents a dynamically resizable pool of workers, for when you want to have up to x operations happening at any given point in time; it can work directly with the Exclusive implementation.

func (*Workers) Call added in v1.4.0

func (w *Workers) Call(count int, value func() (interface{}, error)) (interface{}, error)

Call will call value synchronously, with up to count concurrency (alongside other concurrent calls). Note that it will panic if the receiver is nil, the count is <= 0, or the value is nil.
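
A sketch of capping concurrency across goroutines (in-package example style; the counts and sleep are illustrative):

var (
	workers Workers
	wg      sync.WaitGroup
)
for i := 0; i < 10; i++ {
	wg.Add(1)
	go func(i int) {
		defer wg.Done()
		// at most 3 of these operations run at any given time
		_, _ = workers.Call(3, func() (interface{}, error) {
			time.Sleep(time.Millisecond * 10)
			return i, nil
		})
	}(i)
}
wg.Wait()
workers.Wait() // unblocks once all workers have finished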

func (*Workers) Count added in v1.5.0

func (w *Workers) Count() int

Count will return the number of workers currently running

func (*Workers) Wait added in v1.5.0

func (w *Workers) Wait()

Wait will unblock when all workers are complete

func (*Workers) Wrap added in v1.4.0

func (w *Workers) Wrap(count int, value func() (interface{}, error)) func() (interface{}, error)

Wrap encapsulates the provided value as a worker call. Note that it will panic if the receiver is nil, the count is <= 0, or the value is nil.
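
A sketch of composing Workers with Exclusive, as mentioned in the feature list (the key and concurrency limit are illustrative):

var (
	workers   Workers
	exclusive Exclusive
)
// de-bounce by key via Exclusive, while also capping concurrency at 2 via Workers
result, err := exclusive.Call(`some-key`, workers.Wrap(2, func() (interface{}, error) {
	return `expensive result`, nil
}))
fmt.Println(result, err)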
