Documentation ¶
Overview ¶
Package timeq is a file-based priority queue in Go.
Index ¶
- Constants
- Variables
- type BucketSplitConf
- type Consumer
- type ErrorMode
- type Fork
- type ForkName
- type Item
- type Items
- type Key
- type Logger
- type Options
- type Queue
- func (q *Queue) Clear() error
- func (q *Queue) Close() error
- func (q *Queue) Delete(from, to Key) (int, error)
- func (q *Queue) Fork(name ForkName) (*Fork, error)
- func (q *Queue) Forks() []ForkName
- func (q *Queue) Len() int
- func (q *Queue) Push(items Items) error
- func (q *Queue) Read(n int, fn TransactionFn) error
- func (q *Queue) Shovel(dst *Queue) (int, error)
- func (q *Queue) Sync() error
- type ReadOp
- type SyncMode
- type Transaction
- type TransactionFn
Examples ¶
Constants ¶
const (
	// ReadOpPeek preserves the read data. It will be available on the next call to Read().
	ReadOpPeek = 0

	// ReadOpPop removes the read data. It will not be available on the next call to Read().
	ReadOpPop = 1
)
const (
	// SyncNone does not sync on normal operation (only on close)
	SyncNone = SyncMode(0)
	// SyncData only synchronizes the data log
	SyncData = SyncMode(1 << iota)
	// SyncIndex only synchronizes the index log (does not make sense alone)
	SyncIndex
	// SyncFull syncs both the data and index log
	SyncFull = SyncData | SyncIndex
)
The available options are inspired by SQLite: https://www.sqlite.org/pragma.html#pragma_synchronous
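As an illustration, lowering the sync level when opening a queue might look like this sketch (written in the same in-package style as the examples below; the path is purely illustrative and error handling is omitted):

```go
// Trade some durability for throughput: only the data log is synced.
// Think twice before doing this; SyncFull is the safe default.
opts := DefaultOptions()
opts.SyncMode = SyncData

queue, err := Open("/var/lib/myapp/queue", opts)
if err != nil {
	// handle the error
}
defer queue.Close()
```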
const (
	// ErrorModeAbort will immediately abort the current
	// operation if an error is encountered that might lead to data loss.
	ErrorModeAbort = ErrorMode(iota)

	// ErrorModeContinue tries to progress further in case of errors
	// by jumping over a faulty bucket or entry in a bucket.
	// If the error was recoverable, no error is returned, but the
	// Logger in the Options will be called (if set) to log the error.
	ErrorModeContinue
)
Variables ¶
var DefaultBucketSplitConf = ShiftBucketSplitConf(37)
DefaultBucketSplitConf assumes that `key` is a nanosecond unix timestamp and divides data into (roughly) 2-minute buckets.
var (
	// ErrChangedSplitFunc is returned when the configured split func
	// in options does not fit to the state on disk.
	ErrChangedSplitFunc = errors.New("split func changed")
)
var (
ErrNoSuchFork = errors.New("no fork with this name")
)
Functions ¶
This section is empty.
Types ¶
type BucketSplitConf ¶ added in v0.9.0
type BucketSplitConf struct {
	// Func is the function that does the splitting.
	Func func(item.Key) item.Key

	// Name is used as identifier to figure out
	// when the disk split func changed.
	Name string
}
BucketSplitConf defines what keys are sorted in what bucket. See Options.BucketSplitConf for more info.
func FixedSizeBucketSplitConf ¶ added in v0.9.0
func FixedSizeBucketSplitConf(n uint64) BucketSplitConf
FixedSizeBucketSplitConf returns a BucketSplitConf that divides data into equally sized buckets with `n` entries each. This can also be used to create time-based buckets: if you use nanosecond-based keys, passing time.Minute creates buckets with a size of one minute.
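For instance, one-minute buckets for nanosecond-based keys could be configured like this sketch (the explicit uint64 conversion of time.Minute is an assumption about how the value is passed):

```go
// time.Minute is a duration in nanoseconds, so with nanosecond
// timestamp keys a bucket then spans exactly one minute.
opts := DefaultOptions()
opts.BucketSplitConf = FixedSizeBucketSplitConf(uint64(time.Minute))
```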
func ShiftBucketSplitConf ¶ added in v0.9.0
func ShiftBucketSplitConf(shift int) BucketSplitConf
ShiftBucketSplitConf creates a fast BucketSplitConf that divides data into buckets by masking the `shift` least significant bits of the key. With a shift of 37 you get roughly 2-minute buckets (if your keys are nanosecond timestamps). If you want to calculate the time span of a shift, use this formula: (2 ** shift) / 1e9 / 60 = minutes
type Consumer ¶ added in v0.9.0
type Consumer interface {
	Read(n int, fn TransactionFn) error
	Delete(from, to Key) (int, error)
	Shovel(dst *Queue) (int, error)
	Len() int
	Fork(name ForkName) (*Fork, error)
}
Consumer is an interface that both Fork and Queue implement. It covers every consumer related API. Please refer to the respective Queue methods for details.
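Because both *Queue and *Fork satisfy Consumer, consuming logic can be written once against the interface. A sketch (the `drain` helper is hypothetical and not part of the timeq API):

```go
// drain pops up to n items from any Consumer (a *Queue or a *Fork)
// and prints them. Sketch only; error handling is left to the caller.
func drain(c Consumer, n int) error {
	return c.Read(n, func(_ Transaction, items Items) (ReadOp, error) {
		for _, item := range items {
			fmt.Println(item.Key, string(item.Blob))
		}
		return ReadOpPop, nil
	})
}
```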
type Fork ¶ added in v0.9.0
type Fork struct {
// contains filtered or unexported fields
}
Fork is an implementation of the Consumer interface for a named fork. See the Fork() method for more explanation.
func (*Fork) Fork ¶ added in v0.9.0
func (f *Fork) Fork(name ForkName) (*Fork, error)
Fork is like Queue.Fork(), except that the fork happens relative to the current state of the consumer and not to the state of the underlying Queue.
func (*Fork) Read ¶ added in v0.9.0
func (f *Fork) Read(n int, fn TransactionFn) error
Read is like Queue.Read().
type Items ¶
Items is a list of items.
func PeekCopy ¶ added in v0.9.0
PeekCopy works like a simplified Read() but copies the items and does not remove them. It is less efficient and should not be used if you care for performance.
func PopCopy ¶ added in v0.9.0
PopCopy works like a simplified Read() but copies the items and pops them. It is less efficient and should not be used if you care for performance.
Example ¶
// Error handling stripped for brevity:
dir, _ := os.MkdirTemp("", "timeq-example")
defer os.RemoveAll(dir)

// Open the queue. If it does not exist, it gets created:
queue, _ := Open(dir, DefaultOptions())

items := make(Items, 0, 5)
for idx := 0; idx < 10; idx++ {
	items = append(items, Item{
		Key:  Key(idx),
		Blob: []byte(fmt.Sprintf("%d", idx)),
	})
}

_ = queue.Push(items)

got, _ := PopCopy(queue, 5)
for _, item := range got {
	fmt.Println(item.Key)
}
Output:

K00000000000000000000
K00000000000000000001
K00000000000000000002
K00000000000000000003
K00000000000000000004
type Logger ¶ added in v0.9.0
Logger is a small interface to redirect logs to. The default logger outputs to stderr.
func DefaultLogger ¶ added in v0.9.0
func DefaultLogger() Logger
DefaultLogger produces a logger that writes to stderr.
func NullLogger ¶ added in v0.9.0
func NullLogger() Logger
NullLogger produces a logger that discards all messages.
func WriterLogger ¶ added in v0.9.0
type Options ¶
type Options struct {
	// SyncMode controls how often we sync data to the disk. The more data we sync
	// the more durable is the queue at the cost of throughput.
	// Default is the safe SyncFull. Think twice before lowering this.
	SyncMode SyncMode

	// Logger is used to output some non-critical warnings or errors that could
	// have been recovered. By default we print to stderr.
	// Only warnings or errors are logged, no debug or informal messages.
	Logger Logger

	// ErrorMode defines how non-critical errors are handled.
	// See the individual enum values for more info.
	ErrorMode ErrorMode

	// BucketSplitConf defines what key goes to what bucket.
	// The provided function should clamp the key value to
	// a common value. Each same value that was returned goes
	// into the same bucket. The returned value should also be
	// the minimum key of the bucket.
	//
	// Example: '(key / 10) * 10' would produce buckets with 10 items.
	//
	// What bucket size to choose? Please refer to the FAQ in the README.
	//
	// NOTE: This may not be changed after you opened a queue with it!
	// The only way to change it is to create a new queue and shovel the
	// old data into it.
	BucketSplitConf BucketSplitConf

	// MaxParallelOpenBuckets limits the number of buckets that can be opened
	// in parallel. Normally, operations like Push() will create more and more
	// buckets with time and old buckets do not get closed automatically, as
	// we don't know when they get accessed again. If there are more buckets
	// open than this number they get closed and will be re-opened if accessed
	// again. If this happens frequently, this comes with a performance penalty.
	// If you tend to access your data with rather random keys, you might want
	// to increase this number, depending on how much resources you have.
	//
	// If this number is <= 0, then this feature is disabled, which is not
	// recommended.
	MaxParallelOpenBuckets int
}
Options gives you some knobs to configure the queue. Read the individual options carefully, as some of them can only be set on the first call to Open().
func DefaultOptions ¶
func DefaultOptions() Options
DefaultOptions gives you a set of options that are good enough to try some experiments. Your mileage can vary a lot with different settings, so make sure to do some benchmarking!
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is the high level API to the priority queue.
Example ¶
// Error handling stripped for brevity:
dir, _ := os.MkdirTemp("", "timeq-example")
defer os.RemoveAll(dir)

// Open the queue. If it does not exist, it gets created:
queue, _ := Open(dir, DefaultOptions())

// Push some items to it:
pushItems := make(Items, 0, 10)
for idx := 0; idx < 10; idx++ {
	pushItems = append(pushItems, Item{
		Key:  Key(idx),
		Blob: []byte(fmt.Sprintf("key_%d", idx)),
	})
}

_ = queue.Push(pushItems)

// Retrieve the same items again:
_ = queue.Read(10, func(_ Transaction, popItems Items) (ReadOp, error) {
	// Just for example purposes, check if they match:
	if reflect.DeepEqual(pushItems, popItems) {
		fmt.Println("They match! :)")
	} else {
		fmt.Println("They do not match! :(")
	}

	return ReadOpPop, nil
})
Output: They match! :)
func Open ¶
Open tries to open the priority queue structure in `dir`. If `dir` does not exist, then a new, empty priority queue is created. The behavior of the queue can be fine-tuned with `opts`.
func (*Queue) Close ¶
Close should always be called and error checked when you're done with using the queue. Close might still flush out some data, depending on what sync mode you configured.
func (*Queue) Delete ¶ added in v0.9.0
Delete deletes all items in the range `from` to `to`. Both `from` and `to` are inclusive, i.e. items with those keys are also deleted. The number of deleted items is returned.
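For illustration, expiring everything older than one hour might look like this sketch (assuming nanosecond-timestamp keys and a `queue` opened as in the examples; error handling omitted):

```go
// Both bounds are inclusive, so this removes every item
// with a key in [0, cutoff]:
cutoff := Key(time.Now().Add(-time.Hour).UnixNano())
ndeleted, _ := queue.Delete(0, cutoff)
fmt.Printf("deleted %d expired items\n", ndeleted)
```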
func (*Queue) Fork ¶ added in v0.9.0
Fork splits the reading end of the queue in two parts. If Pop() is called on the returned Fork (which implements the Consumer interface), then other forks and the original queue are not affected.
The process of forking is relatively cheap and adds only minor storage and memory cost to the queue as a whole. Performance during pushing and popping is almost not affected at all.
Example ¶
// Error handling stripped for brevity:
dir, _ := os.MkdirTemp("", "timeq-example")
defer os.RemoveAll(dir)

// Open the queue. If it does not exist, it gets created:
queue, _ := Open(dir, DefaultOptions())

// Fork the consuming end in half:
fork, _ := queue.Fork("fork")

// Push some items to it - they are added to both the regular queue
// as well as to the fork we just created.
_ = queue.Push(Items{
	Item{
		Key:  123,
		Blob: []byte("some data"),
	},
})

// Check the main queue contents:
_ = queue.Read(1, func(_ Transaction, items Items) (ReadOp, error) {
	fmt.Println(string(items[0].Blob))
	return ReadOpPop, nil
})

// The same data should be available in the fork,
// as it was not popped by the read above.
_ = fork.Read(1, func(_ Transaction, items Items) (ReadOp, error) {
	fmt.Println(string(items[0].Blob))
	return ReadOpPop, nil
})
Output:

some data
some data
func (*Queue) Forks ¶ added in v0.9.0
Forks returns a list of fork names. The list will be empty if there are no forks yet. In other words: the initial queue is not counted as a fork.
func (*Queue) Len ¶
Len returns the number of items in the queue. NOTE: This gets more expensive with a higher number of buckets, so you probably should not call it in a hot loop.
func (*Queue) Push ¶
Push pushes a batch of `items` to the queue. It is allowed to call this function during the read callback.
func (*Queue) Read ¶ added in v0.9.0
func (q *Queue) Read(n int, fn TransactionFn) error
Read fetches up to `n` items from the queue. It will call the supplied `fn` one or several times until either `n` is reached or the queue is empty. If the queue is empty before calling Read(), then `fn` is not called. If `n` is negative, then as many items as possible are returned until the queue is empty.
You should NEVER use the supplied items outside of `fn`, as they are directly sliced from a mmap(2). Accessing them outside will almost certainly lead to a crash. If you need them outside (e.g. for appending to a slice) then you can use the Copy() function of Items.
You can return either ReadOpPop or ReadOpPeek from `fn`.
Of the queue's operations, only Push() may be called inside the read transaction. All other operations will DEADLOCK if called!
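A sketch of the two return values in practice, using a `queue` opened as in the examples above: the first Read() peeks and leaves the data for the next call, the second drains the queue by passing a negative `n`:

```go
// Inspect up to 10 items without consuming them:
_ = queue.Read(10, func(_ Transaction, items Items) (ReadOp, error) {
	fmt.Println("peeked:", len(items))
	return ReadOpPeek, nil
})

// Pop everything that is left; fn may be called several times:
_ = queue.Read(-1, func(_ Transaction, items Items) (ReadOp, error) {
	fmt.Println("popped:", len(items))
	return ReadOpPop, nil
})
```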
func (*Queue) Shovel ¶ added in v0.9.0
Shovel moves items from the queue to `dst`. The source queue will be completely drained afterwards. For speed reasons this assumes that the dst queue uses the same bucket func as the source queue. If you cannot guarantee this, you should implement a naive Shovel() yourself that just uses Pop/Push.
This method can be used if you want to change options like the BucketSplitConf or if you intend to have more than one queue that are connected by some logic. Examples for the latter case would be a "deadletter queue" where you put failed calculations for later re-calculations or a queue for unacknowledged items.
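A deadletter setup as described above might be sketched like this (the path and queue names are illustrative; both queues are assumed to share the same BucketSplitConf; error handling omitted):

```go
// After a batch of items repeatedly failed processing, move
// everything from the main queue into a separate deadletter queue:
deadletter, _ := Open("/var/lib/myapp/deadletter", DefaultOptions())
defer deadletter.Close()

nshoveled, _ := queue.Shovel(deadletter)
fmt.Printf("moved %d items to the deadletter queue\n", nshoveled)
```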
type ReadOp ¶ added in v0.9.0
type ReadOp int
ReadOp defines what timeq should do with the data that was read.
type Transaction ¶ added in v0.9.0
Transaction is a handle to the queue during the read callback. See TransactionFn for more details.
Example ¶
// Error handling stripped for brevity:
dir, _ := os.MkdirTemp("", "timeq-example")
defer os.RemoveAll(dir)

// Open the queue. If it does not exist, it gets created:
queue, _ := Open(dir, DefaultOptions())

_ = queue.Push(Items{
	Item{
		Key:  123,
		Blob: []byte("some data"),
	},
	Item{
		Key:  456,
		Blob: []byte("other data"),
	},
})

_ = queue.Read(1, func(tx Transaction, items Items) (ReadOp, error) {
	// Push half of the data back to the queue.
	// You can use that to "unread" parts of what you read.
	return ReadOpPop, tx.Push(items[1:])
})

fmt.Println(queue.Len())
Output: 1
type TransactionFn ¶ added in v0.9.0
type TransactionFn func(tx Transaction, items Items) (ReadOp, error)
TransactionFn is the function passed to the Read() call. It will be called zero to multiple times with a number of items that was read. You can decide with the return value what to do with this data. Returning an error will immediately stop further reading. The current data will not be touched and the error is bubbled up.
The `tx` parameter can be used to push data back to the queue. It might be extended in future releases.