raft

package module
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2021 License: MIT Imports: 16 Imported by: 0

README

Raft

Raft 共识算法基于Go实现, 能力列表:

  1. Leader Election √
  2. Log Replication √
  3. Log Recover √
  4. Log Compaction ❌
  5. Membership Management √

Raft介绍:

实现参考:

如何使用

可以直接阅读源码来学习具体的实现, 如果实在懒得看, 可以按照下面做:

package main

func TestRaft(t *testing.T) {
    //具体可参考TestMainFunctions
    //创建日志回调记录
    commit := make(chan raft.CommitEntry)
    
    //开始监听所有服务
    server := raft.NewServer(":0", commit, nil)
    server.Server()
    
    exit := make(chan interface{})
    go func() {
        for {
            select {
            case c := <-commit:
                //如果有数据提交会通知
                log.Println(c)
            case <-exit:
                return
            }
        }
    }()
    
    //提交一条日志(如果只有一个节点会提交失败)
    fmt.Println(server.Commit("TEST-LOG"))
    
    time.Sleep(time.Second)
    
    //获取所有日志
    fmt.Println(server.Logs(0))
    
    //获取所有节点IP
    fmt.Println(server.Nodes())
    
    //获取节点当前状态
    fmt.Println(server.Status())
    
    //退出
    server.Shutdown()
    close(exit)
}

测试用例可以这样做:

$ go test -v -race -run @XXXXX(具体方法名)
PASS / FAILED

或测试全部用例:

$ go test -v -race

TODO

  1. Log Compaction(日志快照与压缩)
  2. 缩容场景处理

Documentation

Overview

Raft一致性实现 选举

Raft一致性实现 日志复制

Raft一致性实现 RPC实现

Raft一致性实现 测试用例

Raft一致性实现 Raft服务器(节点管理)

Index

Constants

View Source
const MethodAppendEntries = "/raft.message.Message/AppendEntries"
View Source
const MethodForwardMessage = "/raft.message.Message/ForwardMessage"
View Source
const MethodRequestVote = "/raft.message.Message/RequestVote"

Variables

View Source
var ErrorContextCanceled = fmt.Errorf("context canceled")
View Source
var ErrorInvalidClient = fmt.Errorf("client was closed")
View Source
var ErrorServerAlreadyShutdown = fmt.Errorf("server already shutdown")

Functions

This section is empty.

Types

type BenchTest

type BenchTest struct {
	// contains filtered or unexported fields
}

BenchTest 测试用例

func NewBench

func NewBench(t *testing.T, address ...string) *BenchTest

NewBench 新建测试用例(初始化)

func (*BenchTest) CheckCommitted

func (h *BenchTest) CheckCommitted(cmd string) (nc int, index int32)

CheckCommitted 检查某条日志是否全部被提交

func (*BenchTest) CheckCommittedN

func (h *BenchTest) CheckCommittedN(cmd string, n int)

CheckCommittedN 检查是否存在N个节点的提交记录

func (*BenchTest) CheckNoLeader

func (h *BenchTest) CheckNoLeader()

CheckNoLeader 检查是否没有Leader

func (*BenchTest) CheckNotCommitted

func (h *BenchTest) CheckNotCommitted(cmd string)

CheckNotCommitted 检查某条日志是否未提交

func (*BenchTest) CheckSingleLeader

func (h *BenchTest) CheckSingleLeader() (string, int32)

CheckSingleLeader 检查是否只有一个Leader

func (*BenchTest) CrashPeer

func (h *BenchTest) CrashPeer(id string)

CrashPeer 测试崩一个节点

func (*BenchTest) DisconnectPeer

func (h *BenchTest) DisconnectPeer(id string)

DisconnectPeer 断开本地服务并且断开其他节点的该服务

func (*BenchTest) ReconnectPeer

func (h *BenchTest) ReconnectPeer(id string)

ReconnectPeer 重新连接

func (*BenchTest) RestartPeer

func (h *BenchTest) RestartPeer(id string)

RestartPeer 重启一个节点

func (*BenchTest) Shutdown

func (h *BenchTest) Shutdown()

Shutdown 停止服务

func (*BenchTest) Submit

func (h *BenchTest) Submit(cmd string) (string, bool)

Submit 测试向服务器提交日志(不转发,并且直接找到Leader)

func (*BenchTest) SubmitToServer

func (h *BenchTest) SubmitToServer(serverId string, cmd string) bool

SubmitToServer 测试向服务器提交日志

func (*BenchTest) SubmitToServerNotForward

func (h *BenchTest) SubmitToServerNotForward(serverId string, cmd string) bool

SubmitToServerNotForward 测试向服务器提交日志(禁用转发)

type CommitEntry

type CommitEntry struct {
	// Command 客户端提交的命令
	Command string

	// Index 提交的索引
	Index int32

	// Term 提交的生命周期
	Term int32
}

CommitEntry 数据提交的通道,每次数据有变动时CommitChannel会触发结果

type Config

type Config struct {
	// 选举最短超时时间
	ElectionTimeoutMinMs int
	// 选举最长超时时间
	ElectionTimeoutMaxMs int
	// Leader心跳间隔
	HeartbeatMs time.Duration
	// rpc请求超时时间
	RPCMsgTimeoutMs time.Duration
	// 是否打印日志
	ShowLog bool
	// 是否自动转发消息(如果在Follower上提交日志,会自动转发到Leader)
	AutoRedirectMessage bool
}

Config 配置

type FSM

type FSM int32

FSM 状态机

const (
	Follower FSM = iota
	Candidate
	Leader
	Shutdown
)

func (FSM) String

func (s FSM) String() string

type LogEntry added in v0.0.4

type LogEntry struct {
	// Command 日志命令
	Command string

	// Term 当前的Term
	Term int32
}

LogEntry 日志记录

type MapStorage

type MapStorage struct {
	// contains filtered or unexported fields
}

func NewMapStorage

func NewMapStorage() *MapStorage

func (*MapStorage) Get

func (ms *MapStorage) Get(key string) ([]byte, bool)

func (*MapStorage) Len

func (ms *MapStorage) Len() int

func (*MapStorage) Set

func (ms *MapStorage) Set(key string, value []byte)

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(address string, commitChan chan<- CommitEntry, config *Config, nodeIDs ...string) *Server

NewServer 新建一个raft服务 address为TCP监听的地址,一般给个端口就可以如:8888 commitChan如果有数据新增,则会通知到这里,业务可以用于监听 config配置信息,需要确保electionMin比Max要小,并且heartbeat也要比electionMin小这三个参数才可生效 可根据实际情况调整时间,默认min(150ms), max(300ms), heartbeat(20ms) nodeIDs除自己以外的节点列表(新的节点把老的节点的地址都填上即可相互连接)

func (*Server) Commit

func (s *Server) Commit(command string) bool

Commit 提交日志

func (*Server) Logs added in v0.0.4

func (s *Server) Logs(n int) []LogEntry

Logs 获取日志 n:0(全部) n>0(正数n个) n<0(倒数n个)

func (*Server) Nodes added in v0.0.4

func (s *Server) Nodes() []string

Nodes 获取所有节点

func (*Server) Server

func (s *Server) Server()

Server 开启服务

func (*Server) Shutdown

func (s *Server) Shutdown()

Shutdown 关闭服务

func (*Server) Status added in v0.0.4

func (s *Server) Status() (int32, bool, string)

Status 获取状态

type Storage

type Storage interface {
	Set(key string, value []byte)
	Get(key string) ([]byte, bool)
	Len() int
}

Directories

Path Synopsis
proto

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL