timeq

v1.0.0
Published: Apr 18, 2024 License: MIT Imports: 18 Imported by: 4

README

timeq


A file-based priority queue in Go.

Generally speaking, timeq can be used to implement these and more:

  • A streaming platform like NATS or a message broker similar to Mosquitto.
  • A file-backed job queue with different priorities.
  • A telemetry pipeline for IoT devices to buffer offline data.
  • Wherever you would use a regular file-based queue.

Features

  • Clean and well-tested code base based on Go 1.22
  • High throughput thanks to batch processing and mmap()
  • Tiny memory footprint that does not depend on the number of items in the queue.
  • Simple interface with the classic Push() and Read() and only a few other functions.
  • Sane default settings, with some knobs that can be tuned for your use case.
  • Consuming end can be efficiently and easily forked into several consumers.

This implementation should be generally useful, despite the time in the name. However, the initial design had timestamps as priority keys in mind. For best performance the following assumptions were made:

  • Your OS supports mmap() and mremap() (i.e. Linux/FreeBSD)
  • Seeking in files during reading is cheap (i.e. no HDD)
  • The priority key ideally increases without many duplicates (like timestamps; see the FAQ).
  • You push and pop your data in, ideally, big batches.
  • The underlying storage has a low risk for write errors or bit flips.
  • You trust your data to some random dude's code on the internet (don't we all?).

If some of those assumptions do not fit your use case and you still manage to make it work, I would be happy about feedback or even pull requests to improve the general usability.

See the API documentation for examples and the full reference.

Use cases

My primary use case was an embedded Linux device running several services that generate streams of data which need to be sent to the cloud. The data was required to be in ascending order (sorted by time) and had to be buffered within tight memory bounds.

A previous attempt based on sqlite3 worked reasonably well but was much slower than it had to be (partly due to the heavy cost of cgo). This motivated me to write this queue implementation.

Usage

To download the library, just do this in your project:

# Use latest or a specific tag as you like
$ go get github.com/sahib/timeq@latest

We also ship a rudimentary command-line client that can be used for experiments. You can install it like this:

$ go install github.com/sahib/timeq/cmd@latest
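
For a first impression, here is a minimal sketch of the basic API (see the examples in the reference section below for the authoritative versions). It assumes a local "db" directory and strips error handling for brevity:

package main

import (
	"fmt"

	"github.com/sahib/timeq"
)

func main() {
	// Open the queue; it is created if it does not exist yet:
	queue, _ := timeq.Open("db", timeq.DefaultOptions())
	defer queue.Close()

	// Push a batch of items; Key is the priority (lower keys pop first):
	_ = queue.Push(timeq.Items{
		timeq.Item{Key: 1, Blob: []byte("hello")},
		timeq.Item{Key: 2, Blob: []byte("world")},
	})

	// Read (and pop) up to two items in one batch:
	_ = queue.Read(2, func(_ timeq.Transaction, items timeq.Items) (timeq.ReadOp, error) {
		for _, it := range items {
			fmt.Println(it.Key, string(it.Blob))
		}
		return timeq.ReadOpPop, nil
	})
}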

Benchmarks

The included benchmark pushes 2000 items with a payload of 40 bytes per operation.

$ make bench
goos: linux
goarch: amd64
pkg: github.com/sahib/timeq
cpu: 12th Gen Intel(R) Core(TM) i7-1270P
BenchmarkPopSyncNone-16      35924  33738 ns/op  240 B/op  5 allocs/op
BenchmarkPopSyncData-16      35286  33938 ns/op  240 B/op  5 allocs/op
BenchmarkPopSyncIndex-16     34030  34003 ns/op  240 B/op  5 allocs/op
BenchmarkPopSyncFull-16      35170  33592 ns/op  240 B/op  5 allocs/op
BenchmarkPushSyncNone-16     20336  56867 ns/op   72 B/op  2 allocs/op
BenchmarkPushSyncData-16     20630  58613 ns/op   72 B/op  2 allocs/op
BenchmarkPushSyncIndex-16    20684  58782 ns/op   72 B/op  2 allocs/op
BenchmarkPushSyncFull-16     19994  59491 ns/op   72 B/op  2 allocs/op

Multi Consumer

timeq supports a Fork() operation that splits the consuming end of a queue into two halves. You can then consume from each half individually, without modifying the state of the other one. It's even possible to fork a fork again, resulting in a consumer hierarchy. This is probably best explained by the following steps (a small code sketch follows below):

  1. The initial state of the queue with 8 items in it.
  2. We fork the queue by calling Fork("foo").
  3. We consume 3 items from the fork via fork.Pop().
  4. Pushing new data will go to all existing forks.
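
A hedged sketch of those steps, using the documented PopCopy() helper instead of the Pop() shorthand mentioned above (queue is an already opened *timeq.Queue, error handling stripped):

// 2. Fork the consuming end:
fork, _ := queue.Fork("foo")

// 3. Consume 3 items from the fork only; the main queue and other
//    forks keep their own read state:
forkItems, _ := timeq.PopCopy(fork, 3)
fmt.Println("popped from fork:", len(forkItems))

// 4. New pushes become visible to the queue and all of its forks:
_ = queue.Push(timeq.Items{timeq.Item{Key: 9, Blob: []byte("ninth")}})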

This is implemented efficiently (see below) by just having duplicated indexes. It opens up some interesting use cases:

  • For load-balancing purposes you could have several workers consuming data from timeq, each Pop()'ing and working on different parts of the queue. Sometimes it would be nice to let workers work on the same set of data (e.g. when they all transform it in different ways). The latter is easily possible with forks.
  • Fork the queue and consume from it up to some point as an experiment, then remove the fork afterwards. The original data is not affected by this.
  • Prevent data from getting lost by keeping a "deadletter" fork that keeps track of whatever you want. This way you can implement something like a max-age for queue items.

Design

  • All data is divided into buckets by a user-defined function (»BucketSplitConf«).
  • Each bucket is its own priority queue, responsible for a part of the key space.
  • A push to a bucket writes the batch of data to a memory-mapped log file on disk. The location of the batch is stored in an in-memory index and in an index WAL.
  • On pop we select the bucket with the lowest key first and ask the index for the location of the lowest batch. Once done, the index is updated to mark the items as popped. The data stays intact in the data log.
  • Once a bucket is completely drained, it is removed from disk to reclaim space.

Since the index is quite small (only one entry per batch) we can easily fit it in memory. On the initial load all bucket indexes are loaded, but no memory is mapped yet.
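
As a rough illustration of how the split function ties into this design, here is a hedged sketch that opens a queue with one of the documented split configurations (the path and numbers are placeholders; imports of time and log omitted):

// Group nanosecond-timestamp keys into one-minute buckets and keep at
// most four buckets mapped at the same time:
opts := timeq.DefaultOptions()
opts.BucketSplitConf = timeq.FixedSizeBucketSplitConf(uint64(time.Minute))
opts.MaxParallelOpenBuckets = 4

queue, err := timeq.Open("/path/to/db", opts)
if err != nil {
	log.Fatal(err)
}
defer queue.Close()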

Limits
  • Each item payload may be at most 64M.
  • Each bucket can be at most 2^63 bytes in size.
  • Using priority keys close to the integer limits is most certainly a bad idea.
  • Once a bucket was created with a specific BucketSplitConf, it cannot be changed later. timeq will error out in this case and the queue needs to be migrated. If this turns out to be a practical issue, we could implement an automated migration path.

Data Layout

The data is stored on disk in two files per bucket:

  • dat.log: Stores the individual entries of each batch (the actual data).
  • idx.log: Stores the key and location of each batch. Can be regenerated from dat.log.

This graphic shows one entry of each:

(Figure: Data Layout)

Each bucket lives in its own directory called K<key>. Example: If you have two buckets, your data looks like this on disk:

/path/to/db/
├── split.conf
├── K00000000000000000001
│   ├── dat.log
│   ├── idx.log
│   └── forkx.idx.log
└── K00000000000000000002
    ├── dat.log
    ├── idx.log
    └── forkx.idx.log

The actual data is in dat.log. This is an append-only log that is memory-mapped by timeq. All files that end with idx.log are indexes that point to the currently reachable parts of dat.log. Each entry in idx.log is a batch, so the log only grows marginally if your batches are big enough. forkx.idx.log (and possibly more files like it) are index forks, which work the same way as idx.log but track a different state of the respective bucket.

NOTE: Buckets get cleaned up on open, or during consumption once they are completely empty (i.e. all forks are empty). Do not expect the disk usage to decrease automatically whenever you pop something. It does decrease, but in batches.

Applied Optimizations
  • Data is pushed and popped as big batches and the index only tracks batches. This greatly lowers the memory usage if you use big batches.
  • The API is very friendly towards re-using memory internally. Data is sliced directly from the memory map and handed to the user in the read callback. Almost no allocations are made during normal operation. If you need the data outside the callback, you have the option to copy it.
  • Division into small, manageable buckets. Only the buckets that are accessed are actually loaded.
  • Both dat.log and idx.log are append-only, requiring no random seeking for best performance.
  • dat.log is memory mapped and resized using mremap() in big batches. The bigger the log, the bigger the pre-allocation.
  • Sorting into buckets during Push() uses binary search for fast sorting.
  • Shovel() can move whole bucket directories, if possible.
  • In general, the concept of »Mechanical Sympathy« was applied to some extent to make the code cache friendly.

FAQ:

Can timeq be also used with non-time based keys?

There are no notable places where the key of an item is actually assumed to be a timestamp, except for the default BucketSplitConf (which can be configured). If you find a good way to sort your data into buckets, you should be good to go. Keep in mind that timestamps were the idea behind the original design, so your mileage may vary - always benchmark your individual use case. You can modify one of the existing benchmarks to test your assumptions.

Why should I care about buckets?

Most importantly: only buckets that are currently in use are loaded. This allows a very small footprint, especially if the push input is already roughly sorted.

There are also some other reasons:

  • If one bucket becomes corrupt for some reason, you lose only the data in this bucket.
  • On Shovel() we can cheaply move buckets if they do not exist in the destination.
  • ...and some more optimizations.

How do I choose the right size of my buckets?

It depends on a few things. Answer the following questions in a worst case scenario:

  • How much memory do you have at hand?
  • How many items would you push to a single bucket?
  • How big is each item?
  • How many buckets should be open at the same time?

As timeq uses mmap(2) internally, only the pages that were accessed are actually mapped to physical memory. However, when pushing a lot of data, most of it ends up in physical memory, as all accessed pages of a bucket stay mapped (which is good if you Pop immediately afterwards). So you should be fine if this evaluates to true:

BytesPerItem * ItemsPerBucketInWorstCase * MaxParallelOpenBuckets < BytesMemoryAvailable - WiggleRoom.
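
To make that concrete with made-up numbers: at 2 KiB per item, 100,000 items per bucket in the worst case and 4 buckets open in parallel, you would need roughly 2 KiB × 100,000 × 4 ≈ 780 MiB of available memory, plus some wiggle room.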

You can lower the number of open buckets with Options.MaxParallelOpenBuckets.

Keep in mind that timeq is fast and can be memory-efficient if used correctly, but it's not a magic device. In the future I might introduce a feature that does not keep the full bucket mapped if it is only being pushed to. The return on investment for such an optimization would be rather small, though.

Can I store more than one value per key?

Yes, no problem. The index may store more than one batch per key. There is a slight allocation overhead on Queue.Push() though. Since timeq was primarily optimized for mostly-unique keys (i.e. timestamps), you might see better performance with fewer duplicates. It should not be very significant, though.

If you want to use priority keys that lie in a very narrow range (and thus have many duplicates), you can think about spreading the range a bit wider. For example: you have priority keys from zero to ten for the tasks in your job queue. Instead of using zero to ten as keys directly, you can shift the priority and add the job-id to the key: (prio << 32) | jobID.
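
A hedged sketch of that scheme (the helper name is made up; timeq only ever sees the resulting key):

// jobKey packs the priority into the upper bits and the job id into the
// lower 32 bits, so keys stay mostly unique while still sorting by priority.
func jobKey(prio, jobID uint32) timeq.Key {
	return timeq.Key(uint64(prio)<<32 | uint64(jobID))
}

// Lower keys are popped first, so priority 0 wins over priority 10.
// Usage: queue.Push(timeq.Items{{Key: jobKey(0, 42), Blob: []byte("urgent job")}})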

How failsafe is timeq?

I use it on a big fleet of embedded devices in the field, so it's already quite battle-tested. Design-wise, damaged index files can be regenerated from the data log. There is no error correction code applied in the data log and no checksums are currently written. If you need this, I'm happy if a PR comes in that enables it optionally.

For durability, the design is built to survive crashes without data loss (Push, Read), but in some cases it might result in duplicated data (Shovel). My recommendation is to design your application logic in a way that allows duplicate items to be handled gracefully.

This assumes a filesystem with full journaling (data=journal for ext4) or some other filesystem that gives you similar guarantees. We do properly call msync() and fsync() in the relevant cases. Crash safety has not been tested extensively yet, though. Help here is welcome.

The test suite is currently roughly as big as the codebase. The best protection against bugs is a small code base, so that's not too impressive yet. We are of course working on improving the test suite, which is a never-ending task. Additionally, we have a bunch of benchmarks and fuzzing tests.

Is timeq safely usable from several go-routines?

Yes. There is currently no real speed benefit from doing so, though, as the current locking strategy prohibits parallel pushes and reads. Future releases might improve on this.

License

Source code is available under the MIT License.

Contact

Chris Pahl @sahib

TODO List

  • Test crash safety in automated way.
  • Check for integer overflows.
  • Have locking strategy that allows more parallelism.

Documentation

Overview

Package timeq is a file-based priority queue in Go.

Index

Examples

Constants

const (
	// ReadOpPeek preserves the read data. It will be available on the next call to Read().
	ReadOpPeek = 0

	// ReadOpPop removes the read data. It will not be available on the next call to Read().
	ReadOpPop = 1
)
const (
	// SyncNone does not sync on normal operation (only on close)
	SyncNone = SyncMode(0)
	// SyncData only synchronizes the data log
	SyncData = SyncMode(1 << iota)
	// SyncIndex only synchronizes the index log (does not make sense alone)
	SyncIndex
	// SyncFull syncs both the data and index log
	SyncFull = SyncData | SyncIndex
)

The available options are inspired by SQLite: https://www.sqlite.org/pragma.html#pragma_synchronous
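
For illustration, a hedged sketch of picking a weaker mode via Options (dir is a placeholder directory; error handling stripped):

// SyncFull is the safe default; SyncData trades some durability for speed.
opts := DefaultOptions()
opts.SyncMode = SyncData
queue, _ := Open(dir, opts)
defer queue.Close()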

const (
	// ErrorModeAbort will immediately abort the current
	// operation if an error is encountered that might lead to data loss.
	ErrorModeAbort = ErrorMode(iota)

	// ErrorModeContinue tries to progress further in case of errors
	// by jumping over a faulty bucket or a faulty entry in a bucket.
	// If the error was recoverable, nil is returned, but the
	// Logger in the Options will be called (if set) to log the error.
	ErrorModeContinue
)

Variables

var DefaultBucketSplitConf = ShiftBucketSplitConf(37)

DefaultBucketSplitConf assumes that `key` is a nanosecond unix timestamp and divides data into (roughly) 2-minute buckets.

var (
	// ErrChangedSplitFunc is returned when the configured split func
	// in options does not fit to the state on disk.
	ErrChangedSplitFunc = errors.New("split func changed")
)
var (
	ErrNoSuchFork = errors.New("no fork with this name")
)

Functions

This section is empty.

Types

type BucketSplitConf added in v0.9.0

type BucketSplitConf struct {
	// Func is the function that does the splitting.
	Func func(item.Key) item.Key

	// Name is used as identifier to figure out
	// when the disk split func changed.
	Name string
}

BucketSplitConf defines what keys are sorted in what bucket. See Options.BucketSplitConf for more info.

func FixedSizeBucketSplitConf added in v0.9.0

func FixedSizeBucketSplitConf(n uint64) BucketSplitConf

FixedSizeBucketSplitConf returns a BucketSplitConf that divides the data into equally sized buckets with `n` entries. This can also be used to create time-based buckets: if you use nanosecond-based keys, pass time.Minute to create buckets with a size of one minute.

func ShiftBucketSplitConf added in v0.9.0

func ShiftBucketSplitConf(shift int) BucketSplitConf

ShiftBucketSplitConf creates a fast BucketSplitConf that divides data into buckets by masking the `shift` least significant bits of the key. With a shift of 37 you get roughly 2-minute buckets (if your keys are nanosecond timestamps). If you want to calculate the bucket size for a shift, use this formula: (2 ** shift) / 1e9 / 60 = minutes
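
For example, the default shift of 37 yields buckets of 2^37 ns ≈ 137 s, i.e. (2 ** 37) / 1e9 / 60 ≈ 2.3 minutes.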

type Consumer added in v0.9.0

type Consumer interface {
	Read(n int, fn TransactionFn) error
	Delete(from, to Key) (int, error)
	Shovel(dst *Queue) (int, error)
	Len() int
	Fork(name ForkName) (*Fork, error)
}

Consumer is an interface that both Fork and Queue implement. It covers every consumer related API. Please refer to the respective Queue methods for details.

type ErrorMode added in v0.9.0

type ErrorMode int

func (ErrorMode) IsValid added in v0.9.0

func (em ErrorMode) IsValid() bool

type Fork added in v0.9.0

type Fork struct {
	// contains filtered or unexported fields
}

Fork is an implementation of the Consumer interface for a named fork. See the Fork() method for more explanation.

func (*Fork) Delete added in v0.9.0

func (f *Fork) Delete(from, to Key) (int, error)

Delete is like Queue.Delete().

func (*Fork) Fork added in v0.9.0

func (f *Fork) Fork(name ForkName) (*Fork, error)

Fork is like Queue.Fork(), except that the fork happens relative to the current state of the consumer and not to the state of the underlying Queue.

func (*Fork) Len added in v0.9.0

func (f *Fork) Len() int

Len is like Queue.Len().

func (*Fork) Read added in v0.9.0

func (f *Fork) Read(n int, fn TransactionFn) error

Read is like Queue.Read().

func (*Fork) Remove added in v0.9.0

func (f *Fork) Remove() error

Remove removes this fork. If the fork is used after this, the API will return ErrNoSuchFork in all cases.

func (*Fork) Shovel added in v0.9.0

func (f *Fork) Shovel(dst *Queue) (int, error)

Shovel is like Queue.Shovel(). The data of the current fork is pushed to the `dst` queue.

type ForkName added in v0.9.0

type ForkName string

ForkName is the name of a specific fork.

func (ForkName) Validate added in v0.9.0

func (name ForkName) Validate() error

Validate checks if this fork has a valid name. A fork name is valid if it only consists of alphanumeric, dash or underscore characters.

type Item

type Item = item.Item

Item is a single item that you push or pop from the queue.

type Items

type Items = item.Items

Items is a list of items.

func PeekCopy added in v0.9.0

func PeekCopy(c Consumer, n int) (Items, error)

PeekCopy works like a simplified Read() but copies the items and does not remove them. It is less efficient and should not be used if you care for performance.

func PopCopy added in v0.9.0

func PopCopy(c Consumer, n int) (Items, error)

PopCopy works like a simplified Read() but copies the items and pops them. It is less efficient and should not be used if you care for performance.

Example
// Error handling stripped for brevity:
dir, _ := os.MkdirTemp("", "timeq-example")
defer os.RemoveAll(dir)

// Open the queue. If it does not exist, it gets created:
queue, _ := Open(dir, DefaultOptions())

items := make(Items, 0, 5)
for idx := 0; idx < 10; idx++ {
	items = append(items, Item{
		Key:  Key(idx),
		Blob: []byte(fmt.Sprintf("%d", idx)),
	})
}

_ = queue.Push(items)
got, _ := PopCopy(queue, 5)
for _, item := range got {
	fmt.Println(item.Key)
}
Output:

K00000000000000000000
K00000000000000000001
K00000000000000000002
K00000000000000000003
K00000000000000000004

type Key

type Key = item.Key

Key is the priority of each item in the queue. Lower keys will be popped first.

type Logger added in v0.9.0

type Logger interface {
	Printf(fmt string, args ...any)
}

Logger is a small interface to redirect logs to. The default logger outputs to stderr.

func DefaultLogger added in v0.9.0

func DefaultLogger() Logger

DefaultLogger produces a logger that writes to stderr.

func NullLogger added in v0.9.0

func NullLogger() Logger

NullLogger produces a logger that discards all messages.

func WriterLogger added in v0.9.0

func WriterLogger(w io.Writer) Logger

type Options

type Options struct {
	// SyncMode controls how often we sync data to the disk. The more data we sync
	// the more durable is the queue at the cost of throughput.
	// Default is the safe SyncFull. Think twice before lowering this.
	SyncMode SyncMode

	// Logger is used to output some non-critical warnings or errors that could
	// have been recovered. By default we print to stderr.
	// Only warnings or errors are logged, no debug or informal messages.
	Logger Logger

	// ErrorMode defines how non-critical errors are handled.
	// See the individual enum values for more info.
	ErrorMode ErrorMode

	// BucketSplitConf defines what key goes to what bucket.
	// The provided function should clamp the key value to
	// a common value. Each same value that was returned goes
	// into the same bucket. The returned value should also be
	// the minimum key of the bucket.
	//
	// Example: '(key / 10) * 10' would produce buckets that each span 10 keys.
	//
	// What bucket size to choose? Please refer to the FAQ in the README.
	//
	// NOTE: This may not be changed after you opened a queue with it!
	//       Only way to change is to create a new queue and shovel the
	//       old data into it.
	BucketSplitConf BucketSplitConf

	// MaxParallelOpenBuckets limits the number of buckets that can be opened
	// in parallel. Normally, operations like Push() will create more and more
	// buckets with time and old buckets do not get closed automatically, as
	// we don't know when they get accessed again. If there are more buckets
	// open than this number they get closed and will be re-opened if accessed
	// again. If this happens frequently, this comes with a performance penalty.
	// If you tend to access your data with rather random keys, you might want
	// to increase this number, depending on how much resources you have.
	//
	// If this number is <= 0, then this feature is disabled, which is not
	// recommended.
	MaxParallelOpenBuckets int
}

Options gives you some knobs to configure the queue. Read the individual options carefully, as some of them can only be set on the first call to Open().

func DefaultOptions

func DefaultOptions() Options

DefaultOptions gives you a set of options that are good enough to try some experiments. Your mileage can vary a lot with different settings, so make sure to do some benchmarking!

func (*Options) Validate added in v0.9.0

func (o *Options) Validate() error

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is the high level API to the priority queue.

Example
// Error handling stripped for brevity:
dir, _ := os.MkdirTemp("", "timeq-example")
defer os.RemoveAll(dir)

// Open the queue. If it does not exist, it gets created:
queue, _ := Open(dir, DefaultOptions())

// Push some items to it:
pushItems := make(Items, 0, 10)
for idx := 0; idx < 10; idx++ {
	pushItems = append(pushItems, Item{
		Key:  Key(idx),
		Blob: []byte(fmt.Sprintf("key_%d", idx)),
	})
}

_ = queue.Push(pushItems)

// Retrieve the same items again:
_ = queue.Read(10, func(_ Transaction, popItems Items) (ReadOp, error) {
	// Just for example purposes, check if they match:
	if reflect.DeepEqual(pushItems, popItems) {
		fmt.Println("They match! :)")
	} else {
		fmt.Println("They do not match! :(")
	}

	return ReadOpPop, nil
})
Output:

They match! :)

func Open

func Open(dir string, opts Options) (*Queue, error)

Open tries to open the priority queue structure in `dir`. If `dir` does not exist, then a new, empty priority queue is created. The behavior of the queue can be fine-tuned with `opts`.

func (*Queue) Clear

func (q *Queue) Clear() error

Clear fully deletes the queue contents.

func (*Queue) Close

func (q *Queue) Close() error

Close should always be called and error checked when you're done with using the queue. Close might still flush out some data, depending on what sync mode you configured.

func (*Queue) Delete added in v0.9.0

func (q *Queue) Delete(from, to Key) (int, error)

Delete deletes all items in the range `from` to `to`. Both `from` and `to` are inclusive, i.e. keys with these values are deleted too. The number of deleted items is returned.

func (*Queue) Fork added in v0.9.0

func (q *Queue) Fork(name ForkName) (*Fork, error)

Fork splits the reading end of the queue in two parts. If Pop() is called on the returned Fork (which implements the Consumer interface), the other forks and the original queue are not affected.

The process of forking is relatively cheap and adds only minor storage and memory cost to the queue as a whole. Performance during pushing and popping is almost not affected at all.

Example
// Error handling stripped for brevity:
dir, _ := os.MkdirTemp("", "timeq-example")
defer os.RemoveAll(dir)

// Open the queue. If it does not exist, it gets created:
queue, _ := Open(dir, DefaultOptions())

// Fork the consuming end in half:
fork, _ := queue.Fork("fork")

// Push some items to it - they are added to both the regular queue
// as well to the fork we just created.
_ = queue.Push(Items{
	Item{
		Key:  123,
		Blob: []byte("some data"),
	},
})

// Check the main queue contents:
_ = queue.Read(1, func(_ Transaction, items Items) (ReadOp, error) {
	fmt.Println(string(items[0].Blob))
	return ReadOpPop, nil
})

// The same data should be available in the fork,
// as it was not popped by the read above.
_ = fork.Read(1, func(_ Transaction, items Items) (ReadOp, error) {
	fmt.Println(string(items[0].Blob))
	return ReadOpPop, nil
})
Output:

some data
some data

func (*Queue) Forks added in v0.9.0

func (q *Queue) Forks() []ForkName

Forks returns a list of fork names. The list will be empty if there are no forks yet. In other words: the initial queue is not counted as a fork.

func (*Queue) Len

func (q *Queue) Len() int

Len returns the number of items in the queue. NOTE: This gets more expensive with a higher number of buckets, so you probably should not call it in a hot loop.

func (*Queue) Push

func (q *Queue) Push(items Items) error

Push pushes a batch of `items` to the queue. It is allowed to call this function during the read callback.

func (*Queue) Read added in v0.9.0

func (q *Queue) Read(n int, fn TransactionFn) error

Read fetches up to `n` items from the queue. It will call the supplied `fn` one or several times until either `n` is reached or the queue is empty. If the queue is empty before calling Read(), then `fn` is not called. If `n` is negative, then as many items as possible are returned until the queue is empty.

You should NEVER use the supplied items outside of `fn`, as they are directly sliced from a mmap(2). Accessing them outside will almost certainly lead to a crash. If you need them outside (e.g. for appending to a slice) then you can use the Copy() function of Items.

You can return either ReadOpPop or ReadOpPeek from `fn`.

You may only call Push() inside the read transaction. All other operations will DEADLOCK if called!
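
For illustration, a hedged sketch of keeping data beyond the callback, assuming Items.Copy() returns a detached copy of the slice as the note above suggests:

var kept Items
_ = queue.Read(10, func(_ Transaction, items Items) (ReadOp, error) {
	// items points into the memory map; copy before keeping it around:
	kept = append(kept, items.Copy()...)
	return ReadOpPop, nil
})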

func (*Queue) Shovel added in v0.9.0

func (q *Queue) Shovel(dst *Queue) (int, error)

Shovel moves items from the current queue to `dst`. The current queue will be completely drained afterwards. For speed reasons this assumes that the dst queue uses the same bucket func as the source queue. If you cannot guarantee this, you should implement a naive Shovel() yourself that just uses Pop/Push.

This method can be used if you want to change options like the BucketSplitConf or if you intend to have more than one queue that are connected by some logic. Examples for the latter case would be a "deadletter queue" where you put failed calculations for later re-calculations or a queue for unacknowledged items.
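
A hedged sketch of the deadletter case mentioned above (paths are placeholders, both queues are assumed to share the same bucket func, error handling stripped):

mainQueue, _ := Open("/path/to/db", DefaultOptions())
deadletter, _ := Open("/path/to/deadletter", DefaultOptions())

// Drain everything from the deadletter queue back into the main queue
// for another attempt:
moved, _ := deadletter.Shovel(mainQueue)
fmt.Println("re-queued items:", moved)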

func (*Queue) Sync

func (q *Queue) Sync() error

Sync can be called to explicitly sync the queue contents to persistent storage, even if you configured SyncNone.

type ReadOp added in v0.9.0

type ReadOp int

ReadOp defines what timeq should do with the data that was read.

type SyncMode added in v0.9.0

type SyncMode int

func (SyncMode) IsValid added in v0.9.0

func (sm SyncMode) IsValid() bool

type Transaction added in v0.9.0

type Transaction interface {
	Push(items Items) error
}

Transaction is a handle to the queue during the read callback. See TransactionFn for more details.

Example
// Error handling stripped for brevity:
dir, _ := os.MkdirTemp("", "timeq-example")
defer os.RemoveAll(dir)

// Open the queue. If it does not exist, it gets created:
queue, _ := Open(dir, DefaultOptions())

_ = queue.Push(Items{
	Item{
		Key:  123,
		Blob: []byte("some data"),
	},
	Item{
		Key:  456,
		Blob: []byte("other data"),
	},
})

_ = queue.Read(1, func(tx Transaction, items Items) (ReadOp, error) {
	// Push half of the data back to the queue.
	// You can use that to "unread" parts of what you read.
	return ReadOpPop, tx.Push(items[1:])
})

fmt.Println(queue.Len())
Output:

1

type TransactionFn added in v0.9.0

type TransactionFn func(tx Transaction, items Items) (ReadOp, error)

TransactionFn is the function passed to the Read() call. It will be called zero or more times with a number of items that were read. You can decide with the return value what to do with this data. Returning an error will immediately stop further reading. The current data will not be touched and the error is bubbled up.

The `tx` parameter can be used to push data back to the queue. It might be extended in future releases.

Directories

Path Synopsis
cmd
