chunkserver

package
v0.0.0-...-fb2c0b6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 18, 2023 License: GPL-2.0, GPL-3.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
// Chunk-server roles, declared as untyped iota constants:
// Primary is 0 and Secondary is 1.
// NOTE(review): presumably these are the values stored in
// ChunkMetaData.Role — confirm against the callers.
const (
	Primary = iota
	Secondary
)
View Source
// Status codes. Every constant in this block is an int32 and is numbered
// consecutively by iota, starting at OK = 0 and ending at
// ERROR_VERSIONS_DO_NOT_MATCH = 12. Inserting or reordering entries changes
// the numeric value of every constant that follows.
// NOTE(review): presumably these are the codes taken by ErrorCodeToString and
// by the New*Resp / NewStatus constructors (all accept an int32 error code) —
// confirm.
const (
	OK int32 = iota
	ERROR_NOT_PRIMARY
	ERROR_NOT_SECONDARY
	ERROR_READ_FAILED
	ERROR_CHUNK_ALREADY_EXISTS
	ERROR_CREATE_CHUNK_FAILED
	ERROR_APPEND_FAILED
	ERROR_REPLICATE_FAILED
	// ERROR_APPEND_NOT_EXISTS indicates that the chunk to be appended to does
	// not exist on the local filesystem.
	ERROR_APPEND_NOT_EXISTS
	ERROR_REPLICATE_NOT_EXISTS
	ERROR_SHOULD_NOT_HAPPEN
	ERROR_CHUNK_NOT_EXISTS
	ERROR_VERSIONS_DO_NOT_MATCH
)

Variables

Functions

func CreateFile

func CreateFile(path string) error

CreateFile creates an empty file on disk and creates all intermediate directories in path

func ErrorCodeToString

func ErrorCodeToString(e int32) string

func ForwardCreateReq

func ForwardCreateReq(serverName string, req *pb.CreateChunkReq, peer string) error

ForwardCreateReq establishes a grpc.ClientConn with peer and forwards the pb.CreateChunkReq

func GetAddr

func GetAddr(host string, port uint32) string

func IsClose

func IsClose[T any](ch <-chan T) bool

func LoadChunk

func LoadChunk(path string, used uint32, start uint32, end uint32) ([]byte, error)

LoadChunk reads the file at the specified path, starting at offset start. If end equals 0, LoadChunk reads and returns all of the data from start onward; otherwise it reads and returns (end - start) bytes, ending the read at end.

func NewAppendDataResp

func NewAppendDataResp(errorCode int32) *pb.AppendDataResp

func NewCreateChunkResp

func NewCreateChunkResp(errorCode int32) *pb.CreateChunkResp

func NewDeleteChunkResp

func NewDeleteChunkResp(errorCode int32) *pb.DeleteChunkResp

func NewForwardCreateResp

func NewForwardCreateResp(errorCode int32) *pb.ForwardCreateResp

func NewGetVersionResp

func NewGetVersionResp(errorCode int32, version *uint32, fileData []byte) *pb.GetVersionResp

func NewPeerConn

func NewPeerConn(address string) (*grpc.ClientConn, error)

NewPeerConn establishes and returns a grpc.ClientConn to the specified address

func NewReadResp

func NewReadResp(fileData []byte, errorCode int32, version *uint32) *pb.ReadResp

NewReadResp returns a pointer to pb.ReadResp that represents the result of a read with pb.Status

func NewReadVersionResp

func NewReadVersionResp(errorCode int32, version *uint32) *pb.ReadVersionResp

func NewReplicateReq

func NewReplicateReq(req *pb.ReplicateReq, peer string) error

func NewReplicateResp

func NewReplicateResp(errorCode int32, uuid string) *pb.ReplicateResp

func NewStatus

func NewStatus(errorCode int32) *pb.Status

func OverWriteChunk

func OverWriteChunk(chunkMeta *ChunkMetaData, content []byte) error

OverWriteChunk overwrites existing file on disk with content

func ReplicateRespToAppendResp

func ReplicateRespToAppendResp(replicateResp *pb.ReplicateResp) *pb.AppendDataResp

func Sum

func Sum[T Number](slice []T) T

func WriteFile

func WriteFile(chunkMeta *ChunkMetaData, content []byte) error

WriteFile opens chunk file in append mode and appends content to the file

Types

type ChunkMetaData

// ChunkMetaData holds the state a chunk server keeps for a single chunk.
// It embeds a sync.Mutex by value, so a ChunkMetaData must not be copied
// after first use; ChunkServer.Chunks stores it by pointer.
type ChunkMetaData struct {
	// file location in local file system
	ChunkLocation string

	// role of current chunkserver for this chunk
	// NOTE(review): presumably one of the Primary / Secondary constants — confirm.
	Role uint32

	// IP address of primary chunk server for this chunk
	PrimaryChunkServer string
	// NOTE(review): presumably the addresses of the other chunk servers that
	// hold this chunk — confirm.
	PeerAddress        []string

	// Already used size in bytes
	Used uint32

	// Version number of the chunk.
	Version uint32

	// NOTE(review): presumably guards the mutable fields of this struct —
	// confirm which fields must be accessed under the lock.
	MetaDataLock sync.Mutex

	// NOTE(review): possibly the channel handed to GetVersionTimer.Quit
	// (both carry strings) — confirm.
	GetVersionChannel chan string
}

type ChunkServer

// ChunkServer is the chunk server's state; the RPC handlers and the Send*
// helpers listed for this type are methods on *ChunkServer.
type ChunkServer struct {
	// Embedded generated type from the pb package.
	// NOTE(review): presumably the usual gRPC forward-compatibility embed for
	// the ChunkServer service — confirm against the generated code.
	pb.UnimplementedChunkServerServer

	// a mapping from ChunkHandle(string) to ChunkMetaData
	Chunks map[string]*ChunkMetaData

	// a mapping from client token to the RespMetaData recorded for that
	// client (its last ID, last append response, and error)
	ClientLastResp map[string]RespMetaData

	// globally unique server name
	ServerName string

	// base directory to store chunk files
	BasePath string

	// NOTE(review): HostName and Port are presumably the address this server
	// is reachable at — confirm.
	HostName string

	Port uint32

	// NOTE(review): MasterIP and MasterPort are presumably the address of the
	// master server — confirm.
	MasterIP string

	MasterPort uint32

	// NOTE(review): presumably, when Debug is set, DebugInfo values are
	// published on DebugChan — confirm.
	Debug     bool
	DebugChan chan DebugInfo
}

func (*ChunkServer) AppendData

func (s *ChunkServer) AppendData(ctx context.Context, appendReq *pb.AppendDataReq) (*pb.AppendDataResp, error)

func (*ChunkServer) AssignNewPrimary

func (s *ChunkServer) AssignNewPrimary(ctx context.Context, req *pb.AssignNewPrimaryReq) (res *pb.AssignNewPrimaryResp, err error)

AssignNewPrimary changes the primary for the given chunk handle.

func (*ChunkServer) ChangeToPrimary

func (s *ChunkServer) ChangeToPrimary(ctx context.Context, req *pb.ChangeToPrimaryReq) (res *pb.ChangeToPrimaryResp, err error)

ChangeToPrimary is received by a backup chunk server to notify it that it is the new primary for the given chunk handle.

func (*ChunkServer) CreateChunk

func (s *ChunkServer) CreateChunk(ctx context.Context, createChunkReq *pb.CreateChunkReq) (*pb.CreateChunkResp, error)

CreateChunk creates a file on the local filesystem that represents a chunk, per the Master Server's request.

func (*ChunkServer) DeleteChunk

func (s *ChunkServer) DeleteChunk(ctx context.Context, deleteReq *pb.DeleteChunkReq) (*pb.DeleteChunkResp, error)

DeleteChunk deletes the chunk metadata that corresponds with a string chunk handle on the primary chunk server as well as on backup servers

func (*ChunkServer) ForwardCreate

func (s *ChunkServer) ForwardCreate(ctx context.Context, forwardCreateReq *pb.ForwardCreateReq) (*pb.ForwardCreateResp, error)

ForwardCreate creates a new chunk as a backup.

func (*ChunkServer) GetVersion

func (s *ChunkServer) GetVersion(ctx context.Context, req *pb.GetVersionReq) (res *pb.GetVersionResp, err error)

func (*ChunkServer) Read

func (s *ChunkServer) Read(ctx context.Context, readReq *pb.ReadReq) (*pb.ReadResp, error)

Read handles read request from client

func (*ChunkServer) ReadVersion

func (s *ChunkServer) ReadVersion(ctx context.Context, readVersion *pb.ReadVersionReq) (*pb.ReadVersionResp, error)

ReadVersion returns the version of a chunk that corresponds with a string chunk handle and returns error if the chunk is not recorded by the ChunkServer

func (*ChunkServer) Replicate

func (s *ChunkServer) Replicate(ctx context.Context, replicateReq *pb.ReplicateReq) (*pb.ReplicateResp, error)

func (*ChunkServer) SendGetVersion

func (s *ChunkServer) SendGetVersion(chunkHandle string)

func (*ChunkServer) SendHeartBeat

func (s *ChunkServer) SendHeartBeat()

func (*ChunkServer) SendRegister

func (s *ChunkServer) SendRegister() error

func (*ChunkServer) UpdateBackup

func (s *ChunkServer) UpdateBackup(ctx context.Context, req *pb.UpdateBackupReq) (*pb.UpdateBackupResp, error)

type DebugInfo

// DebugInfo is the element type of ChunkServer.DebugChan.
// NOTE(review): the field meanings below are inferred from their names —
// confirm against the code that sends on DebugChan.
type DebugInfo struct {
	// presumably the address of the reporting server — TODO confirm
	Addr       string
	// presumably the name of the function being reported — TODO confirm
	Func       string
	// presumably one of the int32 status codes (OK, ERROR_*) — TODO confirm
	StatusCode int32
}

type GetVersionTimer

// GetVersionTimer bundles a chunk server, a chunk handle, a timeout and a
// receive-only quit channel; its Trigger method is declared on
// *GetVersionTimer.
type GetVersionTimer struct {
	// Chunk server this timer belongs to.
	Srv         *ChunkServer
	// Handle of the chunk this timer is associated with.
	ChunkHandle string
	// NOTE(review): unit is not stated here; presumably milliseconds, like
	// HeartBeatTimer.Timeout — confirm.
	Timeout     int
	// Receive-only channel of strings.
	// NOTE(review): presumably signals the timer to stop — confirm.
	Quit        <-chan string
}

func (*GetVersionTimer) Trigger

func (t *GetVersionTimer) Trigger()

type HeartBeatTimer

// HeartBeatTimer pairs a chunk server with a timeout; its Trigger method is
// declared on *HeartBeatTimer.
// NOTE(review): presumably drives ChunkServer.SendHeartBeat — confirm.
type HeartBeatTimer struct {
	// Chunk server this timer belongs to.
	Srv *ChunkServer
	// Timeout in milliseconds
	Timeout int
}

func (*HeartBeatTimer) Trigger

func (t *HeartBeatTimer) Trigger()

type Number

// Number is a type constraint satisfied by any type that satisfies
// constraints.Integer or constraints.Float. It is the constraint on the type
// parameter of Sum.
type Number interface {
	constraints.Integer | constraints.Float
}

type RespMetaData

// RespMetaData is the value type of ChunkServer.ClientLastResp, which is
// keyed by client token. It records the last ID seen from a client together
// with the append response and error stored for it.
type RespMetaData struct {
	// client last seq
	LastID string

	// last response to client append request
	AppendResp *pb.AppendDataResp

	// error
	// NOTE(review): presumably the error returned alongside AppendResp — confirm.
	Err error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL