Versions in this module Expand all Collapse all v0 v0.1.1 Dec 26, 2014 Changes in this version + const DefaultTimeoutScale + var ErrEnqueueTimeout = errors.New("timed out enqueuing operation") + var ErrKnownPeer = errors.New("peer already known") + var ErrLeader = errors.New("node is the leader") + var ErrLeadershipLost = errors.New("leadership lost while committing log") + var ErrLogNotFound = errors.New("log not found") + var ErrNotLeader = errors.New("node is not the leader") + var ErrPipelineShutdown = errors.New("append pipeline closed") + var ErrRaftShutdown = errors.New("raft is already shutdown") + var ErrTransportShutdown = errors.New("transport shutdown") + var ErrUnknownPeer = errors.New("peer is unknown") + func AddUniquePeer(peers []net.Addr, peer net.Addr) []net.Addr + func ExcludePeer(peers []net.Addr, peer net.Addr) []net.Addr + func NewInmemTransport() (*InmemAddr, *InmemTransport) + func PeerContained(peers []net.Addr, peer net.Addr) bool + func ValidateConfig(config *Config) error + type AppendEntriesRequest struct + Entries []*Log + Leader []byte + LeaderCommitIndex uint64 + PrevLogEntry uint64 + PrevLogTerm uint64 + Term uint64 + type AppendEntriesResponse struct + LastLog uint64 + Success bool + Term uint64 + type AppendFuture interface + Request func() *AppendEntriesRequest + Response func() *AppendEntriesResponse + Start func() time.Time + type AppendPipeline interface + AppendEntries func(args *AppendEntriesRequest, resp *AppendEntriesResponse) (AppendFuture, error) + Close func() error + Consumer func() <-chan AppendFuture + type ApplyFuture interface + Response func() interface{} + type Config struct + CommitTimeout time.Duration + DisableBootstrapAfterElect bool + ElectionTimeout time.Duration + EnableSingleNode bool + HeartbeatTimeout time.Duration + LeaderLeaseTimeout time.Duration + LogOutput io.Writer + MaxAppendEntries int + ShutdownOnRemove bool + SnapshotInterval time.Duration + SnapshotThreshold uint64 + TrailingLogs uint64 + func DefaultConfig() *Config + type FSM interface + Apply func(*Log) interface{} + Restore func(io.ReadCloser) error + Snapshot func() (FSMSnapshot, error) + type FSMSnapshot interface + Persist func(sink SnapshotSink) error + Release func() + type FileSnapshotSink struct + func (s *FileSnapshotSink) Cancel() error + func (s *FileSnapshotSink) Close() error + func (s *FileSnapshotSink) ID() string + func (s *FileSnapshotSink) Write(b []byte) (int, error) + type FileSnapshotStore struct + func NewFileSnapshotStore(base string, retain int, logOutput io.Writer) (*FileSnapshotStore, error) + func (f *FileSnapshotStore) Create(index, term uint64, peers []byte) (SnapshotSink, error) + func (f *FileSnapshotStore) List() ([]*SnapshotMeta, error) + func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error) + func (f *FileSnapshotStore) ReapSnapshots() error + type Future interface + Error func() error + type InmemAddr struct + ID string + func NewInmemAddr() *InmemAddr + func (ia *InmemAddr) Network() string + func (ia *InmemAddr) String() string + type InmemStore struct + func NewInmemStore() *InmemStore + func (i *InmemStore) DeleteRange(min, max uint64) error + func (i *InmemStore) FirstIndex() (uint64, error) + func (i *InmemStore) Get(key []byte) ([]byte, error) + func (i *InmemStore) GetLog(index uint64, log *Log) error + func (i *InmemStore) GetUint64(key []byte) (uint64, error) + func (i *InmemStore) LastIndex() (uint64, error) + func (i *InmemStore) Set(key []byte, val []byte) error + func (i *InmemStore) SetUint64(key []byte, val uint64) error + func (i *InmemStore) StoreLog(log *Log) error + func (i *InmemStore) StoreLogs(logs []*Log) error + type InmemTransport struct + func (i *InmemTransport) AppendEntries(target net.Addr, args *AppendEntriesRequest, resp *AppendEntriesResponse) error + func (i *InmemTransport) AppendEntriesPipeline(target net.Addr) (AppendPipeline, error) + func (i *InmemTransport) Connect(peer net.Addr, trans *InmemTransport) + func (i *InmemTransport) Consumer() <-chan RPC + func (i *InmemTransport) DecodePeer(buf []byte) net.Addr + func (i *InmemTransport) Disconnect(peer net.Addr) + func (i *InmemTransport) DisconnectAll() + func (i *InmemTransport) EncodePeer(p net.Addr) []byte + func (i *InmemTransport) InstallSnapshot(target net.Addr, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, ...) error + func (i *InmemTransport) LocalAddr() net.Addr + func (i *InmemTransport) RequestVote(target net.Addr, args *RequestVoteRequest, resp *RequestVoteResponse) error + type InstallSnapshotRequest struct + LastLogIndex uint64 + LastLogTerm uint64 + Leader []byte + Peers []byte + Size int64 + Term uint64 + type InstallSnapshotResponse struct + Success bool + Term uint64 + type JSONPeers struct + func NewJSONPeers(base string, trans Transport) *JSONPeers + func (j *JSONPeers) Peers() ([]net.Addr, error) + func (j *JSONPeers) SetPeers(peers []net.Addr) error + type Log struct + Data []byte + Index uint64 + Term uint64 + Type LogType + type LogStore interface + DeleteRange func(min, max uint64) error + FirstIndex func() (uint64, error) + GetLog func(index uint64, log *Log) error + LastIndex func() (uint64, error) + StoreLog func(log *Log) error + StoreLogs func(logs []*Log) error + type LogType uint8 + const LogAddPeer + const LogBarrier + const LogCommand + const LogNoop + const LogRemovePeer + type NetworkTransport struct + TimeoutScale int + func NewNetworkTransport(stream StreamLayer, maxPool int, timeout time.Duration, logOutput io.Writer) *NetworkTransport + func NewTCPTransport(bindAddr string, advertise net.Addr, maxPool int, timeout time.Duration, ...) (*NetworkTransport, error) + func (n *NetworkTransport) AppendEntries(target net.Addr, args *AppendEntriesRequest, resp *AppendEntriesResponse) error + func (n *NetworkTransport) AppendEntriesPipeline(target net.Addr) (AppendPipeline, error) + func (n *NetworkTransport) Close() error + func (n *NetworkTransport) Consumer() <-chan RPC + func (n *NetworkTransport) DecodePeer(buf []byte) net.Addr + func (n *NetworkTransport) EncodePeer(p net.Addr) []byte + func (n *NetworkTransport) InstallSnapshot(target net.Addr, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, ...) error + func (n *NetworkTransport) IsShutdown() bool + func (n *NetworkTransport) LocalAddr() net.Addr + func (n *NetworkTransport) RequestVote(target net.Addr, args *RequestVoteRequest, resp *RequestVoteResponse) error + type PeerStore interface + Peers func() ([]net.Addr, error) + SetPeers func([]net.Addr) error + type RPC struct + Command interface{} + Reader io.Reader + RespChan chan<- RPCResponse + func (r *RPC) Respond(resp interface{}, err error) + type RPCResponse struct + Error error + Response interface{} + type Raft struct + func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, ...) (*Raft, error) + func (r *Raft) AddPeer(peer net.Addr) Future + func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture + func (r *Raft) Barrier(timeout time.Duration) Future + func (r *Raft) LastContact() time.Time + func (r *Raft) Leader() net.Addr + func (r *Raft) LeaderCh() <-chan bool + func (r *Raft) RemovePeer(peer net.Addr) Future + func (r *Raft) SetPeers(p []net.Addr) Future + func (r *Raft) Shutdown() Future + func (r *Raft) Snapshot() Future + func (r *Raft) State() RaftState + func (r *Raft) Stats() map[string]string + func (r *Raft) String() string + func (r *Raft) VerifyLeader() Future + type RaftState uint32 + const Candidate + const Follower + const Leader + const Shutdown + func (s RaftState) String() string + type RequestVoteRequest struct + Candidate []byte + LastLogIndex uint64 + LastLogTerm uint64 + Term uint64 + type RequestVoteResponse struct + Granted bool + Peers []byte + Term uint64 + type SnapshotMeta struct + ID string + Index uint64 + Peers []byte + Size int64 + Term uint64 + type SnapshotSink interface + Cancel func() error + ID func() string + type SnapshotStore interface + Create func(index, term uint64, peers []byte) (SnapshotSink, error) + List func() ([]*SnapshotMeta, error) + Open func(id string) (*SnapshotMeta, io.ReadCloser, error) + type StableStore interface + Get func(key []byte) ([]byte, error) + GetUint64 func(key []byte) (uint64, error) + Set func(key []byte, val []byte) error + SetUint64 func(key []byte, val uint64) error + type StaticPeers struct + StaticPeers []net.Addr + func (s *StaticPeers) Peers() ([]net.Addr, error) + func (s *StaticPeers) SetPeers(p []net.Addr) error + type StreamLayer interface + Dial func(address string, timeout time.Duration) (net.Conn, error) + type TCPStreamLayer struct + func (t *TCPStreamLayer) Accept() (c net.Conn, err error) + func (t *TCPStreamLayer) Addr() net.Addr + func (t *TCPStreamLayer) Close() (err error) + func (t *TCPStreamLayer) Dial(address string, timeout time.Duration) (net.Conn, error) + type Transport interface + AppendEntries func(target net.Addr, args *AppendEntriesRequest, resp *AppendEntriesResponse) error + AppendEntriesPipeline func(target net.Addr) (AppendPipeline, error) + Consumer func() <-chan RPC + DecodePeer func([]byte) net.Addr + EncodePeer func(net.Addr) []byte + InstallSnapshot func(target net.Addr, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, ...) error + LocalAddr func() net.Addr + RequestVote func(target net.Addr, args *RequestVoteRequest, resp *RequestVoteResponse) error