raftstore

package
v3.2.7+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2021 License: Apache-2.0, MIT Imports: 24 Imported by: 1

Documentation

Index

Constants

View Source
const (
	TruncateTicket             = 5 * time.Minute
	FlushTicket                = 1 * time.Second
	DefaultFlushTimeInterval   = 600 // 10 minutes
	DefaultFlushCountThreshold = 200000
)

Variables

This section is empty.

Functions

func StartRaftServer

func StartRaftServer(nodeId entity.NodeID, ip string, resolver raft.SocketResolver) (*raft.RaftServer, error)

Types

type EventListener

type EventListener interface {
	HandleRaftReplicaEvent(event *RaftReplicaEvent)
	HandleRaftLeaderEvent(event *RaftLeaderEvent)
	HandleRaftFatalEvent(event *RaftFatalEvent)
}

EventListener is the interface for handling raft events; the server implements it.

type RaftApplyResponse

type RaftApplyResponse struct {
	FlushC chan error
	Err    error
}

func (*RaftApplyResponse) SetErr

func (r *RaftApplyResponse) SetErr(err error) *RaftApplyResponse

type RaftFatalEvent

type RaftFatalEvent struct {
	PartitionId entity.PartitionID
	Cause       error
}

type RaftLeaderEvent

type RaftLeaderEvent struct {
	PartitionId entity.PartitionID
	Leader      entity.NodeID
}

type RaftReplicaEvent

type RaftReplicaEvent struct {
	PartitionId entity.PartitionID
	Delete      bool
	Replica     *entity.Replica
}

type RaftResolver

type RaftResolver struct {
	// contains filtered or unexported fields
}

RaftResolver resolves NodeIDs to net.Addr addresses.

func NewRaftResolver

func NewRaftResolver() *RaftResolver

NewRaftResolver creates a RaftResolver.

func (*RaftResolver) AddNode

func (r *RaftResolver) AddNode(id entity.NodeID, replica *entity.Replica)

func (*RaftResolver) DeleteNode

func (r *RaftResolver) DeleteNode(id entity.NodeID)

func (*RaftResolver) GetNode

func (r *RaftResolver) GetNode(id entity.NodeID) *nodeRef

func (*RaftResolver) NodeAddress

func (r *RaftResolver) NodeAddress(nodeID uint64, stype raft.SocketType) (string, error)

NodeAddress resolves a NodeID to its address string for the given socket type.

func (*RaftResolver) ToReplica

func (r *RaftResolver) ToReplica(id entity.NodeID) (replica *entity.Replica)

type ReplicasStatusEntry

type ReplicasStatusEntry struct {
	NodeID      entity.NodeID
	PartitionID entity.PartitionID
	ReStatusMap sync.Map
}

type Store

type Store struct {
	*storage.StoreBase
	RaftPath      string
	RaftServer    *raft.RaftServer
	EventListener EventListener
	Sn            int64
	LastFlushSn   int64
	LastFlushTime time.Time
	Client        *client.Client

	RsStatusC   chan *ReplicasStatusEntry
	RsStatusMap sync.Map
	// contains filtered or unexported fields
}

func CreateStore

func CreateStore(ctx context.Context, pID entity.PartitionID, nodeID entity.NodeID, space *entity.Space, raftServer *raft.RaftServer, eventListener EventListener, client *client.Client) (*Store, error)

CreateStore creates an instance of Store.

func (*Store) Apply

func (s *Store) Apply(command []byte, index uint64) (resp interface{}, err error)

Apply implements the raft interface.

func (*Store) ApplyMemberChange

func (s *Store) ApplyMemberChange(confChange *proto.ConfChange, index uint64) (interface{}, error)

ApplyMemberChange implements the raft interface.

func (*Store) ApplySnapshot

func (s *Store) ApplySnapshot(peers []proto.Peer, iter proto.SnapIterator) error

ApplySnapshot implements the raft interface.

func (*Store) ChangeMember

func (s *Store) ChangeMember(changeType proto.ConfChangeType, server *entity.Server) error

func (*Store) Close

func (s *Store) Close() error

Close closes the partition store if it is currently running.

func (*Store) Destroy

func (s *Store) Destroy() (err error)

Destroy closes the partition store if it is currently running and removes all of its data files from the filesystem.

func (*Store) Flush

func (s *Store) Flush(ctx context.Context) error

func (*Store) GetDocument

func (s *Store) GetDocument(ctx context.Context, readLeader bool, doc *vearchpb.Document) (err error)

func (*Store) GetLeader

func (s *Store) GetLeader() (entity.NodeID, uint64)

func (*Store) GetPartition

func (s *Store) GetPartition() *entity.Partition

func (*Store) GetUnreachable

func (s *Store) GetUnreachable(id uint64) []uint64

func (*Store) HandleFatalEvent

func (s *Store) HandleFatalEvent(err *raft.FatalError)

HandleFatalEvent implements the raft interface.

func (*Store) HandleLeaderChange

func (s *Store) HandleLeaderChange(leader uint64)

HandleLeaderChange implements the raft interface.

func (*Store) IsLeader

func (s *Store) IsLeader() bool

func (*Store) RaftSubmit

func (s *Store) RaftSubmit(data []byte) (err error)

RaftSubmit submits the given data through raft.

func (*Store) ReplicasStatusChange

func (s *Store) ReplicasStatusChange() bool

ReplicasStatusChange concerns replica status: whether a replica is behind the leader or equal to the leader.

func (*Store) Search

func (s *Store) Search(ctx context.Context, request *vearchpb.SearchRequest, response *vearchpb.SearchResponse) (err error)

func (*Store) Snapshot

func (s *Store) Snapshot() (proto.Snapshot, error)

Snapshot implements the raft interface.

func (*Store) Start

func (s *Store) Start() (err error)

Start starts the store.

func (*Store) Status

func (s *Store) Status() *raft.Status

func (*Store) TryToLeader

func (s *Store) TryToLeader() error

func (*Store) UpdateSpace

func (s *Store) UpdateSpace(ctx context.Context, space *entity.Space) error

func (*Store) Write

func (s *Store) Write(ctx context.Context, request *vearchpb.DocCmd) (err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL