Documentation ¶
Overview ¶
Package bigbuff implements a one-to-many ordered queue producer-consumer pattern, plus utilities.
Index ¶
- Constants
- func CombineContext(ctx context.Context, others ...context.Context) context.Context
- func DefaultCleaner(size int, offsets []int) int
- func Range(ctx context.Context, consumer Consumer, ...) error
- func WaitCond(ctx context.Context, cond *sync.Cond, fn func() bool) error
- type Buffer
- func (b *Buffer) CleanerConfig() CleanerConfig
- func (b *Buffer) Close() (err error)
- func (b *Buffer) Diff(c Consumer) (int, bool)
- func (b *Buffer) Done() <-chan struct{}
- func (b *Buffer) NewConsumer() (Consumer, error)
- func (b *Buffer) Put(ctx context.Context, values ...interface{}) error
- func (b *Buffer) Range(ctx context.Context, c Consumer, fn func(index int, value interface{}) bool) error
- func (b *Buffer) SetCleanerConfig(config CleanerConfig) error
- func (b *Buffer) Size() int
- func (b *Buffer) Slice() []interface{}
- type Channel
- type Cleaner
- type CleanerConfig
- type Consumer
- type Exclusive
- func (e *Exclusive) Call(key interface{}, value func() (interface{}, error)) (interface{}, error)
- func (e *Exclusive) CallAfter(key interface{}, value func() (interface{}, error), wait time.Duration) (interface{}, error)
- func (e *Exclusive) CallAfterAsync(key interface{}, value func() (interface{}, error), wait time.Duration) <-chan *ExclusiveOutcome
- func (e *Exclusive) CallAsync(key interface{}, value func() (interface{}, error)) <-chan *ExclusiveOutcome
- type ExclusiveOutcome
- type FixedBufferCleanerNotification
- type Producer
- type Workers
Constants ¶
const (
	// DefaultCleanerCooldown is how long the cleaner will wait between checks of the Buffer by default.
	DefaultCleanerCooldown = time.Millisecond * 10

	// DefaultChannelPollRate is how frequently each waiting Channel.Get should try to receive from the channel (from
	// the first failure/non-receive).
	DefaultChannelPollRate = time.Millisecond
)
Functions ¶
func CombineContext ¶
CombineContext returns a context based on ctx (the first parameter) that will be cancelled when ANY of the other provided contexts is cancelled. CAUTION: this spawns one or more blocking goroutines; if you call it with contexts that do not cancel within the reasonable lifetime of your application, those goroutines will leak.
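The following is a minimal sketch of the behavior described above, written against the standard library only. The name combineContext and the helper cancelledViaOther are hypothetical, and this is not claimed to be how the package implements CombineContext; it only illustrates why one blocked goroutine per extra context can leak.

```go
package main

import (
	"context"
	"fmt"
)

// combineContext is a hypothetical stand-in for the behavior described above:
// the returned context is derived from ctx and is cancelled as soon as ctx or
// any context in others is cancelled.
func combineContext(ctx context.Context, others ...context.Context) context.Context {
	if len(others) == 0 {
		return ctx
	}
	combined, cancel := context.WithCancel(ctx)
	for _, other := range others {
		// One goroutine per extra context. Each blocks until either its own
		// context or the combined context is done. If neither ever happens,
		// the goroutine is never released, which is the leak the CAUTION
		// note warns about.
		go func(other context.Context) {
			select {
			case <-other.Done():
				cancel()
			case <-combined.Done():
			}
		}(other)
	}
	return combined
}

// cancelledViaOther reports whether cancelling a secondary context also
// cancels the combined context.
func cancelledViaOther() bool {
	other, cancelOther := context.WithCancel(context.Background())
	combined := combineContext(context.Background(), other)
	cancelOther()
	<-combined.Done() // unblocks once the watcher goroutine calls cancel
	return combined.Err() != nil
}

func main() {
	fmt.Println(cancelledViaOther()) // true
}
```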
func DefaultCleaner ¶
DefaultCleaner is the Buffer's default cleaner. If there is at least one "active" consumer it returns the lowest offset, defaulting to 0, which effectively removes values from the buffer that all consumers have read. Note that the return value is limited to >= 0 and <= size, and that active consumers are defined as those registered with offsets >= 0.
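The rule above can be sketched as a small pure function. The name lowestActiveOffset is hypothetical, and the code is a reconstruction from the description only, not the package source.

```go
package main

import "fmt"

// lowestActiveOffset is a hypothetical re-statement of the documented rule:
// return the lowest offset among active consumers (offset >= 0), default to 0
// when there are none, and clamp the result to the range [0, size].
func lowestActiveOffset(size int, offsets []int) int {
	lowest, found := 0, false
	for _, offset := range offsets {
		if offset < 0 {
			continue // a negative offset marks a consumer that is not active
		}
		if !found || offset < lowest {
			lowest, found = offset, true
		}
	}
	if lowest > size {
		lowest = size
	}
	if lowest < 0 {
		lowest = 0
	}
	return lowest
}

func main() {
	fmt.Println(lowestActiveOffset(10, []int{4, 7, -1})) // 4
	fmt.Println(lowestActiveOffset(10, nil))             // 0
	fmt.Println(lowestActiveOffset(3, []int{5, 9}))      // 3
}
```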
func Range ¶
func Range(ctx context.Context, consumer Consumer, fn func(index int, value interface{}) bool) error
Range iterates over the consumer, encapsulating automatic commits and rollbacks, including rollbacks caused by panics. Note that the index will be the index in THIS range, starting at 0 and incrementing by one with each call to fn. NOTE: the ctx value will be passed into consumer.Get as-is.
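As an illustration of the commit-or-rollback loop described above, here is a self-contained sketch. Everything in it is hypothetical: offsetConsumer is a cut-down stand-in for Consumer, sliceConsumer is an in-memory test double, and the choice to commit before stopping when fn returns false is an assumption that the description does not confirm.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// offsetConsumer is a hypothetical subset of the Consumer interface.
type offsetConsumer interface {
	Get(ctx context.Context) (interface{}, error)
	Commit() error
	Rollback() error
}

// sliceConsumer is a hypothetical in-memory consumer used only for this demo.
type sliceConsumer struct {
	values    []interface{}
	committed int // number of values committed so far
	pending   int // number of values read but not yet committed
}

var errDrained = errors.New("no more values")

func (s *sliceConsumer) Get(ctx context.Context) (interface{}, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	next := s.committed + s.pending
	if next >= len(s.values) {
		return nil, errDrained
	}
	s.pending++
	return s.values[next], nil
}

func (s *sliceConsumer) Commit() error   { s.committed += s.pending; s.pending = 0; return nil }
func (s *sliceConsumer) Rollback() error { s.pending = 0; return nil }

// step runs fn for one value, committing on a normal return and rolling back
// if fn panics. The panic itself is not swallowed.
func step(c offsetConsumer, fn func(index int, value interface{}) bool, index int, value interface{}) (more bool, err error) {
	finished := false
	defer func() {
		if !finished {
			_ = c.Rollback() // fn panicked; the panic continues to propagate
		}
	}()
	more = fn(index, value)
	finished = true
	return more, c.Commit()
}

// rangeSketch keeps reading until Get fails or fn returns false.
func rangeSketch(ctx context.Context, c offsetConsumer, fn func(index int, value interface{}) bool) error {
	for index := 0; ; index++ {
		value, err := c.Get(ctx)
		if err != nil {
			return err
		}
		more, err := step(c, fn, index, value)
		if err != nil || !more {
			return err
		}
	}
}

// committedAfterPanic ranges over three values with a callback that panics on
// the second one, and returns how many values ended up committed.
func committedAfterPanic() (committed int) {
	c := &sliceConsumer{values: []interface{}{"a", "b", "c"}}
	defer func() {
		recover()
		committed = c.committed
	}()
	_ = rangeSketch(context.Background(), c, func(index int, value interface{}) bool {
		if index == 1 {
			panic("boom")
		}
		return true
	})
	return c.committed
}

func main() {
	fmt.Println(committedAfterPanic()) // 1
}
```

Only the first value is committed in the demo: the second read is rolled back when the callback panics, so a later consumer pass would see it again.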
func WaitCond ¶
WaitCond performs a conditional wait against a *sync.Cond, waiting until fn returns true, with an inbuilt escape hatch for context cancellation. Note that the relevant locker must be locked before this is called. It should also be noted that cond.L.Lock will be called before a context-triggered broadcast, in order to avoid a race condition (i.e. if the context is cancelled while fn is being evaluated).
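A sketch of how such a wait might be built on the standard library follows. The names waitCondSketch and waitDemo are hypothetical, and the order of the checks inside the loop is an assumption; the point is to show why the watcher takes cond.L before broadcasting.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// waitCondSketch waits until fn returns true or ctx is done. The caller must
// hold cond.L, exactly as with a plain cond.Wait loop.
func waitCondSketch(ctx context.Context, cond *sync.Cond, fn func() bool) error {
	done := make(chan struct{})
	defer close(done) // releases the watcher goroutine on return
	go func() {
		select {
		case <-ctx.Done():
			// Taking the lock first means the waiter is either parked in
			// cond.Wait or has not yet re-checked ctx.Err, so this broadcast
			// cannot slip in between its check and its wait.
			cond.L.Lock()
			defer cond.L.Unlock()
			cond.Broadcast()
		case <-done:
		}
	}()
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		if fn() {
			return nil
		}
		cond.Wait()
	}
}

// waitDemo exercises both exits: the condition becoming true, and cancellation.
func waitDemo() (ready bool, cancelled bool) {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)

	flag := false
	go func() {
		mu.Lock()
		flag = true
		mu.Unlock()
		cond.Broadcast()
	}()
	mu.Lock()
	err := waitCondSketch(context.Background(), cond, func() bool { return flag })
	mu.Unlock()
	ready = err == nil

	ctx, cancel := context.WithCancel(context.Background())
	go cancel()
	mu.Lock()
	err = waitCondSketch(ctx, cond, func() bool { return false })
	mu.Unlock()
	cancelled = errors.Is(err, context.Canceled)
	return ready, cancelled
}

func main() {
	ready, cancelled := waitDemo()
	fmt.Println(ready, cancelled) // true true
}
```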
Types ¶
type Buffer ¶
type Buffer struct {
// contains filtered or unexported fields
}
Buffer is the core implementation, implementing the Producer interface and providing auxiliary methods for configuration, as well as `NewConsumer`, which instantiates a new consumer. Note that though it is safe to instantiate via new(bigbuff.Buffer), it must not be copied after first use. Its behavior regarding message retention may be configured via SetCleanerConfig; by default it will un-buffer only messages that have been read by all "active and valid" consumers, given at least one exists, otherwise it will retain messages indefinitely. NOTE: the buffer itself will not be cleared even after close, so the data can still be accessed.
func (*Buffer) CleanerConfig ¶
func (b *Buffer) CleanerConfig() CleanerConfig
CleanerConfig returns the current cleaner config (which has defaults)
func (*Buffer) Diff ¶ added in v1.1.0
Diff is provided to facilitate ranging over a buffer via a consumer, and returns the number of items remaining in the buffer (including uncommitted items). Be aware that the value CAN be negative in the event that the consumer fell behind (the cleaner cleared item(s) from the buffer that the consumer hadn't read yet), which by default will never happen, as the default mode is an unbounded buffer. Note that it will return (0, false) for any invalid consumers, or any not registered on the receiver buffer.
func (*Buffer) NewConsumer ¶
NewConsumer constructs a new consumer instance.
func (*Buffer) Range ¶ added in v1.1.0
func (b *Buffer) Range(ctx context.Context, c Consumer, fn func(index int, value interface{}) bool) error
Range provides a way to iterate from the start to the end of the buffer. Note that it will exit as soon as it reaches the end of the buffer (unlike ranging on a channel); it simply utilizes the package-level Range function together with Buffer.Diff.
func (*Buffer) SetCleanerConfig ¶
func (b *Buffer) SetCleanerConfig(config CleanerConfig) error
SetCleanerConfig updates the cleaner config, returning an error if the config was invalid.
type Channel ¶
type Channel struct {
// contains filtered or unexported fields
}
Channel implements Consumer based on data from a channel. Note that because it uses reflection and polling internally, it is actually safe to close the input channel without invalid zero value reads (though the Channel itself still needs to be closed, and any Get calls will still be blocked until then).
func NewChannel ¶
NewChannel constructs a new consumer that implements Consumer but receives its data from a channel, using reflection to support any readable channel. Note that a poll rate of zero will use the default, and a poll rate < 0 is an error.
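To illustrate how reflection allows polling any readable channel without picking up zero values from a closed one, here is a small sketch built on reflect.Value.TryRecv. The function pollOnce is hypothetical and is not claimed to match the package internals.

```go
package main

import (
	"fmt"
	"reflect"
)

// pollOnce makes a single non-blocking receive attempt on any readable
// channel. It returns (nil, false) both when nothing is available and when
// the channel is closed, so a closed source never yields a spurious zero value.
func pollOnce(source interface{}) (interface{}, bool) {
	v := reflect.ValueOf(source)
	if v.Kind() != reflect.Chan || v.Type().ChanDir()&reflect.RecvDir == 0 {
		panic("source must be a readable channel")
	}
	value, ok := v.TryRecv()
	if !ok {
		return nil, false
	}
	return value.Interface(), true
}

func main() {
	ch := make(chan int, 1)
	ch <- 7
	fmt.Println(pollOnce(ch)) // 7 true
	fmt.Println(pollOnce(ch)) // <nil> false
	close(ch)
	fmt.Println(pollOnce(ch)) // <nil> false
}
```

A polling consumer would call something like pollOnce repeatedly, sleeping for the poll rate between failed attempts.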
func (*Channel) Buffer ¶
func (c *Channel) Buffer() []interface{}
Buffer returns any values that were drained from the source but not committed yet, in a new copy of the internal buffer. Note that if you are trying to ensure no messages get lost in the void, you should block until Channel.Done before calling this.
type Cleaner ¶
Cleaner is a callback used to manage the size of a bigbuff.Buffer instance. It will be called when relevant to do so, with the size of the buffer and the consumer offsets (relative to the buffer), and should return the number of elements that the buffer should attempt to shift off.
func FixedBufferCleaner ¶
func FixedBufferCleaner(max int, target int, callback func(notification FixedBufferCleanerNotification)) Cleaner
FixedBufferCleaner builds a cleaner that will give a buffer a fixed threshold size, which will trigger forced reduction back to a fixed target size. Note that if callback is supplied it will be called with the details of the cleanup, in the event that it forces cleanup past the default. This has the effect of causing any consumers that were running behind the target size (in terms of their read position in the buffer) to fail on any further Get calls.
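The sketch below shows one way a threshold-and-target cleaner could be layered on the default rule. It is an assumption-heavy illustration: the names fixedCleanerSketch and defaultTrim are hypothetical, the callback is simplified to two ints instead of a FixedBufferCleanerNotification, and the exact trigger condition (remaining size after the default trim exceeding the maximum) is a guess that the description does not spell out.

```go
package main

import "fmt"

// defaultTrim re-states the DefaultCleaner rule: lowest offset among active
// consumers (offset >= 0), defaulting to 0, clamped to [0, size].
func defaultTrim(size int, offsets []int) int {
	lowest, found := 0, false
	for _, offset := range offsets {
		if offset >= 0 && (!found || offset < lowest) {
			lowest, found = offset, true
		}
	}
	if lowest > size {
		lowest = size
	}
	if lowest < 0 {
		lowest = 0
	}
	return lowest
}

// fixedCleanerSketch returns a cleaner function. ASSUMPTION: forced cleanup
// triggers when the buffer would still hold more than maxSize items after the
// default trim, and it then trims down to targetSize items.
func fixedCleanerSketch(maxSize, targetSize int, onForced func(size, trim int)) func(size int, offsets []int) int {
	return func(size int, offsets []int) int {
		trim := defaultTrim(size, offsets)
		if size-trim <= maxSize {
			return trim // within bounds, the default behavior applies
		}
		trim = size - targetSize
		if onForced != nil {
			onForced(size, trim)
		}
		return trim
	}
}

func main() {
	cleaner := fixedCleanerSketch(10, 5, func(size, trim int) {
		fmt.Println("forced trim of", trim, "at size", size)
	})
	fmt.Println(cleaner(8, []int{2}))  // 2
	fmt.Println(cleaner(20, []int{3})) // prints the forced-trim notice, then 15
}
```

In the second call the consumer at offset 3 is left behind the new start of the buffer, which corresponds to the documented effect of slow consumers failing on later Get calls.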
type CleanerConfig ¶
type CleanerConfig struct {
	// Cleaner is used to determine if items are removed from the buffer
	Cleaner Cleaner

	// Cooldown is the minimum time between cleanup cycles
	Cooldown time.Duration
}
CleanerConfig is a configuration for a bigbuff.Buffer, that defines how the size is managed
type Consumer ¶
type Consumer interface {
	io.Closer

	// Done should return a channel that will be closed after internal resources have been freed, after a `Close`
	// call, which may not be explicit. This *may* mean that it blocks on any pending changes, and it *may* also
	// be possible that the consumer will be closed due to external reasons, e.g. connection closing.
	Done() <-chan struct{}

	// Get will get a message from the message buffer, at the current offset, blocking if none are available, or
	// an error if it fails.
	Get(ctx context.Context) (interface{}, error)

	// Commit will save any offset changes, and will return an error if it fails, or if the offset saved is the
	// latest.
	Commit() error

	// Rollback will undo any offset changes, and will return an error if it fails, or if the offset saved is the
	// latest.
	Rollback() error
}
Consumer models a consumer in a producer-consumer pattern, where the resource will be closed at most once.
type Exclusive ¶ added in v1.2.1
type Exclusive struct {
// contains filtered or unexported fields
}
Exclusive provides synchronous de-bouncing of operations that may also return a result or error. Consistent or controlled input via provided closures is also supported, as is the use of any comparable keys to match on. It provides a guarantee that the actual call that returns a given value will be started AFTER the Call method is called, so keep that in mind when implementing something using it. You may also use the CallAfter method to delay execution after initialising the key, e.g. to allow the first of many costly operations on a given key a grace period to be grouped with the remaining keys.
func (*Exclusive) Call ¶ added in v1.2.1
Call uses a given key to ensure that the operation that the value callback represents will not be performed concurrently, and in the event that one or more operations are attempted while a given operation is still being performed, these operations will be grouped such that they are debounced to a single call, sharing the output.
Note that this method will panic if the receiver is nil, or the value is nil, but a nil key is allowed.
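To make the grouping behavior concrete, here is a sketch of a keyed debouncer with the same two properties: operations for one key never overlap, and callers that arrive while an operation is running share a single later execution that starts after they called. All names (exclusiveSketch, batch, groupedExecutions) are hypothetical, using the first caller's function for the whole group is an assumption, and a panic inside fn is deliberately not handled.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// batch is one pending execution shared by every caller that joined it.
type batch struct {
	fn      func() (interface{}, error)
	waiters int
	result  interface{}
	err     error
	done    chan struct{}
}

type exclusiveSketch struct {
	mu      sync.Mutex
	running map[interface{}]bool
	pending map[interface{}]*batch
}

func newExclusiveSketch() *exclusiveSketch {
	return &exclusiveSketch{running: map[interface{}]bool{}, pending: map[interface{}]*batch{}}
}

// call joins the pending batch for key (creating it if needed) and blocks
// until that batch has run. A batch that has already started is never joined.
func (e *exclusiveSketch) call(key interface{}, fn func() (interface{}, error)) (interface{}, error) {
	e.mu.Lock()
	b := e.pending[key]
	if b == nil {
		b = &batch{fn: fn, done: make(chan struct{})}
		e.pending[key] = b
		if !e.running[key] {
			e.running[key] = true
			go e.run(key)
		}
	}
	b.waiters++
	e.mu.Unlock()
	<-b.done
	return b.result, b.err
}

// run executes pending batches for key one at a time until none remain.
func (e *exclusiveSketch) run(key interface{}) {
	for {
		e.mu.Lock()
		b := e.pending[key]
		if b == nil {
			delete(e.running, key)
			e.mu.Unlock()
			return
		}
		delete(e.pending, key) // later callers will form a new batch
		e.mu.Unlock()
		b.result, b.err = b.fn()
		close(b.done)
	}
}

// groupedExecutions blocks a first operation, queues three more callers
// behind it, and returns how many times a function actually executed.
func groupedExecutions() int {
	e := newExclusiveSketch()
	var mu sync.Mutex
	executions := 0
	count := func() { mu.Lock(); executions++; mu.Unlock() }
	started, release := make(chan struct{}), make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(4)
	go func() {
		defer wg.Done()
		e.call("k", func() (interface{}, error) { close(started); <-release; count(); return nil, nil })
	}()
	<-started
	for i := 0; i < 3; i++ {
		go func() {
			defer wg.Done()
			e.call("k", func() (interface{}, error) { count(); return nil, nil })
		}()
	}
	for joined := false; !joined; runtime.Gosched() {
		e.mu.Lock()
		b := e.pending["k"]
		joined = b != nil && b.waiters == 3
		e.mu.Unlock()
	}
	close(release)
	wg.Wait()
	return executions
}

func main() {
	fmt.Println(groupedExecutions()) // 2
}
```

Four callers result in two executions: the first operation, and one shared execution for the three callers that arrived while it was running.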
func (*Exclusive) CallAfter ¶ added in v1.2.1
func (e *Exclusive) CallAfter(key interface{}, value func() (interface{}, error), wait time.Duration) (interface{}, error)
CallAfter performs exactly the same operation as the Exclusive.Call method, but with an added wait to allow operations sent through in close succession to be grouped together. Note that if wait is <= 0 it will be ignored.
func (*Exclusive) CallAfterAsync ¶ added in v1.3.0
func (e *Exclusive) CallAfterAsync(key interface{}, value func() (interface{}, error), wait time.Duration) <-chan *ExclusiveOutcome
CallAfterAsync behaves exactly the same as CallAfter but guarantees order (the value func) for synchronous calls.
Note that the returned channel will always be closed after being sent the result, and therefore any additional reads will always receive nil.
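The send-then-close pattern in the note can be illustrated with a few lines of plain Go. The names outcomeSketch and callAsyncSketch are hypothetical stand-ins, and the sketch leaves out the keyed grouping that the real methods perform.

```go
package main

import "fmt"

// outcomeSketch mirrors the shape of ExclusiveOutcome for this demo only.
type outcomeSketch struct {
	Result interface{}
	Error  error
}

// callAsyncSketch runs fn in a goroutine and returns a channel that receives
// exactly one outcome and is then closed.
func callAsyncSketch(fn func() (interface{}, error)) <-chan *outcomeSketch {
	out := make(chan *outcomeSketch, 1) // buffered so the send never blocks
	go func() {
		defer close(out)
		result, err := fn()
		out <- &outcomeSketch{Result: result, Error: err}
	}()
	return out
}

func main() {
	ch := callAsyncSketch(func() (interface{}, error) { return 42, nil })
	first := <-ch
	second, open := <-ch            // the channel is closed by now
	fmt.Println(first.Result)       // 42
	fmt.Println(second == nil, open) // true false
}
```

Because a receive on a closed channel yields the zero value, which is nil for a pointer type, callers should nil-check if they might read more than once.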
func (*Exclusive) CallAsync ¶ added in v1.3.0
func (e *Exclusive) CallAsync(key interface{}, value func() (interface{}, error)) <-chan *ExclusiveOutcome
CallAsync behaves exactly the same as Call but guarantees order (the value func) for synchronous calls.
Note that the returned channel will always be closed after being sent the result, and therefore any additional reads will always receive nil.
type ExclusiveOutcome ¶ added in v1.3.0
type ExclusiveOutcome struct {
	Result interface{}
	Error  error
}
ExclusiveOutcome is the return value from an async bigbuff.Exclusive call
type FixedBufferCleanerNotification ¶
type FixedBufferCleanerNotification struct {
	Max     int   // Max size before forced cleanup is triggered.
	Target  int   // Target size when force cleanup is triggered.
	Size    int   // Size when cleanup was triggered.
	Offsets []int // Offsets when cleanup was triggered.
	Trim    int   // Trim number returned.
}
FixedBufferCleanerNotification is the context provided to the optional callback provided to the FixedBufferCleaner function.
type Producer ¶
type Producer interface {
	io.Closer

	// Done should return a channel that will be closed after internal resources have been freed, after a `Close`
	// call, which may not be explicit. This *may* mean that it blocks on any pending changes, and it *may* also
	// be possible that the consumer will be closed due to external reasons, e.g. connection closing.
	Done() <-chan struct{}

	// Put will send the provided values in-order to the message buffer, or return an error.
	// It MUST NOT block in such a way that it will be possible to cause a deadlock locally.
	Put(ctx context.Context, values ...interface{}) error
}
Producer models a producer in a producer-consumer pattern, where the resource will be closed at most once.
type Workers ¶ added in v1.4.0
type Workers struct {
// contains filtered or unexported fields
}
Workers represents a dynamically resizable pool of workers, for when you want to have up to x number of operations happening at any given point in time. It can work directly with the Exclusive implementation.
func (*Workers) Call ¶ added in v1.4.0
Call will call value synchronously, with up to count concurrency (with other concurrent calls). Note that it will panic if the receiver is nil, the count is <= 0, or the value is nil.
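A concurrency limit that is supplied per call, as described above, can be sketched with a mutex and a condition variable. The type workersSketch, its call method, and the assumed parameter order (count, then value) are hypothetical, since the signature is not shown here; the sketch only demonstrates the limiting behavior.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// workersSketch limits how many calls run at once. The limit is passed on
// each call, so it can change between calls.
type workersSketch struct {
	mu     sync.Mutex
	cond   *sync.Cond
	active int
}

// call blocks until fewer than count calls are active, then runs fn
// synchronously and returns its result.
func (w *workersSketch) call(count int, fn func() (interface{}, error)) (interface{}, error) {
	if w == nil || count <= 0 || fn == nil {
		panic("workersSketch.call: nil receiver, nil fn, or count <= 0")
	}
	w.mu.Lock()
	if w.cond == nil {
		w.cond = sync.NewCond(&w.mu)
	}
	for w.active >= count {
		w.cond.Wait()
	}
	w.active++
	w.mu.Unlock()
	defer func() {
		w.mu.Lock()
		w.active--
		w.mu.Unlock()
		w.cond.Broadcast() // wake waiters so they can re-check the limit
	}()
	return fn()
}

// peakConcurrency issues the given number of calls with the given limit and
// reports the highest number of functions observed running at the same time.
func peakConcurrency(calls, count int) int {
	var w workersSketch
	var mu sync.Mutex
	current, peak := 0, 0
	var wg sync.WaitGroup
	for i := 0; i < calls; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			w.call(count, func() (interface{}, error) {
				mu.Lock()
				current++
				if current > peak {
					peak = current
				}
				mu.Unlock()
				runtime.Gosched() // give other goroutines a chance to overlap
				mu.Lock()
				current--
				mu.Unlock()
				return nil, nil
			})
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println(peakConcurrency(10, 2) <= 2) // true
}
```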