Documentation ¶
Index ¶
- Constants
- Variables
- type BroadcastArgs
- type BroadcastOutput
- type FnMsgHandler
- type KeyValue
- type LogItem
- type Op
- func (op *Op) Broadcast(ctx context.Context, msg []byte, args ...BroadcastArgs) []BroadcastOutput
- func (op *Op) Get(ctx context.Context, key string, limit ...int64) ([]KeyValue, error)
- func (op *Op) HostPort() string
- func (op *Op) IsRunning() bool
- func (op *Op) Members() []string
- func (op *Op) Name() string
- func (op *Op) NewSemaphore(ctx context.Context, name string, limit int) (*Semaphore, error)
- func (op *Op) Put(ctx context.Context, kv KeyValue, po ...PutOptions) error
- func (op *Op) Run(ctx context.Context, done ...chan error) error
- func (op *Op) Send(ctx context.Context, msg []byte) ([]byte, error)
- func (op *Op) String() string
- type Option
- type PutOptions
- type Semaphore
Constants ¶
const ( CmdLeader = "LDR" // for leader confirmation, reply="ACK" CmdWrite = "PUT" // write key/value, fmt="PUT <base64(payload)> [noappend]" CmdSend = "SND" // member to leader, fmt="SND <base64(payload)>" CmdPing = "HEY" // heartbeat to indicate availability, fmt="HEY [id]" CmdMembers = "MEM" // members info from leader to all, fmt="MEM base64(JSON(members))" CmdBroadcast = "ALL" // broadcast to all, fmt="ALL base64(payload)" CmdAck = "ACK" // generic reply, fmt="ACK"|"ACK base64(err)"|"ACK base64(JSON(members))" CmdSemaphore = "SEM" // create semaphore, fmt="SEM {name} {limit} {caller}, reply="ACK" CmdSemAcquire = "SEA" // acquire semaphore, fmt="SEA {name} {caller}", reply="ACK[ base64([0:|1:]err)]" (0=final,1=retry) CmdSemRelease = "SER" // release semaphore, fmt="SER {name} {caller}" FlagNoAppend = "noappend" )
Variables ¶
var ( ErrNotRunning = fmt.Errorf("hedge: not running") ErrNoLeader = fmt.Errorf("hedge: no leader available") ErrNoHandler = fmt.Errorf("hedge: no message handler") ErrNotSupported = fmt.Errorf("hedge: not supported") ErrInvalidConn = fmt.Errorf("hedge: invalid connection") )
var (
ErrSemFull = fmt.Errorf("hedge/semaphore: semaphore full")
)
Functions ¶
This section is empty.
Types ¶
type BroadcastArgs ¶ added in v1.13.0
type BroadcastArgs struct { SkipSelf bool // if true, skip broadcasting to self Out chan BroadcastOutput }
type BroadcastOutput ¶ added in v1.3.0
type FnMsgHandler ¶ added in v1.3.0
type KeyValue ¶
type KeyValue struct { Key string `json:"key"` Value string `json:"value"` Timestamp time.Time `json:"timestamp"` // read-only, populated when Get() }
KeyValue is for Put()/Get() callers.
type Op ¶ added in v0.3.0
type Op struct { *spindle.Lock // handles our distributed lock // contains filtered or unexported fields }
Op is our main instance for hedge operations.
func New ¶
func New(client *spanner.Client, hostPort, lockTable, lockName, logTable string, opts ...Option) *Op
New creates an instance of Op. hostPort can be in "ip:port" format, or ":port" format, in which case the IP part will be resolved internally, or empty, in which case port 8080 will be used. The internal spindle object's lock table name will be lockTable, and lockName is the lock name. logTable will serve as our append-only, distributed key/value storage table. If logTable is empty, Put, Get, and Semaphore features will be disabled.
func (*Op) Broadcast ¶ added in v1.3.0
func (op *Op) Broadcast(ctx context.Context, msg []byte, args ...BroadcastArgs) []BroadcastOutput
Broadcast sends msg to all nodes (send to all). Any node can broadcast messages, including the leader itself. Note that this is best-effort basis only; by the time you call this API, the handler might not have all the active members in record yet, as is the usual situation with k8s deployments, where pods come and go, and our internal heartbeat protocol hasn't been completed yet. This call will also block until it receives all the reply from all nodes' broadcast handlers.
If args[].Out is set, the output will be streamed to that channel instead. Useful if you prefer a streamed output (as reply comes) instead of waiting for all replies before returning. If set, the return value (output slice) will be set to empty []. Also, close() will be called on the Out channel to indicate streaming end.
func (*Op) Get ¶ added in v0.3.0
Get reads a key (or keys) from Op. The values of limit are:
limit = 0 --> (default) latest only limit = -1 --> all (latest to oldest, [0]=latest) limit = -2 --> oldest version only limit > 0 --> items behind latest; 3 means latest + 2 versions behind, [0]=latest
func (*Op) NewSemaphore ¶ added in v0.3.1
NewSemaphore returns a distributed semaphore object.
func (*Op) Put ¶ added in v0.3.0
Put saves a key/value to Op. This call will try to block, at least roughly until spindle's timeout, to wait for the leader's availability to do actual writes before returning.
func (*Op) Run ¶ added in v0.3.0
Run starts the main handler. It blocks until ctx is cancelled, optionally sending an error message to done when finished.
type Option ¶ added in v1.1.0
type Option interface {
Apply(*Op)
}
func WithBroadcastHandler ¶ added in v1.3.0
func WithBroadcastHandler(d interface{}, h FnMsgHandler) Option
WithBroadcastHandler sets the node's callback function for broadcast messages from anyone in the group using the Broadcast(...) API. Any arbitrary data represented by d will be passed to the callback h every time it is called. If d is nil, the default callback data will be the *Op object itself. The handler's returning []byte will serve as reply.
A nil broadcast handler disables the internal heartbeat function.
func WithDuration ¶ added in v1.1.0
WithDuration sets Op's internal spindle object's lease duration. Defaults to 30s when not set. Minimum value is 2s.
func WithGroupSyncInterval ¶ added in v1.10.0
WithGroupSyncInterval sets the internal interval timeout to sync membership within the group. If not set, defaults to 30s. Minimum value is 2s.
func WithLeaderHandler ¶ added in v1.2.0
func WithLeaderHandler(d interface{}, h FnMsgHandler) Option
WithLeaderHandler sets the node's callback function when it is the current leader and when members send messages to it using the Send(...) API. Any arbitrary data represented by d will be passed to the callback h every time it is called. If d is nil, the default callback data will be the *Op object itself. The handler's returning []byte will serve as reply.
Typical flow would be:
- Any node (including the leader) calls the Send(...) API.
- The current leader handles the call by reading the input.
- Leader will then call FnLeaderHandler, passing the arbitrary data along with the message.
- FnLeaderHandler will process the data as leader, then returns the reply to the calling member.
func WithLogger ¶ added in v1.1.0
WithLogger sets Op's logger object. Can be silenced by setting v to:
log.New(ioutil.Discard, "", 0)
type PutOptions ¶ added in v1.7.0
type PutOptions struct { // If true, do a direct write, no need to fwd to leader. DirectWrite bool // If true, don't do an append-write; overwrite the latest. Note that even if you set this // to true, if you do another Put the next time with this field set as false (default), // the previous write will now be gone, or will now be part of the history. NoAppend bool }
type Semaphore ¶ added in v0.3.1
type Semaphore struct {
// contains filtered or unexported fields
}
Semaphore represents a distributed semaphore object.
func (*Semaphore) Acquire ¶ added in v0.3.1
Acquire acquires a semaphore. This call will block until the semaphore is acquired. By default, this call will basically block forever until the semaphore is acquired or until ctx expires or is cancelled.
func (*Semaphore) Release ¶ added in v1.5.0
Release releases a semaphore. Although recommended to release all acquired semaphores, this is still a best-effort release as any caller could disappear/crash while holding a semaphore. To remedy this, the current leader will attempt to track all semaphore owners and remove the non-responsive ones after some delay. A downside of not calling release properly will cause other semaphore acquirers to block just a bit longer while leader does the cleanup, whereas calling release will free up space immediately allowing other semaphore acquirers to not wait that long.
func (*Semaphore) TryAcquire ¶ added in v0.3.1
TryAcquire is like Acquire() but will not block until the semaphore is acquired. It will only attempt to acquire the semaphore and will return immediately on either success or failure, or until ctx expires or is cancelled.