hedge

Published: Mar 29, 2024 License: MIT Imports: 18 Imported by: 2

README


hedge

A library built on top of spindle and Cloud Spanner that provides rudimentary distributed computing facilities to Kubernetes Deployments. Features include a consistent, append-only, Spanner-backed distributed key/value store; a distributed locking/leader election mechanism through spindle; a simple member-to-leader communication channel; a broadcast (send-to-all) mechanism; and a distributed semaphore. It works even on single-pod deployments.

Why?

In a nutshell, I wanted something much simpler than using Raft (my progress on that front is quite slow), or worse, Paxos (Raft maybe as the long-term goal, although I also have an ongoing trial using Paxos here). And I wanted easily accessible storage that is a bit decoupled from the code (easier to query, edit, debug, back up, etc.). We are already heavy Spanner users, and spindle has been in our production for quite a while now: these two should be able to do it, preferably on a k8s Deployment; StatefulSets or DaemonSets shouldn't be a requirement. Since then, additional features have been added, such as the Send() API.

What does it do?

Leader election is handled by spindle. Two APIs are provided for storage: Put() and Get(). All pods can serve Get() calls, while only the leader handles Put() calls. If a non-leader pod calls Put(), the call is forwarded to the leader, which performs the actual write. All Put()s are append-only.

spindle's HasLock() function is also available for distributed locking through struct embedding, although you can use spindle separately for that if you prefer.

A Send() API is also provided so members can send simple request/reply-type messages to the current leader at any time.

A Broadcast() API is also available to all pods. Note that due to the nature of k8s Deployments (pods come and go) and the internal heartbeat delays, some pods might not receive the broadcast message at call time, although all pods will eventually have the complete broadcast target list. hedge uses a combination of heartbeats and broadcasts to propagate member information to all pods: non-leaders send liveness heartbeats to the leader, while the leader broadcasts the active member list to all pods.

Finally, a distributed semaphore is also provided through the NewSemaphore(), [Try]Acquire(), and Release() APIs.

Prerequisites

  • All pods within the group should be able to contact each other via TCP (address:port).
  • Each hedge instance's id should be set using the pod's cluster IP address and port. You can use the downward API to get the pod's IP address (see the sketch after this list), or you can use the ":port" format, in which case the IP address will be resolved internally.
  • For now, spindle's lock table and hedge's log table are within the same database.
  • Tables for spindle and hedge need to be created beforehand. See here for spindle's DDL. For hedge, see below:
-- 'logtable' name is just an example
CREATE TABLE logtable (
    id STRING(MAX),
    key STRING(MAX),
    value STRING(MAX),
    leader STRING(MAX),
    timestamp TIMESTAMP OPTIONS (allow_commit_timestamp=true),
) PRIMARY KEY (key, id)
  • This library will use the input key/value table (logtable in the example above) for its semaphore-related operations with the following reserved keywords:
column=key, value=__hedge/semaphore/{name}
column=key, value=__caller={ip:port}
column=id, value=__hedge/semaphore/{name}
column=id, value=limit={num}
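
For the downward API approach mentioned in the list above, a minimal sketch (the POD_IP variable name is illustrative; hedge only needs the final "ip:port" string, and client is a *spanner.Client as in the "How to use" example below):

import (
    "net"
    "os"
)

// POD_IP is assumed to be injected via the downward API, e.g. a pod-spec env
// entry with valueFrom.fieldRef.fieldPath set to status.podIP. Alternatively,
// pass ":8080" and let hedge resolve the IP internally.
hostPort := net.JoinHostPort(os.Getenv("POD_IP"), "8080")
op := hedge.New(client, hostPort, "locktable", "myspindlelock", "logtable")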

How to use

Something like:

client, _ := spanner.NewClient(context.Background(), "your/spanner/database")
defer client.Close()

op := hedge.New(
    client,
    ":8080", // addr will be resolved internally
    "locktable",
    "myspindlelock",
    "logtable",
    hedge.WithLeaderHandler( // if leader only, handles Send()
        nil,
        func(data interface{}, msg []byte) ([]byte, error) {
            log.Println("[send] received:", string(msg))
            return []byte("hello " + string(msg)), nil
        },
    ),
    hedge.WithBroadcastHandler( // handles Broadcast()
        nil,
        func(data interface{}, msg []byte) ([]byte, error) {
            log.Println("[broadcast] received:", string(msg))
            return []byte("broadcast " + string(msg)), nil
        },
    ),
)

ctx, cancel := context.WithCancel(context.Background())
done := make(chan error, 1) // optional wait
go op.Run(ctx, done)

// For storage, any pod should be able to call op.Put(...) or op.Get(...) here.
//
// Any pod can call HasLock() here at any given time to know whether they are leader or not.
//   hl, _ := op.HasLock()
//   if hl {
//     log.Println("leader here!")
//   }
//
// Calling op.Send(...) will be handled by the leader through the WithLeaderHandler callback.
//
// For broadcast, any pod can call op.Broadcast(...) here which will be handled by each
//   pod's WithBroadcastHandler callback, including the caller.
//
// For distributed semaphore, any pod can call the following:
//   sem, _ := op.NewSemaphore(ctx, "semaphore-name", 2)
//   sem.Acquire(ctx)
//   ...
//   sem.Release(ctx)

cancel()
<-done

A sample deployment file for GKE is provided, although it needs a fair bit of editing (mainly for auth) to be usable. It uses Workload Identity for authentication, although you can update it to use other authentication methods. The service account needs to have Spanner permissions.

Once deployed, you can do the following tests while checking the logs. We will use kubepfm to port-forward our test commands to the server.

Test the Put() API:

# Open a terminal and run:
$ kubepfm --target deployment/hedgedemo:8081:8081

# Open another terminal and run:
$ curl localhost:8081/put -d "samplekey samplevalue"

# To ensure a non-leader sender, you can also specify a
# non-leader pod for the kubepfm command above:
$ kubepfm --target hedgedemo-6b5bcd4998-n95n7:8081:8081

Test the Get() API:

# While kubepfm is running on a different terminal, run:
$ curl localhost:8081/get -d "samplekey"

Test the Send() API:

# While kubepfm is running on a different terminal, run:
$ curl localhost:8081/send -d "hello-world"

Test the Broadcast() API:

# While kubepfm is running on a different terminal, run:
$ curl localhost:8081/broadcast -d "hello-all"

Documentation

Constants

const (
	CmdLeader     = "LDR" // for leader confirmation, reply="ACK"
	CmdWrite      = "PUT" // write key/value, fmt="PUT <base64(payload)> [noappend]"
	CmdSend       = "SND" // member to leader, fmt="SND <base64(payload)>"
	CmdPing       = "HEY" // heartbeat to indicate availability, fmt="HEY [id]"
	CmdMembers    = "MEM" // members info from leader to all, fmt="MEM base64(JSON(members))"
	CmdBroadcast  = "ALL" // broadcast to all, fmt="ALL base64(payload)"
	CmdAck        = "ACK" // generic reply, fmt="ACK"|"ACK base64(err)"|"ACK base64(JSON(members))"
	CmdSemaphore  = "SEM" // create semaphore, fmt="SEM {name} {limit} {caller}", reply="ACK"
	CmdSemAcquire = "SEA" // acquire semaphore, fmt="SEA {name} {caller}", reply="ACK[ base64([0:|1:]err)]" (0=final,1=retry)
	CmdSemRelease = "SER" // release semaphore, fmt="SER {name} {caller}"

	FlagNoAppend = "noappend"
)

Variables

var (
	ErrNotRunning   = fmt.Errorf("hedge: not running")
	ErrNoLeader     = fmt.Errorf("hedge: no leader available")
	ErrNoHandler    = fmt.Errorf("hedge: no message handler")
	ErrNotSupported = fmt.Errorf("hedge: not supported")
	ErrInvalidConn  = fmt.Errorf("hedge: invalid connection")
)
var (
	ErrSemFull = fmt.Errorf("hedge/semaphore: semaphore full")
)

Functions

This section is empty.

Types

type BroadcastArgs added in v1.13.0

type BroadcastArgs struct {
	SkipSelf bool // if true, skip broadcasting to self
	Out      chan BroadcastOutput
}

type BroadcastOutput added in v1.3.0

type BroadcastOutput struct {
	Id    string `json:"id,omitempty"`
	Reply []byte `json:"reply,omitempty"`
	Error error  `json:"error,omitempty"`
}

type FnMsgHandler added in v1.3.0

type FnMsgHandler func(data interface{}, msg []byte) ([]byte, error)
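
For example, a sketch of a handler that echoes the message (the name and reply format are illustrative):

func myHandler(data interface{}, msg []byte) ([]byte, error) {
    op := data.(*hedge.Op) // default callback data when d is nil (see Options below)
    log.Println(op.Name(), "received:", string(msg))
    return append([]byte("echo: "), msg...), nil
}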

type KeyValue

type KeyValue struct {
	Key       string    `json:"key"`
	Value     string    `json:"value"`
	Timestamp time.Time `json:"timestamp"` // read-only, populated when Get()
}

KeyValue is for Put()/Get() callers.

type LogItem

type LogItem struct {
	Id        string
	Key       string
	Value     string
	Leader    string
	Timestamp time.Time
}

LogItem represents an item in our log.

type Op added in v0.3.0

type Op struct {
	*spindle.Lock // handles our distributed lock
	// contains filtered or unexported fields
}

Op is our main instance for hedge operations.

func New

func New(client *spanner.Client, hostPort, lockTable, lockName, logTable string, opts ...Option) *Op

New creates an instance of Op. hostPort can be in "ip:port" format, or ":port" format, in which case the IP part will be resolved internally, or empty, in which case port 8080 will be used. The internal spindle object's lock table name will be lockTable, and lockName is the lock name. logTable will serve as our append-only, distributed key/value storage table. If logTable is empty, Put, Get, and Semaphore features will be disabled.

func (*Op) Broadcast added in v1.3.0

func (op *Op) Broadcast(ctx context.Context, msg []byte, args ...BroadcastArgs) []BroadcastOutput

Broadcast sends msg to all nodes (send-to-all). Any node can broadcast messages, including the leader itself. Note that this is best-effort only: by the time you call this API, the internal member list might not yet include all active pods, as is usual with k8s Deployments, where pods come and go and the internal heartbeat protocol may not have completed yet. This call also blocks until it receives the replies from all nodes' broadcast handlers.

If args[].Out is set, the output will be streamed to that channel instead. This is useful if you prefer to receive replies as they arrive rather than waiting for all of them before returning. In that case, the return value will be an empty slice, and close() will be called on the Out channel to indicate the end of streaming.
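
A sketch of the streaming form (the channel handling is illustrative):

outs := make(chan hedge.BroadcastOutput)
done := make(chan struct{})

go func() {
    defer close(done)
    for out := range outs { // Broadcast closes outs when all replies are in
        log.Println("reply from", out.Id, string(out.Reply), out.Error)
    }
}()

op.Broadcast(ctx, []byte("hello"), hedge.BroadcastArgs{Out: outs})
<-done // wait for the drain goroutine to finish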

func (*Op) Get added in v0.3.0

func (op *Op) Get(ctx context.Context, key string, limit ...int64) ([]KeyValue, error)

Get reads a key (or keys) from Op. The values of limit are:

limit = 0  --> (default) latest only
limit = -1 --> all (latest to oldest, [0]=latest)
limit = -2 --> oldest version only
limit > 0  --> items behind latest; 3 means latest + 2 versions behind, [0]=latest
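
For example:

latest, _ := op.Get(ctx, "samplekey")      // latest value only (default)
history, _ := op.Get(ctx, "samplekey", -1) // all versions, [0]=latest
oldest, _ := op.Get(ctx, "samplekey", -2)  // oldest version only
recent, _ := op.Get(ctx, "samplekey", 3)   // latest + 2 versions behind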

func (*Op) HostPort added in v1.7.8

func (op *Op) HostPort() string

HostPort returns the host:port (or name) of this instance.

func (*Op) IsRunning added in v1.8.0

func (op *Op) IsRunning() bool

IsRunning returns true if Op is already running.

func (*Op) Members added in v1.11.0

func (op *Op) Members() []string

Members returns a list of members in the cluster/group.

func (*Op) Name added in v1.7.8

func (op *Op) Name() string

Name is the same as HostPort.

func (*Op) NewSemaphore added in v0.3.1

func (op *Op) NewSemaphore(ctx context.Context, name string, limit int) (*Semaphore, error)

NewSemaphore returns a distributed semaphore object.

func (*Op) Put added in v0.3.0

func (op *Op) Put(ctx context.Context, kv KeyValue, po ...PutOptions) error

Put saves a key/value to Op. This call blocks, roughly up to spindle's timeout, waiting for the leader to become available to perform the actual write before returning.
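
For example:

kv := hedge.KeyValue{Key: "samplekey", Value: "samplevalue"}
if err := op.Put(ctx, kv); err != nil {
    log.Println("put failed:", err) // e.g. hedge.ErrNotRunning
}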

func (*Op) Run added in v0.3.0

func (op *Op) Run(ctx context.Context, done ...chan error) error

Run starts the main handler. It blocks until ctx is cancelled, optionally sending an error message to done when finished.

func (*Op) Send added in v1.2.0

func (op *Op) Send(ctx context.Context, msg []byte) ([]byte, error)

Send sends msg to the current leader. Any node can send messages, including the leader itself (send to self). It also blocks until it receives the reply from the leader's message handler.

func (*Op) String added in v0.3.0

func (op *Op) String() string

String implements the Stringer interface.

type Option added in v1.1.0

type Option interface {
	Apply(*Op)
}

func WithBroadcastHandler added in v1.3.0

func WithBroadcastHandler(d interface{}, h FnMsgHandler) Option

WithBroadcastHandler sets the node's callback function for broadcast messages sent by anyone in the group using the Broadcast(...) API. Any arbitrary data represented by d will be passed to the callback h every time it is called. If d is nil, the default callback data will be the *Op object itself. The []byte returned by the handler serves as the reply.

A nil broadcast handler disables the internal heartbeat function.

func WithDuration added in v1.1.0

func WithDuration(v int64) Option

WithDuration sets Op's internal spindle object's lease duration. Defaults to 30s when not set. Minimum value is 2s.

func WithGroupSyncInterval added in v1.10.0

func WithGroupSyncInterval(v time.Duration) Option

WithGroupSyncInterval sets the internal interval timeout to sync membership within the group. If not set, defaults to 30s. Minimum value is 2s.
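
A sketch combining these options (values are illustrative; WithDuration's int64 is assumed to be in milliseconds, matching spindle):

op := hedge.New(client, ":8080", "locktable", "myspindlelock", "logtable",
    hedge.WithDuration(5000),                     // lease duration, assumed ms (5s)
    hedge.WithGroupSyncInterval(time.Second*10),  // membership sync every 10s
    hedge.WithLogger(log.New(io.Discard, "", 0)), // silence internal logging
)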

func WithLeaderHandler added in v1.2.0

func WithLeaderHandler(d interface{}, h FnMsgHandler) Option

WithLeaderHandler sets the node's callback function for when it is the current leader and members send messages to it using the Send(...) API. Any arbitrary data represented by d will be passed to the callback h every time it is called. If d is nil, the default callback data will be the *Op object itself. The []byte returned by the handler serves as the reply.

Typical flow would be:

  1. Any node (including the leader) calls the Send(...) API.
  2. The current leader handles the call by reading the input.
  3. The leader then calls the handler h, passing the arbitrary data d along with the message.
  4. The handler processes the data as leader, then returns the reply to the calling member.
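
From the sender's side, a minimal sketch (assuming op was created with the WithLeaderHandler callback from the "How to use" section above):

reply, err := op.Send(ctx, []byte("world"))
if err == nil {
    log.Println("leader replied:", string(reply)) // "hello world" with the sample handler
}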

func WithLogger added in v1.1.0

func WithLogger(v *log.Logger) Option

WithLogger sets Op's logger object. Can be silenced by setting v to:

log.New(io.Discard, "", 0)

type PutOptions added in v1.7.0

type PutOptions struct {
	// If true, do a direct write, no need to fwd to leader.
	DirectWrite bool

	// If true, don't do an append-write; overwrite the latest value instead. Note that
	// even if you set this to true, a later Put with this field set to false (the
	// default) will append again, and the overwritten value becomes part of the history.
	NoAppend bool
}
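
For example, a sketch that overwrites the latest value in place:

if err := op.Put(ctx, hedge.KeyValue{
    Key:   "samplekey",
    Value: "overwritten-value",
}, hedge.PutOptions{
    NoAppend: true, // overwrite the latest version instead of appending
}); err != nil {
    log.Println("put failed:", err)
}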

type Semaphore added in v0.3.1

type Semaphore struct {
	// contains filtered or unexported fields
}

Semaphore represents a distributed semaphore object.

func (*Semaphore) Acquire added in v0.3.1

func (s *Semaphore) Acquire(ctx context.Context) error

Acquire acquires a semaphore. This call blocks until the semaphore is acquired, or until ctx expires or is cancelled.

func (*Semaphore) Release added in v1.5.0

func (s *Semaphore) Release(ctx context.Context) error

Release releases a semaphore. Although it is recommended to release all acquired semaphores, this is still a best-effort release, as any caller could disappear or crash while holding a semaphore. To remedy this, the current leader attempts to track all semaphore owners and remove the non-responsive ones after some delay. The downside of not calling Release properly is that other acquirers will block a bit longer while the leader does the cleanup, whereas a proper Release frees up the slot immediately, letting other acquirers proceed sooner.

func (*Semaphore) TryAcquire added in v0.3.1

func (s *Semaphore) TryAcquire(ctx context.Context) error

TryAcquire is like Acquire() but does not block. It attempts to acquire the semaphore once and returns immediately on success or failure, or when ctx expires or is cancelled.
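
A sketch tying the semaphore calls together (the name "migrate" and limit of 1 are illustrative):

func doLimitedWork(ctx context.Context, op *hedge.Op) error {
    sem, err := op.NewSemaphore(ctx, "migrate", 1)
    if err != nil {
        return err
    }

    // Non-blocking attempt; use sem.Acquire(ctx) to wait instead.
    if err := sem.TryAcquire(ctx); err != nil {
        return err // e.g. ErrSemFull
    }

    defer sem.Release(ctx)
    // ... do the limit-bounded work here ...
    return nil
}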

Directories

Path Synopsis
cmd
