bigbuff

v1.5.1
Published: Nov 1, 2018 License: Apache-2.0 Imports: 7 Imported by: 1

README

go-bigbuff

test coverage: 93.7%

Package bigbuff provides an in-memory Buffer implementation designed to support a one-to-many producer-consumer pattern, where each consumer reads the full data set and has its own offset, with atomic commits and rollbacks. It also provides a native channel-backed Consumer implementation with the same kind of feature set, but geared towards fan-in implementations. It offers the same benefits on the consumer end as the Buffer implementation, and also solves channel draining and "zero reads" on producer channel close (at the cost of some performance compared with pure channels). Some utility helper methods are also exposed as part of this package.

Godoc here: github.com/joeycumines/go-bigbuff

Features:

  • Queue implementations with strong support for context.Context, aiming to guarantee as much data consistency as possible
  • Well defined interfaces designed after the style of core golang libs
  • Handy Range method for iterating, with catch / throw pattern for panics
  • Exposes async util functions to combine contexts and safely perform a conditional wait via sync.Cond (no leaks)
  • Configurable cleanup behavior - control retention based on the length and consumer offsets + the min cycle wait
  • The reason this package was implemented in the first place: a producer implementation that is 100% safe to use within a consumer processing loop, thanks to an unbounded buffer size. For comparison, imagine a single consumer ranging over a buffered channel while also sending to it: as soon as the buffer fills, the next send deadlocks, and adding another consumer would break guaranteed in-order processing
  • Single-channel implementation of the bigbuff.Consumer interface that uses reflection + polling to provide the same commit and rollback functionality (retains order, supports any readable channel)
  • Breaking from the mould a little, bigbuff.Exclusive allows key-based locking and de-bouncing for operations that may return values like (interface{}, error), operating synchronously to keep things simple
  • Useful for limiting concurrent executions, bigbuff.Workers is compatible with bigbuff.Exclusive

Documentation

Overview

Package bigbuff implements a one-many ordered queue producer-consumer pattern + utilities.

Constants

const (
	// DefaultCleanerCooldown is how long the cleaner will wait between checks of the Buffer by default.
	DefaultCleanerCooldown = time.Millisecond * 10

	// DefaultChannelPollRate is how frequently each waiting Channel.Get should try to receive from the channel (from
	// the first failure/non-receive).
	DefaultChannelPollRate = time.Millisecond
)

Variables

This section is empty.

Functions

func CombineContext

func CombineContext(ctx context.Context, others ...context.Context) context.Context

CombineContext returns a context based on ctx (the first parameter) that will cancel when ANY of the other provided contexts cancel. CAUTION: this spawns one or more blocking goroutines; if you call it with contexts that don't cancel within the reasonable lifetime of your application, you will have a leak.

func DefaultCleaner

func DefaultCleaner(size int, offsets []int) int

DefaultCleaner is the Buffer's default cleaner. If there is at least one "active" consumer it returns the lowest offset, defaulting to 0, which effectively removes values from the buffer that all consumers have read. The return value is limited to >= 0 and <= size. Active consumers are defined as those registered with offsets >= 0.
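
The rule above can be restated as a few lines of Go. lowestActiveOffset is a hypothetical name, and the body is a sketch written from the description rather than copied from the package.

```go
package main

import "fmt"

// lowestActiveOffset is a hypothetical restatement of the documented rule:
// return the lowest offset among active consumers (offset >= 0), default to 0
// when there are none, and never return more than size.
func lowestActiveOffset(size int, offsets []int) int {
	trim, found := 0, false
	for _, offset := range offsets {
		if offset < 0 {
			continue // inactive consumer, ignored
		}
		if !found || offset < trim {
			trim, found = offset, true
		}
	}
	if trim > size {
		trim = size
	}
	return trim
}

func main() {
	fmt.Println(lowestActiveOffset(10, []int{4, 7, -1})) // 4
	fmt.Println(lowestActiveOffset(10, nil))             // 0
	fmt.Println(lowestActiveOffset(3, []int{5}))         // 3
}
```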

func Range

func Range(ctx context.Context, consumer Consumer, fn func(index int, value interface{}) bool) error

Range iterates over the consumer, encapsulating automatic commits and rollbacks, including rollbacks caused by panics. Note that the index will be the index in THIS range, starting at 0 and incrementing by one with each call to fn. NOTE: the ctx value will be passed into consumer.Get as-is.

func WaitCond

func WaitCond(ctx context.Context, cond *sync.Cond, fn func() bool) error

WaitCond performs a conditional wait against a *sync.Cond, waiting until fn returns true, with a built-in escape hatch for context cancellation. Note that the relevant locker must be locked before this is called. It should also be noted that cond.L.Lock will be called before a context-triggered broadcast, in order to avoid a race condition (i.e. if the context is cancelled while fn is being evaluated).

Types

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

Buffer is the core implementation, implementing the Producer interface and providing auxiliary methods for configuration, as well as `NewConsumer`, which creates a new consumer. Although it is safe to instantiate via new(bigbuff.Buffer), it must not be copied after first use. Its behavior regarding message retention may be configured via SetCleanerConfig; by default it will un-buffer only messages that have been read by all "active and valid" consumers, given at least one exists, and otherwise it will retain messages indefinitely. NOTE: the buffer itself will not be cleared even after close, so the data can still be accessed.

func (*Buffer) CleanerConfig

func (b *Buffer) CleanerConfig() CleanerConfig

CleanerConfig returns the current cleaner config (which has defaults).

func (*Buffer) Close

func (b *Buffer) Close() (err error)

func (*Buffer) Diff added in v1.1.0

func (b *Buffer) Diff(c Consumer) (int, bool)

Diff is provided to facilitate ranging over a buffer via a consumer, and returns the number of items remaining in the buffer (including uncommitted ones). Be aware that the value CAN be negative if the consumer fell behind, i.e. the cleaner cleared item(s) from the buffer that the consumer hadn't read yet. By default this will never happen, as the default mode is an unbounded buffer. Note that it will return (0, false) for any invalid consumers or any not registered on the receiver buffer.

func (*Buffer) Done

func (b *Buffer) Done() <-chan struct{}

func (*Buffer) NewConsumer

func (b *Buffer) NewConsumer() (Consumer, error)

NewConsumer constructs a new consumer instance.

func (*Buffer) Put

func (b *Buffer) Put(ctx context.Context, values ...interface{}) error

func (*Buffer) Range added in v1.1.0

func (b *Buffer) Range(ctx context.Context, c Consumer, fn func(index int, value interface{}) bool) error

Range provides a way to iterate from the start to the end of the buffer, note that it will exit as soon as it reaches the end of the buffer (unlike ranging on a channel), it simply utilizes the package Range + Buffer.Diff.

func (*Buffer) SetCleanerConfig

func (b *Buffer) SetCleanerConfig(config CleanerConfig) error

SetCleanerConfig updates the cleaner config, returning an error if the config was invalid.

func (*Buffer) Size

func (b *Buffer) Size() int

Size returns the length of the buffer.

func (*Buffer) Slice

func (b *Buffer) Slice() []interface{}

Slice returns a copy of the internal message buffer (all currently stored in memory). NOTE: this will read-lock the buffer itself, and is accessible even after the buffer is closed.

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel implements Consumer based on data from a channel, note that because it uses reflection and polling internally, it is actually safe to close the input channel without invalid zero value reads (though the Channel itself still needs to be closed, and any Get calls will still be blocked until then).
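
The "no invalid zero value reads" property follows from how a non-blocking reflective receive reports its result. The sketch below uses reflect.Value.TryRecv from the standard library; tryReceive is a hypothetical helper and is not claimed to match the package's internals.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// tryReceive is a hypothetical helper: it attempts one non-blocking receive
// from any readable channel passed as an interface{} value.
func tryReceive(source interface{}) (value interface{}, received bool, err error) {
	v := reflect.ValueOf(source)
	if v.Kind() != reflect.Chan || v.Type().ChanDir()&reflect.RecvDir == 0 {
		return nil, false, errors.New("source must be a readable channel")
	}
	x, ok := v.TryRecv()
	if !ok {
		// Either nothing is ready yet or the channel is closed. In both cases
		// no zero value is handed to the caller; a poller would simply retry.
		return nil, false, nil
	}
	return x.Interface(), true, nil
}

func main() {
	ch := make(chan int, 1)
	ch <- 0 // a genuine zero that was actually sent
	close(ch)

	fmt.Println(tryReceive(ch)) // 0 true <nil>
	fmt.Println(tryReceive(ch)) // <nil> false <nil>
}
```

The first receive returns the sent 0 as a real value, while the second, on the drained and closed channel, reports that nothing was received instead of yielding another 0.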

func NewChannel

func NewChannel(ctx context.Context, pollRate time.Duration, source interface{}) (*Channel, error)

NewChannel constructs a new consumer that implements Consumer but receives its data from a channel, using reflection to support any readable channel. Note that a poll rate of zero will use the default, and a poll rate < 0 is an error.

func (*Channel) Buffer

func (c *Channel) Buffer() []interface{}

Buffer returns any values that were drained from the source but not committed yet, in a new copy of the internal buffer, note that if you are trying to ensure no messages get lost in the void, block until Channel.Done before calling this.

func (*Channel) Close

func (c *Channel) Close() error

Close closes the consumer. NOTE: it doesn't close the source channel.

func (*Channel) Commit

func (c *Channel) Commit() error

Commit resets read parts of the buffer, or returns an error, note it will always error after context cancel.

func (*Channel) Done

func (c *Channel) Done() <-chan struct{}

func (*Channel) Get

func (c *Channel) Get(ctx context.Context) (value interface{}, err error)

func (*Channel) Rollback

func (c *Channel) Rollback() error

Rollback will cause following Get calls to read from the start of the buffer, or it will return an error if there is nothing to rollback (there is nothing pending).

type Cleaner

type Cleaner func(size int, offsets []int) int

Cleaner is a callback used to manage the size of a bigbuff.Buffer instance. It will be called when relevant to do so, with the size of the buffer and the consumer offsets (relative to the buffer), and should return the number of elements that the buffer should attempt to shift.

func FixedBufferCleaner

func FixedBufferCleaner(
	max int,
	target int,
	callback func(notification FixedBufferCleanerNotification),
) Cleaner

FixedBufferCleaner builds a cleaner that will give a buffer a fixed threshold size, which will trigger forced reduction back to a fixed target size, note that if callback is supplied it will be called with the details of the cleanup, in the event that it forces cleanup past the default. This has the effect of causing any consumers that were running behind the target size (in terms of their read position in the buffer) to fail on any further Get calls.
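
A threshold-and-target cleaner of this kind might look roughly like the sketch below. The names cleaner, lowestOffset and fixedCleaner are hypothetical, the notification callback is left out, and triggering on size > max (rather than >=) is an assumption.

```go
package main

import "fmt"

// cleaner mirrors the shape of the Cleaner callback: size and consumer
// offsets in, number of elements to shift out.
type cleaner func(size int, offsets []int) int

// lowestOffset trims only what every active (offset >= 0) consumer has read.
func lowestOffset(size int, offsets []int) int {
	trim, active := size, false
	for _, o := range offsets {
		if o >= 0 {
			active = true
			if o < trim {
				trim = o
			}
		}
	}
	if !active {
		return 0
	}
	return trim
}

// fixedCleaner is a hypothetical sketch: once size exceeds max it forces the
// trim up so that at most target elements remain, even if that drops values
// that slow consumers have not read yet.
func fixedCleaner(max, target int, fallback cleaner) cleaner {
	return func(size int, offsets []int) int {
		trim := fallback(size, offsets)
		if size > max && size-trim > target {
			trim = size - target
		}
		return trim
	}
}

func main() {
	c := fixedCleaner(8, 4, lowestOffset)
	fmt.Println(c(6, []int{2, 5}))  // 2: below max, the default rule applies
	fmt.Println(c(10, []int{2, 5})) // 6: forced down to the target size of 4
}
```

In the second call the consumer at offset 2 is left behind the trimmed region, which corresponds to the failure on further Get calls described above.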

type CleanerConfig

type CleanerConfig struct {
	// Cleaner is used to determine if items are removed from the buffer
	Cleaner Cleaner

	// Cooldown is the minimum time between cleanup cycles
	Cooldown time.Duration
}

CleanerConfig is a configuration for a bigbuff.Buffer that defines how the size is managed.

type Consumer

type Consumer interface {
	io.Closer

	// Done should return a channel that will be closed after internal resources have been freed, after a `Close`
	// call, which may not be explicit. This *may* mean that it blocks on any pending changes, and it *may* also
	// be possible that the consumer will be closed due to external reasons, e.g. connection closing.
	Done() <-chan struct{}

	// Get will get a message from the message buffer, at the current offset, blocking if none are available, or
	// an error if it fails.
	Get(ctx context.Context) (interface{}, error)

	// Commit will save any offset changes, and will return an error if it fails, or if the offset saved is the
	// latest.
	Commit() error

	// Rollback will undo any offset changes, and will return an error if it fails, or if the offset saved is the
	// latest.
	Rollback() error
}

Consumer models a consumer in a producer-consumer pattern, where the resource will be closed at most once.

type Exclusive added in v1.2.1

type Exclusive struct {
	// contains filtered or unexported fields
}

Exclusive provides synchronous de-bouncing of operations that may also return a result or error. Consistent or controlled input via provided closures is also supported, and any comparable value may be used as a key to match on. It guarantees that the actual call that returns a given value will be started AFTER the Call method, so keep that in mind when implementing something using it. You may also use the CallAfter method to delay execution after initialising the key, e.g. to allow the first of many costly operations on a given key a grace period to be grouped with the remaining keys.

func (*Exclusive) Call added in v1.2.1

func (e *Exclusive) Call(key interface{}, value func() (interface{}, error)) (interface{}, error)

Call uses a given key to ensure that the operation that the value callback represents will not be performed concurrently, and in the event that one or more operations are attempted while a given operation is still being performed, these operations will be grouped such that they are debounced to a single call, sharing the output.

Note that this method will panic if the receiver is nil, or the value is nil, but a nil key is allowed.

func (*Exclusive) CallAfter added in v1.2.1

func (e *Exclusive) CallAfter(key interface{}, value func() (interface{}, error), wait time.Duration) (interface{}, error)

CallAfter performs exactly the same operation as the Exclusive.Call method, but with an added wait to allow operations sent through in close succession to be grouped together, note that if wait is <= 0 it will be ignored.

func (*Exclusive) CallAfterAsync added in v1.3.0

func (e *Exclusive) CallAfterAsync(key interface{}, value func() (interface{}, error), wait time.Duration) <-chan *ExclusiveOutcome

CallAfterAsync behaves exactly the same as CallAfter but guarantees order (the value func) for synchronous calls.

Note that the returned channel will always be closed after being sent the result, and therefore any additional reads will always receive nil.
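
The send-once-then-close contract can be demonstrated in isolation. In this sketch, outcome and callAsync are hypothetical stand-ins that mirror only the shape of the result channel, with no keys or de-bouncing involved.

```go
package main

import "fmt"

// outcome is a hypothetical stand-in with the same fields as ExclusiveOutcome.
type outcome struct {
	Result interface{}
	Error  error
}

// callAsync runs fn in a goroutine, sends exactly one outcome, then closes
// the channel so that later receives yield nil immediately.
func callAsync(fn func() (interface{}, error)) <-chan *outcome {
	out := make(chan *outcome, 1)
	go func() {
		defer close(out)
		result, err := fn()
		out <- &outcome{Result: result, Error: err}
	}()
	return out
}

func main() {
	out := callAsync(func() (interface{}, error) { return 42, nil })
	first := <-out
	second, open := <-out
	fmt.Println(first.Result, second == nil, open) // 42 true false
}
```

A caller that reads the channel more than once therefore needs a nil check before touching Result or Error.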

func (*Exclusive) CallAsync added in v1.3.0

func (e *Exclusive) CallAsync(key interface{}, value func() (interface{}, error)) <-chan *ExclusiveOutcome

CallAsync behaves exactly the same as Call but guarantees order (the value func) for synchronous calls.

Note that the returned channel will always be closed after being sent the result, and therefore any additional reads will always receive nil.

type ExclusiveOutcome added in v1.3.0

type ExclusiveOutcome struct {
	Result interface{}
	Error  error
}

ExclusiveOutcome is the return value from an async bigbuff.Exclusive call.

type FixedBufferCleanerNotification

type FixedBufferCleanerNotification struct {
	Max     int   // Max size before forced cleanup is triggered.
	Target  int   // Target size when force cleanup is triggered.
	Size    int   // Size when cleanup was triggered.
	Offsets []int // Offsets when cleanup was triggered.
	Trim    int   // Trim number returned.
}

FixedBufferCleanerNotification is the context provided to the optional callback provided to the FixedBufferCleaner function.

type Producer

type Producer interface {
	io.Closer

	// Done should return a channel that will be closed after internal resources have been freed, after a `Close`
	// call, which may not be explicit.  This *may* mean that it blocks on any pending changes, and it *may* also
	// be possible that the producer will be closed due to external reasons, e.g. connection closing.
	Done() <-chan struct{}

	// Put will send the provided values in-order to the message buffer, or return an error.
	// It MUST NOT block in such a way that it will be possible to cause a deadlock locally.
	Put(ctx context.Context, values ...interface{}) error
}

Producer models a producer in a producer-consumer pattern, where the resource will be closed at most once.

type Workers added in v1.4.0

type Workers struct {
	// contains filtered or unexported fields
}

Workers represents a dynamically resizable pool of workers, for when you want to have up to x number of operations happening at any given point in time, it can work directly with the Exclusive implementation.

func (*Workers) Call added in v1.4.0

func (w *Workers) Call(count int, value func() (interface{}, error)) (interface{}, error)

Call will call value synchronously, with up to count concurrency (with other concurrent calls), note that it will panic if the receiver is nil, the count is <= 0, or the value is nil.

func (*Workers) Count added in v1.5.0

func (w *Workers) Count() int

Count will return the number of workers currently running.

func (*Workers) Wait added in v1.5.0

func (w *Workers) Wait()

Wait will unblock when all workers are complete.

func (*Workers) Wrap added in v1.4.0

func (w *Workers) Wrap(count int, value func() (interface{}, error)) func() (interface{}, error)

Wrap encapsulates the provided value as a worker call, note that it will panic if the receiver is nil, the count is <= 0, or the value is nil.
