statedb

package module
v0.0.0-...-7cd6fd4 Latest
Warning

This package is not in the latest version of its module.

Published: May 8, 2024 License: Apache-2.0 Imports: 28 Imported by: 0

README

📝 StateDB

StateDB is an in-memory database for Go. The database is built on top of Persistent Adaptive Radix Trees.

StateDB is/supports:

  • In-memory. Objects and indexes are stored in main memory and not on disk. This makes it easy to store and index any Go data type.

  • Multi-Version Concurrency Control (MVCC). Both objects and indexes are immutable and objects are versioned. A read transaction has access to an immutable snapshot of the data.

  • Cross-table write transactions. Write transactions lock the requested tables and allow modifying objects in multiple tables as a single atomic action. Transactions can be aborted to throw away the changes.

  • Multiple indexes. A table may have one or more indexers for objects, with each indexer returning zero or more keys. Indexes can be unique or non-unique. A non-unique index is a concatenation of the primary and secondary keys.

  • Watch channels. Changes to the database can be watched at fine granularity via Go channels that close when a relevant part of the database changes. This is implemented by having a Go channel at each of the radix tree nodes. This enables watching an individual object, a key prefix, or the whole table for changes.
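
The close-to-notify idea behind watch channels can be sketched with the standard library alone. The watchedValue type below is hypothetical and holds a single value rather than a radix tree; it only illustrates how closing a channel wakes every waiter at once, and it is not how StateDB itself is implemented.

```go
package main

import "sync"

// watchedValue is a hypothetical single-cell store used only to illustrate
// close-to-notify watch channels; it is not part of StateDB.
type watchedValue struct {
	mu    sync.Mutex
	value string
	watch chan struct{}
}

func newWatchedValue(v string) *watchedValue {
	return &watchedValue{value: v, watch: make(chan struct{})}
}

// Get returns the current value and a channel that closes on the next Set.
func (w *watchedValue) Get() (string, <-chan struct{}) {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.value, w.watch
}

// Set stores a new value, closes the old channel to wake every waiter, and
// installs a fresh channel for future readers.
func (w *watchedValue) Set(v string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.value = v
	close(w.watch)
	w.watch = make(chan struct{})
}
```

A reader calls Get, uses the value, and then blocks on the returned channel (typically inside a select) until a writer calls Set.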

Warning! Immutable data! Read this!

To support lockless readers and transactionality, StateDB relies on both the indexes and the objects themselves being immutable. Since Go has no way to declare fields const, we cannot prevent mutation of public fields in objects. Care must therefore be taken never to mutate an object after it has been inserted into StateDB. This applies both to the fields directly in the object and to everything referenced from it, e.g. a map field must not be modified in place, but must be cloned first!

StateDB has a check in Insert() to validate that an object stored as a pointer is not replaced with the same pointer, i.e. that at least a shallow clone has been made. This of course doesn't extend to references within the object.

For "very important objects", please consider storing an interface type instead that contains getter methods and a safe way of mutating the object, e.g. via the builder pattern or a constructor function.

Also prefer persistent/immutable data structures within the object to avoid expensive copying on mutation. The part package comes with persistent Map[K, V] and Set[T].
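
As a rough illustration of "clone first, then modify", here is a minimal sketch using only the standard library (maps.Clone needs Go 1.21 or later). The Config type and its WithLabel method are hypothetical names made up for this example. The point is that copying the struct alone is not enough when a field is a map.

```go
package main

import "maps"

// Config is a hypothetical object with a reference-typed field.
type Config struct {
	Name   string
	Labels map[string]string
}

// WithLabel returns a modified copy and leaves the receiver untouched.
// The struct is copied by value and the map is cloned, because a shallow
// copy alone would still share the map with the original.
func (c *Config) WithLabel(key, value string) *Config {
	c2 := *c
	c2.Labels = maps.Clone(c.Labels)
	if c2.Labels == nil {
		c2.Labels = map[string]string{}
	}
	c2.Labels[key] = value
	return &c2
}
```

Cloning the whole map costs time proportional to its size on every change, which is the cost that persistent data structures avoid.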

Example

Here's a quick example to show what using StateDB looks like.

// Define an object to store in the database.
type MyObject struct {
  ID uint32
  Foo string
}

// Define how to index and query the object.
var IDIndex = statedb.Index[*MyObject, uint32]{
  Name: "id",
  FromObject: func(obj *MyObject) index.KeySet {
    return index.NewKeySet(index.Uint32(obj.ID))
  },
  FromKey: func(id uint32) index.Key {
    return index.Uint32(id)
  },
  Unique: true,
}

// Create the database and the table.
func example() {
  db := statedb.New()
  myObjects, err := statedb.NewTable(
    "my-objects",
    IDIndex,
  )
  if err != nil { ... }

  if err := db.RegisterTable(myObjects); err != nil {
    ...
  }

  wtxn := db.WriteTxn(myObjects)
  
  // Insert some objects
  myObjects.Insert(wtxn, &MyObject{1, "a"})
  myObjects.Insert(wtxn, &MyObject{2, "b"})
  myObjects.Insert(wtxn, &MyObject{3, "c"})

  // Modify an object
  if obj, _, found := myObjects.Get(wtxn, IDIndex.Query(1)); found {
    objCopy := *obj
    objCopy.Foo = "d"
    myObjects.Insert(wtxn, &objCopy)
  }

  // Delete an object
  if obj, _, found := myObjects.Get(wtxn, IDIndex.Query(2)); found {
    myObjects.Delete(wtxn, obj)
  }
  
  if feelingLucky {
    // Commit the changes.
    wtxn.Commit()
  } else {
    // Throw away the changes.
    wtxn.Abort()
  }

  // Query the objects with a snapshot of the database.
  txn := db.ReadTxn()

  if obj, _, found := myObjects.Get(txn, IDIndex.Query(1)); found {
    ...
  }

  iter, watch := myObjects.All(txn)
  // Iterate all objects

  iter, watch = myObjects.LowerBound(txn, IDIndex.Query(2))
  // Iterate objects with ID >= 2
  
  iter, watch = myObjects.Prefix(txn, IDIndex.Query(0x1000_0000))
  // Iterate objects where ID is between 0x1000_0000 and 0x1fff_ffff

  for obj, revision, ok := iter.Next(); ok; obj, revision, ok = iter.Next() {
    ...
  }

  // Wait until the query results change.
  <-watch
}

Read on for a more detailed guide or check out the Go package docs.

Guide to StateDB

StateDB can be used directly as a normal library, or as a Hive Cell. For example usage as part of Hive, see reconciler/example. Here we show a standalone example.

We start by defining the data type we want to store in the database. There are no constraints on the type: it may be a primitive type like an int, a struct type, or a pointer. Since each index stores a copy of the object, one should use a pointer if the object is large.

import (
  "github.com/cilium/statedb"
  "github.com/cilium/statedb/index"
  "github.com/cilium/statedb/part"
)

type ID = uint64
type Tag = string
type MyObject struct {
  ID ID              // Identifier
  Tags part.Set[Tag] // Set of tags
}

Indexes

With the object defined, we can describe how it should be indexed. Indexes are constant values and can be defined as global variables alongside the object type. Indexes take two type parameters, your object type and the key type: Index[MyObject, ID]. Additionally you define two operations: FromObject that takes your object and returns a set of StateDB keys (zero or many), and FromKey that takes the key type of your choosing and converts it to a StateDB key.

// IDIndex is the primary index for MyObject indexing the 'ID' field.
var IDIndex = statedb.Index[*MyObject, ID]{
  Name: "id",

  FromObject: func(obj *MyObject) index.KeySet {
    return index.NewKeySet(index.Uint64(obj.ID))
  },

  FromKey: func(id ID) index.Key {
    return index.Uint64(id)
  },
  // Above is equal to just:
  // FromKey: index.Uint64,

  Unique: true, // IDs are unique.
}

The index.Key seen above is just a []byte. The index package contains many functions for converting into the index.Key type, for example index.Uint64 and so on.

A single object can also map to multiple keys (multi-index). Let's construct an index for tags.

var TagsIndex = statedb.Index[*MyObject, Tag]{
  Name: "tags",

  FromObject: func(o *MyObject) index.KeySet {
    // index.Set turns the part.Set[string] into a set of keys
    // (set of byte slices)
    return index.Set(o.Tags)
  },

  FromKey: index.String,

  // Many objects may have the same tag, so we mark this as
  // non-unique.
  Unique: false,
}

With the indexes now defined, we can construct a table.

Setting up a table

func NewMyObjectTable() (statedb.RWTable[*MyObject], error) {
  return statedb.NewTable[*MyObject](
    "my-objects",

    IDIndex,   // IDIndex is the primary index
    TagsIndex, // TagsIndex is a secondary index
    // ... more secondary indexes can be passed in here
  )
}

The NewTable function takes the name of the table, a primary index and zero or more secondary indexes. It returns an RWTable, which is an interface for both reading and writing to a table. An RWTable is a superset of Table, an interface that contains methods just for reading. This provides a simple form of type-level access control to the table. NewTable may return an error if the indexers are malformed, for example if IDIndex is not unique (the primary index has to be), or if the indexers have overlapping names.
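
The "type-level access control" mentioned here is plain Go interface embedding. The sketch below uses hypothetical Reader and ReadWriter interfaces as stand-ins for Table and RWTable, with a toy map-backed implementation, to show how handing out the narrower interface keeps write methods out of reach at compile time.

```go
package main

// Reader and ReadWriter are hypothetical stand-ins for Table and RWTable.
type Reader interface {
	Get(id int) (string, bool)
}

type ReadWriter interface {
	Reader
	Insert(id int, value string)
}

// memTable is a toy implementation backed by a map.
type memTable struct{ rows map[int]string }

func newMemTable() *memTable { return &memTable{rows: map[int]string{}} }

func (t *memTable) Get(id int) (string, bool) {
	v, ok := t.rows[id]
	return v, ok
}

func (t *memTable) Insert(id int, value string) { t.rows[id] = value }

// lookup accepts only the read-only view, so calling Insert inside it
// would not compile.
func lookup(r Reader, id int) string {
	v, _ := r.Get(id)
	return v
}
```

A component that owns the table keeps the ReadWriter and passes the same value as a Reader to everything that should only query it.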

Inserting

With the table defined, we can now create the database and start writing and reading to the table.

db := statedb.New()

myObjects, err := NewMyObjectTable()
if err != nil { return err }

// Register the table with the database.
err = db.RegisterTable(myObjects)
if err != nil {
  // May fail if a table with the same name is already registered.
  return err
}

To insert objects into a table, we'll need to create a WriteTxn. This locks the target table(s), so that all changes made in the transaction are applied atomically.

// Create a write transaction against the 'myObjects' table, locking
// it for writing.
// Note that the returned 'wtxn' holds internal state and it is not
// safe to use concurrently (e.g. you must not have multiple goroutines
// using the same WriteTxn in parallel).
wtxn := db.WriteTxn(myObjects)

// We can defer an Abort() of the transaction in case we encounter
// issues and want to forget our writes. This is a good practice
// to safeguard against a forgotten call to Commit(). Worry not though,
// StateDB has a finalizer on WriteTxn to catch a forgotten Abort/Commit.
defer wtxn.Abort()

// Insert an object into the table. This will be visible to readers
// only when we commit.
obj := &MyObject{ID: 42, Tags: part.NewStringSet("hello")}
oldObj, hadOld, err := myObjects.Insert(wtxn, obj)
if err != nil {
  // Insert can fail only if 'wtxn' is not locking the table we're
  // writing to, or if 'wtxn' was already committed.
  return err
}
// hadOld is true and oldObj points to an old version of the object
// if it was replaced. Since the object type can be a non-pointer
// we need the separate 'hadOld' boolean and cannot just check for nil.

// Commit the changes to the database and notify readers by closing the
// relevant watch channels.
wtxn.Commit()

Reading

Now that there's something in the table we can try out reading. We can read either using a read-only ReadTxn, or we can read using a WriteTxn. With a ReadTxn we'll be reading from a snapshot and nothing we do will affect other readers or writers (unless you mutate the immutable object, in which case bad things happen).

txn := db.ReadTxn()

The txn is now a frozen snapshot of the database that we can use to read the data.

// Let's break out the types so you know what is going on.
var (
  obj *MyObject
  revision statedb.Revision
  found bool
  watch <-chan struct{}
)
// Get returns the first matching object in the query.
obj, revision, found = myObjects.Get(txn, IDIndex.Query(42))
if found {
  // obj points to the object we inserted earlier.
  // revision is the "table revision" for the object. Revisions are
  // incremented for a table on every insertion or deletion.
}
// GetWatch is the same as Get, but also gives us a watch
// channel that we can use to wait on the object to appear or to
// change.
obj, revision, watch, found = myObjects.GetWatch(txn, IDIndex.Query(42))
<-watch // closes when object with ID '42' is inserted or deleted

Iterating

List can be used to iterate over all objects that match the query.

var iter statedb.Iterator[*MyObject]
// List returns all matching objects as an iterator. The iterator is lazy
// and one can stop reading at any time without worrying about the rest.
iter = myObjects.List(txn, TagsIndex.Query("hello"))
for obj, revision, ok := iter.Next(); ok; obj, revision, ok = iter.Next() {
  // ...
}

// ListWatch is like List, but also returns a watch channel.
iter, watch = myObjects.ListWatch(txn, TagsIndex.Query("hello"))
for obj, revision, ok := iter.Next(); ok; obj, revision, ok = iter.Next() { ... }

// closes when an object with tag "hello" is inserted or deleted
<-watch

Prefix can be used to iterate over objects that match a given prefix.

// Prefix does a prefix search on an index. Here it returns an iterator
// for all objects that have a tag that starts with "h".
iter, watch = myObjects.Prefix(txn, TagsIndex.Query("h"))
for obj, revision, ok := iter.Next(); ok; obj, revision, ok = iter.Next() {
  // ...
}
// closes when an object with a tag starting with "h" is inserted or deleted
<-watch

LowerBound can be used to iterate over objects that have a key equal to or higher than the given key.

// LowerBound can be used to find all objects with a key equal to or higher
// than the specified key. Its semantics depend on how the indexer works.
// For example index.Uint32 returns the big-endian or most significant byte
// first form of the integer, in other words the number 3 is the key
// []byte{0, 0, 0, 3}, which allows doing a meaningful LowerBound search on it.
iter, watch = myObjects.LowerBound(txn, IDIndex.Query(3))
for obj, revision, ok := iter.Next(); ok; obj, revision, ok = iter.Next() {
  // obj.ID >= 3
}

// closes when anything happens to the table. This is because there isn't a
// clear notion of what part of the index to watch for, e.g. if the index
// stores 0x01, 0x11, 0x20, and we do LowerBound(0x10), then none of these nodes
// in the tree are what we should watch for since "0x01" is in the wrong subtree
// and we may insert "0x10" above "0x11", so cannot watch that either. LowerBound
// could return the watch channel of the node that shares a prefix with the search
// term, but instead StateDB currently does the conservative thing and returns the
// watch channel of the "root node".
<-watch

All objects stored in StateDB have an associated revision. The revision is unique to the table and increments on every insert or delete. Revisions can be queried with ByRevision.

// StateDB also has a built-in index for revisions and that can be used to
// iterate over the objects in the order they have been changed. Furthermore
// we can use this to wait for new changes!
lastRevision := statedb.Revision(0)
for {
  iter, watch = myObjects.LowerBound(txn, statedb.ByRevision(lastRevision+1))
  for obj, revision, ok := iter.Next(); ok; obj, revision, ok = iter.Next() {
    lastRevision = revision
  }

  // Wait until there are new changes. In real code we probably want to
  // do a 'select' here and check for 'ctx.Done()' etc.
  <-watch

  // We should rate limit so we can see a batch of changes in one go.
  // For sake of example just sleeping here, but you likely want to use the
  // 'rate' package.
  time.Sleep(100*time.Millisecond)

  // Take a new snapshot so we can see the changes.
  txn = db.ReadTxn()
}

As it's really useful to know when an object has been deleted, StateDB has a facility for storing deleted objects in a separate index until they have been observed. Using Changes one can iterate over insertions and deletions.

// Let's iterate over both inserts and deletes. We need to use
// a write transaction to create the change iterator as this needs to
// register with the table to track the deleted objects.

wtxn := db.WriteTxn(myObjects)
changes, err := myObjects.Changes(wtxn)
wtxn.Commit()
if err != nil {
  // This can fail due to same reasons as e.g. Insert()
  // e.g. transaction not locking target table or it has
  // already been committed.
  return err
}

// We need to remember to Close() it so that StateDB does not hold onto
// deleted objects for us. No worries though, a finalizer will close it
// for us if we do not do this.
defer changes.Close()

// Now very similar to the LowerBound revision iteration above, we will
// iterate over the changes.
for {
  for change, revision, ok := changes.Next(); ok; change, revision, ok = changes.Next() {
    if change.Deleted {
      fmt.Printf("Object %#v was deleted!\n", change.Object)
    } else {
      fmt.Printf("Object %#v was inserted!\n", change.Object)
    }
  }
  // To observe more changes, we create a new ReadTxn and pass it to Watch() that
  // refreshes the iterator. Once the returned channel closes we can iterate again.
  <-changes.Watch(db.ReadTxn())
}

Modifying objects

Modifying objects is basically just a query and an insert to override the object. One must however take care to not modify the object returned by the query.

// Let's make a write transaction to modify the table.
wtxn := db.WriteTxn(myObjects)

// Now that we hold the write lock on the table, we can retrieve an object
// and no one else will be able to modify it until we commit.
obj, revision, found := myObjects.Get(wtxn, IDIndex.Query(42))
if !found { panic("it should be there, I swear!") }

// We cannot just straight up modify 'obj' since someone might be reading it.
// It's supposed to be immutable after all. To make this easier, let's define
// a Clone() method.
func (obj *MyObject) Clone() *MyObject {
  obj2 := *obj
  return &obj2
}

// Now we can do a "shallow clone" of the object and we can modify the fields
// without the readers getting upset. Of course we still cannot modify anything
// referenced by those fields without cloning the fields themselves. But that's
// why we're using persistent data structures like 'part.Set' and 'part.Map'.
//
// Let's add a new tag. But first we clone.
obj = obj.Clone()
obj.Tags = obj.Tags.Set("foo")

// Now we have a new object that has "foo" set. We can now write it to the table.
oldObj, hadOld, err := myObjects.Insert(wtxn, obj)
// err should be nil, since we're using the WriteTxn correctly
// oldObj is the original object, without the "foo" tag
// hadOld is true since we replaced the object

// Commit the transaction so everyone sees it.
wtxn.Commit()

// We can also do a "compare-and-swap" to insert an object. This is useful when
// computing the change we want to make is expensive. Here's how you do it.

// Start with a ReadTxn that is cheap and doesn't block anyone.
txn := db.ReadTxn()

// Look up the object we want to update and perform some slow calculation
// to produce the desired new object.
obj, revision, found := myObjects.Get(txn, IDIndex.Query(42))
obj = veryExpensiveCalculation(obj)

// Now that we're ready to insert we can grab a WriteTxn.
wtxn := db.WriteTxn(myObjects)

// Let's try and update the object with the revision of the object we used
// for that expensive calculation.
oldObj, hadOld, err := myObjects.CompareAndSwap(wtxn, obj, revision)
if errors.Is(err, statedb.ErrRevisionNotEqual) {
  // Oh no, someone updated the object while we were calculating.
  // I guess I need to calculate again...
  wtxn.Abort()
  return err
}
wtxn.Commit()

Performance considerations

Needless to say, one should keep the duration of the write transactions as short as possible so that other writers are not starved (readers are not affected as they're reading from a snapshot). Writing in batches, or first using a ReadTxn to compute the changes and then committing with CompareAndSwap as shown above (optimistic concurrency control), is a good way to accomplish this.

One should also avoid keeping the ReadTxn around when for example waiting on a watch channel to close. The ReadTxn holds a pointer to the database root and thus holding it will prevent old objects from being garbage collected by the Go runtime. Consider grabbing the ReadTxn in a function and returning the watch channel to the function doing the for-select loop.

Persistent Map and Set

The part package contains persistent Map[K, V] and Set[T] data structures. These, like StateDB, are implemented with the Persistent Adaptive Radix Trees. They are meant to be used as replacements for the built-in mutable Go hashmap in StateDB objects as they're persistent (operations return a copy) and thus more efficient to copy and suitable to use in immutable objects.
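
What "persistent" means here can be shown with a much simpler structure than an adaptive radix tree. The sketch below is a plain immutable binary search tree of strings, written only for illustration; node, insert and has are hypothetical names. An insert copies just the nodes on the path to the new key and shares every other subtree with the previous version.

```go
package main

// node is an immutable binary search tree node. Nodes are never modified
// after creation, so old and new versions can share subtrees safely.
type node struct {
	key         string
	left, right *node
}

// insert returns the root of a new tree that contains key. Only the nodes
// on the path from the root to the insertion point are copied.
func insert(n *node, key string) *node {
	switch {
	case n == nil:
		return &node{key: key}
	case key < n.key:
		return &node{key: n.key, left: insert(n.left, key), right: n.right}
	case key > n.key:
		return &node{key: n.key, left: n.left, right: insert(n.right, key)}
	default:
		return n // already present, reuse the whole subtree
	}
}

// has reports whether key is in the tree rooted at n.
func has(n *node, key string) bool {
	for n != nil {
		switch {
		case key < n.key:
			n = n.left
		case key > n.key:
			n = n.right
		default:
			return true
		}
	}
	return false
}
```

After v2 := insert(v1, "a"), the old root v1 still describes the old set, and any subtree that "a" did not pass through is the very same pointer in both versions.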

Here's how to use Map[K, V]:

import (
  "github.com/cilium/statedb/part"
)

// Create a new map with strings as keys
m := part.NewStringMap[int]()

// Set the key "one" to value 1. Returns a new map.
mNew := m.Set("one", 1)
v, ok := m.Get("one")
// ok == false since we didn't modify the original map.

v, ok = mNew.Get("one")
// v == 1, ok == true

// Let's reuse 'm' as our variable.
m = mNew
m = m.Set("two", 2)

// All key-value pairs can be iterated over.
iter := m.All()
// Maps can be prefix and lowerbound searched, just like StateDB tables
iter = m.Prefix("a")  // Iterator for anything starting with 'a'
iter = m.LowerBound("b") // Iterator for anything equal to 'b' or larger, e.g. 'bb' or 'c'...

for k, v, ok := iter.Next(); ok; k, v, ok = iter.Next() {
  // ...
}

m.Len() == 2
m = m.Delete("two")
m.Len() == 1

// We can use arbitrary types as keys and values... provided
// we teach it how to create a byte slice key out of it.
type Obj struct {
  ID string
}
m2 := part.NewMap[*Obj, *Obj](
  func(o *Obj) []byte { return []byte(o.ID) },
  func(b []byte) *Obj { return &Obj{ID: string(b)} },
)
o := &Obj{ID: "foo"}
m2 = m2.Set(o, o)

And here's Set[T]:

// 's' is now the empty string set
s := part.StringSet
s = s.Set("hello")
s.Has("hello") == true
s2 := s.Delete("hello")
s.Has("hello") == true
s2.Has("hello") == false

// we can initialize a set with NewStringSet
s3 := part.NewStringSet("world", "foo")

// Sets can be combined.
s3 = s3.Union(s)
// s3 now contains "hello", "foo", "world"
s3.Len() == 3

// Print "hello", "foo", "world"
iter := s3.All()
for word, ok := iter.Next(); ok; word, ok = iter.Next() {
  fmt.Println(word)
}

// We can remove a set from another set
s4 := s3.Difference(part.NewStringSet("foo"))
s4.Has("foo") == false

// As with Map[K, V] we can define Set[T] for our own objects
type Obj struct {
  ID string
}
s5 := part.NewSet[*Obj](
  func(o *Obj) []byte { return []byte(o.ID) },
)
s5 = s5.Set(&Obj{"quux"})
s5.Has(&Obj{"quux"}) == true

Reconciler

This repository comes with a generic reconciliation utility that watches a table for changes and performs a configurable Update or Delete operation on the change. The status of the operation is written back into the object, which allows inspecting or waiting for an object to be reconciled. On failures the reconciler will retry the operation at a later time. Reconciler supports health reporting and metrics.

See the example application in reconciler/example for more information.

Documentation

Overview

The statedb package provides a transactional in-memory database with per-table locking. The database indexes objects using Persistent Adaptive Radix Trees. (https://db.in.tum.de/~leis/papers/ART.pdf)

As this is built around an immutable data structure and objects may have lockless readers the stored objects MUST NOT be mutated, but instead a copy must be made prior to mutation and insertion.

See 'example/' for an example of how to construct an application that uses this library.

Index

Constants

const (
	PrimaryIndexPos = 0

	RevisionIndex             = "__revision__"
	RevisionIndexPos          = 1
	GraveyardIndex            = "__graveyard__"
	GraveyardIndexPos         = 2
	GraveyardRevisionIndex    = "__graveyard_revision__"
	GraveyardRevisionIndexPos = 3

	SecondaryIndexStartPos = 4
)

Variables

var (
	// ErrDuplicateTable indicates that StateDB has been provided with two or more table definitions
	// that share the same table name.
	ErrDuplicateTable = errors.New("table already exists")

	// ErrPrimaryIndexNotUnique indicates that the primary index for the table is not marked unique.
	ErrPrimaryIndexNotUnique = errors.New("primary index not unique")

	// ErrDuplicateIndex indicates that the table has two or more indexers that share the same name.
	ErrDuplicateIndex = errors.New("index name already in use")

	// ErrReservedPrefix indicates that the index name is using the reserved prefix and should
	// be renamed.
	ErrReservedPrefix = errors.New("index name uses reserved prefix '" + reservedIndexPrefix + "'")

	// ErrTransactionClosed indicates that a write operation is performed using a transaction
	// that has already been committed or aborted.
	ErrTransactionClosed = errors.New("transaction is closed")

	// ErrTableNotLockedForWriting indicates that a write operation is performed against a
	// table that was not locked for writing, e.g. target table not given as argument to
	// WriteTxn().
	ErrTableNotLockedForWriting = errors.New("not locked for writing")

	// ErrRevisionNotEqual indicates that the CompareAndSwap or CompareAndDelete failed due to
	// the object having a mismatching revision, e.g. it had been changed since the object
	// was last read.
	ErrRevisionNotEqual = errors.New("revision not equal")

	// ErrObjectNotFound indicates that the object was not found when the operation required
	// it to exists. This error is not returned by Insert or Delete, but may be returned by
	// CompareAndSwap or CompareAndDelete.
	ErrObjectNotFound = errors.New("object not found")
)
var Cell = cell.Module(
	"statedb",
	"In-memory transactional database",

	cell.Provide(
		newHiveDB,
	),
)

This module provides an in-memory database built on top of immutable radix trees. As the database is based on an immutable data structure, the objects inserted into the database MUST NOT be mutated, but rather copied first!

Functions

func Collect

func Collect[Obj any](iter Iterator[Obj]) []Obj

Collect creates a slice of objects out of the iterator. The iterator is consumed in the process.
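
A minimal sketch of what draining an iterator into a slice involves, using the three-value Next() loop seen throughout this document. The Iterator interface is redefined locally so the example compiles on its own, and sliceIter and collect are hypothetical names; this is not the package's implementation.

```go
package main

// Iterator mirrors the three-value Next() used throughout this document:
// object, revision, and whether a value was returned. It is redefined here
// so the sketch compiles on its own.
type Iterator[Obj any] interface {
	Next() (obj Obj, revision uint64, ok bool)
}

// sliceIter is a hypothetical iterator over a slice; revisions are simply
// position+1.
type sliceIter[Obj any] struct {
	items []Obj
	pos   int
}

func (it *sliceIter[Obj]) Next() (obj Obj, revision uint64, ok bool) {
	if it.pos >= len(it.items) {
		return obj, 0, false
	}
	obj = it.items[it.pos]
	it.pos++
	return obj, uint64(it.pos), true
}

// collect drains the iterator into a slice, as Collect is described to do.
func collect[Obj any](iter Iterator[Obj]) []Obj {
	var out []Obj
	for obj, _, ok := iter.Next(); ok; obj, _, ok = iter.Next() {
		out = append(out, obj)
	}
	return out
}
```

Since the iterator is consumed, a second call to collect on the same iterator returns an empty result.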

func Derive

func Derive[In, Out any](jobName string, transform func(obj In, deleted bool) (Out, DeriveResult)) func(DeriveParams[In, Out])

Derive constructs and registers a job to transform objects from the input table to the output table, e.g. derive the output table from the input table. Useful when constructing a reconciler that has its desired state solely derived from a single table. For example the bandwidth manager's desired state is directly derived from the devices table.

Derive is parametrized with the transform function that transforms the input object into the output object. The returned DeriveResult tells Derive what to do with the output object; if the transform function returns DeriveSkip, then the object is skipped.

Example use:

cell.Invoke(
  statedb.Derive[*tables.Device, *Foo](
    "derive-foo", // job name (placeholder for illustration)
    func(d *tables.Device, deleted bool) (*Foo, statedb.DeriveResult) {
      if deleted {
        return &Foo{Index: d.Index}, statedb.DeriveDelete
      }
      return &Foo{Index: d.Index}, statedb.DeriveInsert
    }),
)

func Observable

func Observable[Obj any](db *DB, table Table[Obj]) stream.Observable[Change[Obj]]

Observable creates an observable from the given table for observing the changes to the table as a stream of events.

For high-churn tables it's advisable to apply rate-limiting to the stream to decrease overhead (stream.Throttle).

func ProcessEach

func ProcessEach[Obj any, It Iterator[Obj]](iter It, fn func(Obj, Revision) error) (err error)

ProcessEach invokes the given function for each object provided by the iterator.

func RegisterTable

func RegisterTable[Obj any](db *DB, table RWTable[Obj]) error

RegisterTable registers a table to the database:

func NewMyTable() statedb.RWTable[MyTable] { ... }
cell.Provide(NewMyTable),
cell.Invoke(statedb.RegisterTable[MyTable]),

Types

type Change

type Change[Obj any] struct {
	Object   Obj
	Revision Revision
	Deleted  bool
}

Change is either an update or a delete of an object. Used by Changes() and the Observable().

type ChangeIterator

type ChangeIterator[Obj any] interface {
	Iterator[Change[Obj]]

	// Watch refreshes the iteration with a new query and returns a watch channel to wait
	// for new changes after Next() has returned false.
	Watch(ReadTxn) <-chan struct{}

	// Close closes the iterator. This must be called when one is done using
	// the iterator as a tracker is created for deleted objects and the
	// deleted objects are held onto until all event iterators have observed
	// the deletion.
	Close()
}

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB provides an in-memory transactional database built on top of immutable radix trees. The database supports multiple tables, each with one or more user-defined indexes. Readers can access the data locklessly with a simple atomic pointer read to obtain a snapshot. On writes to the database, table-level locks are acquired on the target tables, and on write transaction commit a root lock is taken to swap in the new root with the modified tables.

As data is stored in immutable data structures any objects inserted into it MUST NOT be mutated afterwards.

DB holds the "root" tree of tables with each table holding a tree of indexes:

           root
          /    \
         ba    T(foo)
       /   \
   T(bar)  T(baz)

      T(bar).indexes
           /  \
          i    I(byRevision)
        /   \
    I(id)    I(ip)

          I(ip)
          /  \
        192  172
        /     ...
    bar(192.168.1.1)

T = tableEntry, I = indexTree

To lookup:

  1. Create a read (or write) transaction
  2. Find the table from the root tree
  3. Find the index from the table's index tree
  4. Find the object from the index

To insert:

  1. Create write transaction against the target table
  2. Find the table from the root tree
  3. Create/reuse write transaction on primary index
  4. Insert/replace the object into primary index
  5. Create/reuse write transaction on revision index
  6. If old object existed, remove from revision index
  7. If old object existed, remove from graveyard
  8. Update each secondary index
  9. Commit transaction by committing each index to the table and then committing table to the root. Swap the root atomic pointer to new root and notify by closing channels of all modified nodes.
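
The commit step and the lockless read path can be sketched with sync/atomic. The db type below is a hypothetical miniature that copies a whole map on every write, where the real design uses radix trees that share unmodified nodes. It only illustrates why readers need no lock and why snapshots taken earlier stay unchanged.

```go
package main

import (
	"maps"
	"sync"
	"sync/atomic"
)

// db is a hypothetical miniature of the scheme described above: readers
// load an immutable root with one atomic read, writers build a new root
// and swap it in on commit.
type db struct {
	writeMu sync.Mutex                     // serializes writers
	root    atomic.Pointer[map[string]int] // current immutable snapshot
}

func newDB() *db {
	d := &db{}
	empty := map[string]int{}
	d.root.Store(&empty)
	return d
}

// snapshot returns the current root without taking any lock. The returned
// map must be treated as read-only.
func (d *db) snapshot() map[string]int {
	return *d.root.Load()
}

// write applies fn to a private copy of the root and publishes the copy
// atomically. Snapshots taken earlier keep seeing the old data.
func (d *db) write(fn func(m map[string]int)) {
	d.writeMu.Lock()
	defer d.writeMu.Unlock()
	next := maps.Clone(*d.root.Load())
	fn(next)
	d.root.Store(&next)
}
```

A snapshot obtained before a write still has its old contents afterwards, which is the property that lets read transactions run without blocking writers.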

To observe deletions:

  1. Create write transaction against the target table
  2. Create new delete tracker and add it to the table
  3. Commit the write transaction to update the table with the new delete tracker
  4. Query the graveyard by revision, starting from the revision of the write transaction at which it was created.
  5. For each successfully processed deletion, mark the revision to set low watermark for garbage collection.
  6. Periodically garbage collect the graveyard by finding the lowest revision of all delete trackers.
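
The low watermark bookkeeping in steps 5 and 6 can be sketched as follows. The graveyard type is a hypothetical model that tracks only revisions, and it assumes that with no trackers registered nothing needs to be retained; the actual worker may behave differently.

```go
package main

import "sort"

// graveyard is a hypothetical model of deleted objects kept until every
// delete tracker has observed them.
type graveyard struct {
	deleted  []uint64          // revisions of deleted objects, ascending
	trackers map[string]uint64 // tracker name -> last revision processed
}

// lowWatermark returns the lowest revision processed across all trackers.
// With no trackers nothing needs to be retained, so ok is false.
func (g *graveyard) lowWatermark() (rev uint64, ok bool) {
	for _, r := range g.trackers {
		if !ok || r < rev {
			rev, ok = r, true
		}
	}
	return rev, ok
}

// collect drops every deleted object whose revision is at or below the
// low watermark and returns how many were dropped.
func (g *graveyard) collect() int {
	n := len(g.deleted)
	if wm, ok := g.lowWatermark(); ok {
		n = sort.Search(len(g.deleted), func(i int) bool {
			return g.deleted[i] > wm
		})
	}
	g.deleted = g.deleted[n:]
	return n
}
```

The slowest tracker decides how long deleted objects are retained, which is why closing an unused change iterator matters.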

func New

func New(options ...Option) *DB

New creates a new database.

The created database must be started and stopped!

func (*DB) HTTPHandler

func (db *DB) HTTPHandler() http.Handler

func (*DB) NewHandle

func (db *DB) NewHandle(name string) Handle

NewHandle returns a named handle to the DB. The handle has the same ReadTxn and WriteTxn methods as DB, but annotated with the given name for more accurate cost accounting in e.g. metrics.

func (*DB) ReadTxn

func (db *DB) ReadTxn() ReadTxn

ReadTxn constructs a new read transaction for performing reads against a snapshot of the database.

The returned ReadTxn is not thread-safe.

func (*DB) RegisterTable

func (db *DB) RegisterTable(table TableMeta, tables ...TableMeta) error

RegisterTable registers a table to the database.

func (*DB) ServeHTTP

func (db *DB) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP is an HTTP handler for dumping StateDB as JSON.

Example usage:

var db *statedb.DB

http.Handle("/db", db)
http.ListenAndServe(":8080", nil)

func (*DB) Start

func (db *DB) Start() error

Start the background workers for the database.

This starts the graveyard worker that deals with garbage collecting deleted objects that are no longer necessary for Changes().

func (*DB) Stop

func (db *DB) Stop() error

Stop the background workers.

func (*DB) WriteTxn

func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn

WriteTxn constructs a new write transaction against the given set of tables. Each table is locked, which may block until the table locks are acquired. The modifications performed in the write transaction are not visible outside it until Commit() is called. To discard the changes call Abort().

The returned WriteTxn is not thread-safe.

type DeriveParams

type DeriveParams[In, Out any] struct {
	cell.In

	Lifecycle cell.Lifecycle
	Jobs      job.Registry
	Health    cell.Health
	DB        *DB
	InTable   Table[In]
	OutTable  RWTable[Out]
}

type DeriveResult

type DeriveResult int
const (
	DeriveInsert DeriveResult = 0 // Insert the object
	DeriveUpdate DeriveResult = 1 // Update the object (if it exists)
	DeriveDelete DeriveResult = 2 // Delete the object
	DeriveSkip   DeriveResult = 3 // Skip
)

type DualIterator

type DualIterator[Obj any] struct {
	// contains filtered or unexported fields
}

DualIterator allows iterating over two iterators in revision order. Meant to be used for combined iteration of LowerBound(ByRevision) and Deleted().

func NewDualIterator

func NewDualIterator[Obj any](left, right Iterator[Obj]) *DualIterator[Obj]

func (*DualIterator[Obj]) Next

func (it *DualIterator[Obj]) Next() (obj Obj, revision uint64, fromLeft, ok bool)
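
Iterating two sources in revision order is a two-way merge. The sketch below shows one way it could work under the assumption that both inputs are already sorted by revision and that ties go to the left side; dualIter, sliceIter and entry are hypothetical types, and the Iterator interface is redefined so the example is self-contained.

```go
package main

// Iterator is redefined here so the sketch is self-contained.
type Iterator[Obj any] interface {
	Next() (obj Obj, revision uint64, ok bool)
}

type entry[Obj any] struct {
	obj Obj
	rev uint64
}

// sliceIter is a hypothetical iterator over pre-sorted entries.
type sliceIter[Obj any] struct{ entries []entry[Obj] }

func (it *sliceIter[Obj]) Next() (obj Obj, revision uint64, ok bool) {
	if len(it.entries) == 0 {
		return obj, 0, false
	}
	e := it.entries[0]
	it.entries = it.entries[1:]
	return e.obj, e.rev, true
}

// dualIter buffers one pending entry from each side and always yields the
// one with the lower revision.
type dualIter[Obj any] struct {
	left, right Iterator[Obj]
	l, r        entry[Obj]
	lOK, rOK    bool
}

func newDualIter[Obj any](left, right Iterator[Obj]) *dualIter[Obj] {
	d := &dualIter[Obj]{left: left, right: right}
	d.l.obj, d.l.rev, d.lOK = left.Next()
	d.r.obj, d.r.rev, d.rOK = right.Next()
	return d
}

func (d *dualIter[Obj]) Next() (obj Obj, revision uint64, fromLeft, ok bool) {
	switch {
	case d.lOK && (!d.rOK || d.l.rev <= d.r.rev):
		obj, revision = d.l.obj, d.l.rev
		d.l.obj, d.l.rev, d.lOK = d.left.Next()
		return obj, revision, true, true
	case d.rOK:
		obj, revision = d.r.obj, d.r.rev
		d.r.obj, d.r.rev, d.rOK = d.right.Next()
		return obj, revision, false, true
	}
	return obj, 0, false, false
}
```

The fromLeft result lets the caller tell which source an object came from, for example live objects on the left and deleted objects on the right.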

type ExpVarMetrics

type ExpVarMetrics struct {
	LockContentionVar            *expvar.Map
	GraveyardCleaningDurationVar *expvar.Map
	GraveyardLowWatermarkVar     *expvar.Map
	GraveyardObjectCountVar      *expvar.Map
	ObjectCountVar               *expvar.Map
	WriteTxnAcquisitionVar       *expvar.Map
	WriteTxnDurationVar          *expvar.Map
	DeleteTrackerCountVar        *expvar.Map
	RevisionVar                  *expvar.Map
}

ExpVarMetrics is a simple implementation for the metrics.

func NewExpVarMetrics

func NewExpVarMetrics(publish bool) *ExpVarMetrics

func (*ExpVarMetrics) DeleteTrackerCount

func (m *ExpVarMetrics) DeleteTrackerCount(name string, numTrackers int)

func (*ExpVarMetrics) GraveyardCleaningDuration

func (m *ExpVarMetrics) GraveyardCleaningDuration(name string, duration time.Duration)

func (*ExpVarMetrics) GraveyardLowWatermark

func (m *ExpVarMetrics) GraveyardLowWatermark(name string, lowWatermark Revision)

func (*ExpVarMetrics) GraveyardObjectCount

func (m *ExpVarMetrics) GraveyardObjectCount(name string, numDeletedObjects int)

func (*ExpVarMetrics) ObjectCount

func (m *ExpVarMetrics) ObjectCount(name string, numObjects int)

func (*ExpVarMetrics) Revision

func (m *ExpVarMetrics) Revision(name string, revision uint64)

func (*ExpVarMetrics) String

func (m *ExpVarMetrics) String() (out string)

func (*ExpVarMetrics) WriteTxnDuration

func (m *ExpVarMetrics) WriteTxnDuration(handle string, tables []string, acquire time.Duration)

func (*ExpVarMetrics) WriteTxnTableAcquisition

func (m *ExpVarMetrics) WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration)

func (*ExpVarMetrics) WriteTxnTotalAcquisition

func (m *ExpVarMetrics) WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration)

type Handle

type Handle struct {
	// contains filtered or unexported fields
}

Handle is a named handle to the database for constructing read or write transactions.

func (Handle) ReadTxn

func (h Handle) ReadTxn() ReadTxn

ReadTxn constructs a new read transaction for performing reads against a snapshot of the database.

The returned ReadTxn is not thread-safe.

func (Handle) WriteTxn

func (h Handle) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn

type Index

type Index[Obj any, Key any] struct {
	Name       string
	FromObject func(obj Obj) index.KeySet
	FromKey    func(key Key) index.Key
	Unique     bool
}

Index implements the indexing of objects (FromObject) and querying of objects from the index (FromKey).
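
The relationship between FromObject and FromKey can be illustrated with a small stand-alone sketch: FromObject yields the keys under which an object is stored, and FromKey turns a query value into a key of the same form. Everything here is a simplification and an assumption: Key is a plain string rather than index.Key, FromObject returns a slice rather than index.KeySet, and toyIndex is a hypothetical map-backed store rather than a radix tree.

```go
package main

import "fmt"

// Key is a stand-in for index.Key (assumption: a string, so it can be a map key).
type Key string

// Index mirrors the shape of the Index struct in this listing,
// with []Key in place of index.KeySet.
type Index[Obj any, K any] struct {
	Name       string
	FromObject func(obj Obj) []Key
	FromKey    func(key K) Key
	Unique     bool
}

// Device is a hypothetical object type.
type Device struct {
	Name string
	Tags []string
}

// deviceByTag is a non-unique index: one key per tag, possibly none.
var deviceByTag = Index[*Device, string]{
	Name: "tag",
	FromObject: func(d *Device) []Key {
		keys := make([]Key, 0, len(d.Tags))
		for _, t := range d.Tags {
			keys = append(keys, Key(t))
		}
		return keys
	},
	FromKey: func(tag string) Key { return Key(tag) },
	Unique:  false,
}

// toyIndex is a hypothetical map-backed store driven by an Index.
type toyIndex[Obj any, K any] struct {
	idx     Index[Obj, K]
	entries map[Key][]Obj
}

func newToyIndex[Obj any, K any](idx Index[Obj, K]) *toyIndex[Obj, K] {
	return &toyIndex[Obj, K]{idx: idx, entries: map[Key][]Obj{}}
}

// insert files the object under every key FromObject returns.
func (t *toyIndex[Obj, K]) insert(obj Obj) {
	for _, k := range t.idx.FromObject(obj) {
		t.entries[k] = append(t.entries[k], obj)
	}
}

// get converts the query value with FromKey and looks it up.
func (t *toyIndex[Obj, K]) get(key K) []Obj {
	return t.entries[t.idx.FromKey(key)]
}

func main() {
	byTag := newToyIndex(deviceByTag)
	byTag.insert(&Device{Name: "eth0", Tags: []string{"wired", "uplink"}})
	byTag.insert(&Device{Name: "wlan0", Tags: []string{"wireless", "uplink"}})
	byTag.insert(&Device{Name: "lo"}) // no tags: FromObject returns zero keys
	for _, d := range byTag.get("uplink") {
		fmt.Println(d.Name) // eth0, then wlan0
	}
}
```

The two functions must agree on the key encoding, otherwise queries never match what was indexed.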

func (Index[Obj, Key]) ObjectToKey

func (i Index[Obj, Key]) ObjectToKey(obj Obj) index.Key

func (Index[Obj, Key]) Query

func (i Index[Obj, Key]) Query(key Key) Query[Obj]

Query constructs a query against this index from a key.

func (Index[Obj, Key]) QueryFromObject

func (i Index[Obj, Key]) QueryFromObject(obj Obj) Query[Obj]

type IndexName

type IndexName = string

type Indexer

type Indexer[Obj any] interface {
	ObjectToKey(Obj) index.Key
	QueryFromObject(Obj) Query[Obj]
	// contains filtered or unexported methods
}

Indexer is the "FromObject" subset of Index[Obj, Key] without the 'Key' constraint.

type Iterator

type Iterator[Obj any] interface {
	// Next returns the next object and its revision if ok is true, otherwise
	// zero values to mean that the iteration has finished.
	Next() (obj Obj, rev Revision, ok bool)
}

Iterator for iterating objects returned from queries.

func Filter

func Filter[Obj any, It Iterator[Obj]](iter It, pred func(Obj) bool) Iterator[Obj]

Filter includes only the objects for which the supplied predicate returns true.

func Map

func Map[In, Out any, It Iterator[In]](iter It, transform func(In) Out) Iterator[Out]

Map applies a function to transform every object returned by the iterator.
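
A minimal sketch of how filtering and mapping iterators compose, using only the standard library. The types sliceIter, filterIter and mapIter and the helpers collect and upperEth are hypothetical; they illustrate the pattern and are not the package's Filter and Map.

```go
package main

import (
	"fmt"
	"strings"
)

// Revision and Iterator mirror the declarations in this package's listing.
type Revision = uint64

type Iterator[Obj any] interface {
	Next() (obj Obj, rev Revision, ok bool)
}

// sliceIter is a hypothetical slice-backed iterator; revisions are 1, 2, 3, ...
type sliceIter[Obj any] struct {
	items []Obj
	pos   int
}

func (s *sliceIter[Obj]) Next() (obj Obj, rev Revision, ok bool) {
	if s.pos >= len(s.items) {
		return obj, 0, false
	}
	obj = s.items[s.pos]
	s.pos++
	return obj, Revision(s.pos), true
}

// filterIter skips objects for which pred returns false.
type filterIter[Obj any] struct {
	iter Iterator[Obj]
	pred func(Obj) bool
}

func (f *filterIter[Obj]) Next() (Obj, Revision, bool) {
	for obj, rev, ok := f.iter.Next(); ok; obj, rev, ok = f.iter.Next() {
		if f.pred(obj) {
			return obj, rev, true
		}
	}
	var zero Obj
	return zero, 0, false
}

// mapIter transforms each object and keeps its revision.
type mapIter[In, Out any] struct {
	iter      Iterator[In]
	transform func(In) Out
}

func (m *mapIter[In, Out]) Next() (Out, Revision, bool) {
	obj, rev, ok := m.iter.Next()
	if !ok {
		var zero Out
		return zero, 0, false
	}
	return m.transform(obj), rev, true
}

// collect drains an iterator into a slice.
func collect[Obj any](iter Iterator[Obj]) []Obj {
	var out []Obj
	for obj, _, ok := iter.Next(); ok; obj, _, ok = iter.Next() {
		out = append(out, obj)
	}
	return out
}

// upperEth keeps names starting with "eth" and upper-cases them.
func upperEth(names []string) []string {
	var src Iterator[string] = &sliceIter[string]{items: names}
	var eth Iterator[string] = &filterIter[string]{
		iter: src,
		pred: func(s string) bool { return strings.HasPrefix(s, "eth") },
	}
	var upper Iterator[string] = &mapIter[string, string]{iter: eth, transform: strings.ToUpper}
	return collect(upper)
}

func main() {
	fmt.Println(upperEth([]string{"eth0", "lo", "eth1"})) // prints: [ETH0 ETH1]
}
```

Both wrappers are lazy: nothing is read from the source until Next is called on the outermost iterator.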

type Metrics

type Metrics interface {
	WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration)
	WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration)
	WriteTxnDuration(handle string, tables []string, acquire time.Duration)

	GraveyardLowWatermark(tableName string, lowWatermark Revision)
	GraveyardCleaningDuration(tableName string, duration time.Duration)
	GraveyardObjectCount(tableName string, numDeletedObjects int)
	ObjectCount(tableName string, numObjects int)

	DeleteTrackerCount(tableName string, numTrackers int)
	Revision(tableName string, revision Revision)
}

type NopMetrics

type NopMetrics struct{}

func (*NopMetrics) DeleteTrackerCount

func (*NopMetrics) DeleteTrackerCount(tableName string, numTrackers int)

DeleteTrackerCount implements Metrics.

func (*NopMetrics) GraveyardCleaningDuration

func (*NopMetrics) GraveyardCleaningDuration(tableName string, duration time.Duration)

GraveyardCleaningDuration implements Metrics.

func (*NopMetrics) GraveyardLowWatermark

func (*NopMetrics) GraveyardLowWatermark(tableName string, lowWatermark uint64)

GraveyardLowWatermark implements Metrics.

func (*NopMetrics) GraveyardObjectCount

func (*NopMetrics) GraveyardObjectCount(tableName string, numDeletedObjects int)

GraveyardObjectCount implements Metrics.

func (*NopMetrics) ObjectCount

func (*NopMetrics) ObjectCount(tableName string, numObjects int)

ObjectCount implements Metrics.

func (*NopMetrics) Revision

func (*NopMetrics) Revision(tableName string, revision uint64)

Revision implements Metrics.

func (*NopMetrics) WriteTxnDuration

func (*NopMetrics) WriteTxnDuration(handle string, tables []string, acquire time.Duration)

WriteTxnDuration implements Metrics.

func (*NopMetrics) WriteTxnTableAcquisition

func (*NopMetrics) WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration)

WriteTxnTableAcquisition implements Metrics.

func (*NopMetrics) WriteTxnTotalAcquisition

func (*NopMetrics) WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration)

WriteTxnTotalAcquisition implements Metrics.

type Option

type Option func(*opts)

func WithMetrics

func WithMetrics(m Metrics) Option

type Query

type Query[Obj any] struct {
	// contains filtered or unexported fields
}

func ByRevision

func ByRevision[Obj any](rev uint64) Query[Obj]

ByRevision constructs a revision query. Applicable to any table.

type QueryRequest

type QueryRequest struct {
	Key        string `json:"key"` // Base64 encoded query key
	Table      string `json:"table"`
	Index      string `json:"index"`
	LowerBound bool   `json:"lowerbound"`
}

type QueryResponse

type QueryResponse struct {
	Rev uint64 `json:"rev"`
	Obj any    `json:"obj"`
	Err string `json:"err,omitempty"`
}
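
As a rough illustration of the JSON shapes these two structs produce, the sketch below redeclares them locally and round-trips a request and a response. The helpers encodeRequest and decodeResponse are hypothetical. Two assumptions are made that this listing does not confirm: that the key uses the standard Base64 alphabet, and that the raw key for a name lookup is simply the name's bytes.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// QueryRequest and QueryResponse are copied from this package's listing.
type QueryRequest struct {
	Key        string `json:"key"` // Base64 encoded query key
	Table      string `json:"table"`
	Index      string `json:"index"`
	LowerBound bool   `json:"lowerbound"`
}

type QueryResponse struct {
	Rev uint64 `json:"rev"`
	Obj any    `json:"obj"`
	Err string `json:"err,omitempty"`
}

// encodeRequest builds a JSON request body from raw key bytes.
// Assumption: standard (padded) Base64.
func encodeRequest(table, index string, key []byte, lowerBound bool) (string, error) {
	req := QueryRequest{
		Key:        base64.StdEncoding.EncodeToString(key),
		Table:      table,
		Index:      index,
		LowerBound: lowerBound,
	}
	b, err := json.Marshal(req)
	return string(b), err
}

// decodeResponse parses a single JSON-encoded QueryResponse.
func decodeResponse(data string) (QueryResponse, error) {
	var resp QueryResponse
	err := json.Unmarshal([]byte(data), &resp)
	return resp, err
}

func main() {
	body, err := encodeRequest("devices", "name", []byte("eth0"), false)
	if err != nil {
		panic(err)
	}
	fmt.Println(body)
	// prints: {"key":"ZXRoMA==","table":"devices","index":"name","lowerbound":false}

	resp, err := decodeResponse(`{"rev":7,"obj":{"name":"eth0"}}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(resp.Rev, resp.Obj) // prints: 7 map[name:eth0]
}
```

Because Obj is declared as any, a decoded object arrives as a generic map unless the caller unmarshals it into a concrete type.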

type RWTable

type RWTable[Obj any] interface {
	// RWTable[Obj] is a superset of Table[Obj]. Queries made with a
	// write transaction return the fresh uncommitted modifications if any.
	Table[Obj]

	// RegisterInitializer adds an initializer to the table. Returns
	// a function to mark the initializer done. Once all initializers are
	// done, Table[*].Initialized() will return true.
	// This should only be used before the application has started.
	RegisterInitializer(WriteTxn) func(WriteTxn)

	// ToTable returns the Table[Obj] interface. Useful with cell.Provide
	// to avoid the anonymous function:
	//
	//   cell.ProvidePrivate(NewMyTable), // RWTable
	//   cell.Invoke(statedb.Register[statedb.RWTable[Foo]])
	//
	//   // with anonymous function:
	//   cell.Provide(func(t statedb.RWTable[Foo]) statedb.Table[Foo] { return t })
	//
	//   // with ToTable:
	//   cell.Provide(statedb.RWTable[Foo].ToTable),
	ToTable() Table[Obj]

	// Insert an object into the table. Returns the object that was
	// replaced if there was one.
	//
	// Possible errors:
	// - ErrTableNotLockedForWriting: table was not locked for writing
	// - ErrTransactionClosed: the write transaction already committed or aborted
	//
	// Each inserted or updated object will be assigned a new unique
	// revision.
	Insert(WriteTxn, Obj) (oldObj Obj, hadOld bool, err error)

	// CompareAndSwap compares the existing object's revision against the
	// given revision and if equal it replaces the object.
	//
	// Possible errors:
	// - ErrRevisionNotEqual: the object has mismatching revision
	// - ErrObjectNotFound: object not found from the table
	// - ErrTableNotLockedForWriting: table was not locked for writing
	// - ErrTransactionClosed: the write transaction already committed or aborted
	CompareAndSwap(WriteTxn, Revision, Obj) (oldObj Obj, hadOld bool, err error)

	// Delete an object from the table. Returns the object that was
	// deleted if there was one.
	//
	// If the table is being tracked for deletions via Changes()
	// the deleted object is inserted into a graveyard index and garbage
	// collected when all delete trackers have consumed it. Each deleted
	// object in the graveyard has a unique revision allowing interleaved
	// iteration of updates and deletions.
	//
	// Possible errors:
	// - ErrTableNotLockedForWriting: table was not locked for writing
	// - ErrTransactionClosed: the write transaction already committed or aborted
	Delete(WriteTxn, Obj) (oldObj Obj, hadOld bool, err error)

	// DeleteAll removes all objects in the table. Semantically the same as
	// All() + Delete(). See Delete() for more information.
	//
	// Possible errors:
	// - ErrTableNotLockedForWriting: table was not locked for writing
	// - ErrTransactionClosed: the write transaction already committed or aborted
	DeleteAll(WriteTxn) error

	// CompareAndDelete compares the existing object's revision against the
	// given revision and if equal it deletes the object. If object is not
	// found 'hadOld' will be false and 'err' nil.
	//
	// Possible errors:
	// - ErrRevisionNotEqual: the object has mismatching revision
	// - ErrTableNotLockedForWriting: table was not locked for writing
	// - ErrTransactionClosed: the write transaction already committed or aborted
	CompareAndDelete(WriteTxn, Revision, Obj) (oldObj Obj, hadOld bool, err error)
}

RWTable provides methods for modifying the table under a write transaction that targets this table.
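
The revision check behind CompareAndSwap can be sketched as follows. This is a toy model under stated assumptions: toyTable is a hypothetical map keyed by name with no transactions or locking, the error variables are local stand-ins that only reuse the names listed above, and returning the current object on a revision mismatch is a choice of this sketch rather than documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

type Revision = uint64

// Local stand-ins named after the errors listed for CompareAndSwap.
var (
	ErrRevisionNotEqual = errors.New("revision not equal")
	ErrObjectNotFound   = errors.New("object not found")
)

type versioned struct {
	value string
	rev   Revision
}

// toyTable is a hypothetical table keyed by name.
type toyTable struct {
	objects map[string]versioned
	rev     Revision
}

func newToyTable() *toyTable { return &toyTable{objects: map[string]versioned{}} }

// insert stores the value and assigns it a fresh, unique revision.
func (t *toyTable) insert(key, value string) Revision {
	t.rev++
	t.objects[key] = versioned{value, t.rev}
	return t.rev
}

// compareAndSwap replaces the value only if the stored revision equals rev.
func (t *toyTable) compareAndSwap(key string, rev Revision, value string) (old string, hadOld bool, err error) {
	cur, found := t.objects[key]
	if !found {
		return "", false, ErrObjectNotFound
	}
	if cur.rev != rev {
		// Assumption of this sketch: report the current object on mismatch.
		return cur.value, true, ErrRevisionNotEqual
	}
	t.insert(key, value)
	return cur.value, true, nil
}

func main() {
	tbl := newToyTable()
	rev1 := tbl.insert("eth0", "down")
	tbl.insert("eth0", "up") // another writer bumps the revision to 2

	_, _, err := tbl.compareAndSwap("eth0", rev1, "stale")
	fmt.Println(err) // prints: revision not equal

	old, _, err := tbl.compareAndSwap("eth0", 2, "fresh")
	fmt.Println(old, err, tbl.objects["eth0"].value) // prints: up <nil> fresh
}
```

A caller that reads an object in a read transaction and later writes it back can use the revision it observed to detect that someone else changed the object in between.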

func MustNewTable

func MustNewTable[Obj any](
	tableName TableName,
	primaryIndexer Indexer[Obj],
	secondaryIndexers ...Indexer[Obj]) RWTable[Obj]

MustNewTable creates a new table with the given name and indexes. Panics if the indexes are malformed.

func NewTable

func NewTable[Obj any](
	tableName TableName,
	primaryIndexer Indexer[Obj],
	secondaryIndexers ...Indexer[Obj],
) (RWTable[Obj], error)

NewTable creates a new table with the given name and indexes. Can fail if the indexes are malformed.

To provide access to the table via Hive:

cell.Provide(
	// Provide statedb.RWTable[*MyObject]. Often only provided to the module with ProvidePrivate.
	statedb.NewTable[*MyObject]("my-objects", MyObjectIDIndex, MyObjectNameIndex),
	// Provide the read-only statedb.Table[*MyObject].
	statedb.RWTable[*MyObject].ToTable,
)

type ReadTxn

type ReadTxn interface {

	// WriteJSON writes the contents of the database as JSON.
	WriteJSON(w io.Writer, tables ...string) error
	// contains filtered or unexported methods
}

type RemoteTable

type RemoteTable[Obj any] struct {
	// contains filtered or unexported fields
}

func NewRemoteTable

func NewRemoteTable[Obj any](base *url.URL, table TableName) *RemoteTable[Obj]

NewRemoteTable creates a new handle for querying a remote StateDB table over HTTP. Example usage:

// url.Parse returns (*url.URL, error), so parse the base URL first.
base, err := url.Parse("http://localhost:8080/db")
if err != nil { ... }
devices := statedb.NewRemoteTable[*tables.Device](base, "devices")

// Get all devices ordered by name.
iter, errs := devices.LowerBound(ctx, tables.DeviceByName(""))
for device, revision, ok := iter.Next(); ok; device, revision, ok = iter.Next() { ... }

// Get device by name.
iter, errs = devices.Get(ctx, tables.DeviceByName("eth0"))
if dev, revision, ok := iter.Next(); ok { ... }

// Get devices in revision order, e.g. oldest changed devices first.
// The type argument is explicit because it cannot be inferred from the call.
iter, errs = devices.LowerBound(ctx, statedb.ByRevision[*tables.Device](0))

func (*RemoteTable[Obj]) Get

func (t *RemoteTable[Obj]) Get(ctx context.Context, q Query[Obj]) (Iterator[Obj], <-chan error)

func (*RemoteTable[Obj]) LowerBound

func (t *RemoteTable[Obj]) LowerBound(ctx context.Context, q Query[Obj]) (Iterator[Obj], <-chan error)

func (*RemoteTable[Obj]) SetTransport

func (t *RemoteTable[Obj]) SetTransport(tr *http.Transport)

type Revision

type Revision = uint64

type Table

type Table[Obj any] interface {
	// TableMeta for querying table metadata that is independent of
	// 'Obj' type.
	TableMeta

	// PrimaryIndexer returns the primary indexer for the table.
	// Useful for generic utilities that need access to the primary key.
	PrimaryIndexer() Indexer[Obj]

	// NumObjects returns the number of objects stored in the table.
	NumObjects(ReadTxn) int

	// Initialized returns true if the registered table initializers have
	// completed.
	Initialized(ReadTxn) bool

	// Revision of the table. Constant for a read transaction, but
	// increments in a write transaction on each Insert and Delete.
	Revision(ReadTxn) Revision

	// All returns an iterator for all objects in the table and a watch
	// channel that is closed when the table changes.
	All(ReadTxn) (Iterator[Obj], <-chan struct{})

	// List returns an iterator for all objects matching the given query.
	List(ReadTxn, Query[Obj]) Iterator[Obj]

	// ListWatch returns an iterator for all objects matching the given query
	// and a watch channel that is closed if the query results are
	// invalidated by a write to the table.
	ListWatch(ReadTxn, Query[Obj]) (Iterator[Obj], <-chan struct{})

	// Get returns the first matching object for the query.
	Get(ReadTxn, Query[Obj]) (obj Obj, rev Revision, found bool)

	// GetWatch returns the first matching object and a watch channel
	// that is closed if the query is invalidated.
	GetWatch(ReadTxn, Query[Obj]) (obj Obj, rev Revision, watch <-chan struct{}, found bool)

	// LowerBound returns an iterator for objects that have a key
	// greater or equal to the query. The returned watch channel is closed
	// when anything in the table changes as more fine-grained notifications
	// are not possible with a lower bound search.
	LowerBound(ReadTxn, Query[Obj]) (iter Iterator[Obj], watch <-chan struct{})

	// Prefix searches the table by key prefix.
	Prefix(ReadTxn, Query[Obj]) (iter Iterator[Obj], watch <-chan struct{})

	// Changes returns an iterator for changes happening to the table.
	// This uses the revision index to iterate over the objects in the order
	// they have changed. Deleted objects are placed onto a temporary index
	// (graveyard) where they live until all change iterators have observed
	// the deletion.
	//
	// If an object is created and deleted before the observer has iterated
	// over the creation then only the deletion is seen.
	Changes(WriteTxn) (ChangeIterator[Obj], error)
}

Table provides methods for querying the contents of a table.
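
Several of the methods above return a watch channel that is closed when the result is invalidated. The mechanics of that pattern can be sketched with a single watched value. The type watched and the helper isClosed are hypothetical; this listing does not show how the package stores its channels, so this only illustrates the "closed channel as notification" idea.

```go
package main

import (
	"fmt"
	"sync"
)

// watched is a hypothetical single-value store standing in for a table.
type watched struct {
	mu    sync.Mutex
	value string
	watch chan struct{}
}

func newWatched(v string) *watched {
	return &watched{value: v, watch: make(chan struct{})}
}

// get returns the current value and a channel that the next set closes.
func (w *watched) get() (string, <-chan struct{}) {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.value, w.watch
}

// set stores v, closes the old channel to wake all watchers,
// and installs a fresh channel for the next round.
func (w *watched) set(v string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.value = v
	close(w.watch)
	w.watch = make(chan struct{})
}

// isClosed reports whether a watch channel has fired, without blocking.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	w := newWatched("v1")
	v, watch := w.get()
	fmt.Println(v, isClosed(watch)) // prints: v1 false

	go w.set("v2")
	<-watch // blocks until the write closes the channel

	v, watch = w.get()
	fmt.Println(v, isClosed(watch)) // prints: v2 false
}
```

A closed channel never reopens, so after it fires the reader has to query again to obtain both the new data and a new channel.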

type TableMeta

type TableMeta interface {
	Name() TableName // The name of the table
	// contains filtered or unexported methods
}

TableMeta provides information about the table that is independent of the object type (the 'Obj' constraint).

type TableName

type TableName = string

type TableWritable

type TableWritable interface {
	// TableHeader returns the header columns that are independent of the
	// object.
	TableHeader() []string

	// TableRow returns the row columns for this object.
	TableRow() []string
}

TableWritable is a constraint for objects that implement tabular pretty-printing. Used in "cilium-dbg statedb" sub-commands.

type WriteTxn

type WriteTxn interface {
	// WriteTxn is always also a ReadTxn
	ReadTxn

	// Abort the current transaction. All changes are discarded.
	// It is safe to call Abort() after calling Commit(), e.g.
	// the following pattern is strongly encouraged to make sure
	// write transactions are always completed:
	//
	//  txn := db.WriteTxn(...)
	//  defer txn.Abort()
	//  ...
	//  txn.Commit()
	Abort()

	// Commit the changes in the current transaction to the target tables.
	// This is a no-op if Abort() or Commit() has already been called.
	// Returns a ReadTxn for reading the database at the time of commit.
	Commit() ReadTxn
}
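
The "defer Abort, then Commit" pattern described in the comments above can be exercised with a toy transaction. The types toyDB and toyTxn and the function update are hypothetical; the sketch copies a map instead of using persistent data structures, takes no locks, and its commit returns nothing, unlike Commit() above.

```go
package main

import (
	"errors"
	"fmt"
)

var errClosed = errors.New("transaction closed")

// toyDB holds a snapshot map that is replaced wholesale on commit.
type toyDB struct{ snapshot map[string]string }

// toyTxn is a hypothetical write transaction over toyDB.
type toyTxn struct {
	db      *toyDB
	pending map[string]string
	closed  bool
}

// writeTxn copies the snapshot so writes stay invisible until commit.
func (db *toyDB) writeTxn() *toyTxn {
	pending := make(map[string]string, len(db.snapshot))
	for k, v := range db.snapshot {
		pending[k] = v
	}
	return &toyTxn{db: db, pending: pending}
}

func (t *toyTxn) insert(k, v string) error {
	if t.closed {
		return errClosed
	}
	t.pending[k] = v
	return nil
}

// commit publishes the pending map. No-op once the transaction is closed.
func (t *toyTxn) commit() {
	if t.closed {
		return
	}
	t.db.snapshot, t.pending, t.closed = t.pending, nil, true
}

// abort discards the pending map. No-op once the transaction is closed.
func (t *toyTxn) abort() {
	if t.closed {
		return
	}
	t.pending, t.closed = nil, true
}

// update follows the pattern: defer abort first, commit on the success path.
func update(db *toyDB, k, v string, fail bool) error {
	txn := db.writeTxn()
	defer txn.abort() // no-op if commit below has run
	if err := txn.insert(k, v); err != nil {
		return err
	}
	if fail {
		return errors.New("validation failed") // deferred abort discards the insert
	}
	txn.commit()
	return nil
}

func main() {
	db := &toyDB{snapshot: map[string]string{}}
	_ = update(db, "a", "1", false)
	_ = update(db, "b", "2", true)
	fmt.Println(len(db.snapshot), db.snapshot["a"], db.snapshot["b"] == "")
	// prints: 1 1 true
}
```

Deferring the abort means every early return, including a panic, releases the transaction without extra cleanup code on each path.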
