topology

package
v0.0.0-...-d8c34b0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 12, 2019 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AllocateVolume

func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.VolumeId, option *VolumeGrowOption) error

func ReplicatedDelete

func ReplicatedDelete(masterNode string, store *storage.Store,
	volumeId needle.VolumeId, n *needle.Needle,
	r *http.Request) (uint32, error)

func ReplicatedWrite

func ReplicatedWrite(masterNode string, s *storage.Store,
	volumeId needle.VolumeId, n *needle.Needle,
	r *http.Request) (size uint32, isUnchanged bool, err error)

Types

type AllocateVolumeResult

type AllocateVolumeResult struct {
	Error string
}

type Collection

type Collection struct {
	Name string
	// contains filtered or unexported fields
}

func NewCollection

func NewCollection(name string, volumeSizeLimit uint64) *Collection

func (*Collection) GetOrCreateVolumeLayout

func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout

func (*Collection) ListVolumeServers

func (c *Collection) ListVolumeServers() (nodes []*DataNode)

func (*Collection) Lookup

func (c *Collection) Lookup(vid needle.VolumeId) []*DataNode

func (*Collection) String

func (c *Collection) String() string

type Configuration

type Configuration struct {
	XMLName xml.Name `xml:"Configuration"`
	Topo    topology `xml:"Topology"`
	// contains filtered or unexported fields
}

func (*Configuration) Locate

func (c *Configuration) Locate(ip string, dcName string, rackName string) (dc string, rack string)

func (*Configuration) String

func (c *Configuration) String() string

type DataCenter

type DataCenter struct {
	NodeImpl
}

func NewDataCenter

func NewDataCenter(id string) *DataCenter

func (*DataCenter) GetOrCreateRack

func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack

func (*DataCenter) ToDataCenterInfo

func (dc *DataCenter) ToDataCenterInfo() *master_pb.DataCenterInfo

func (*DataCenter) ToMap

func (dc *DataCenter) ToMap() interface{}

type DataNode

type DataNode struct {
	NodeImpl

	Ip        string
	Port      int
	PublicUrl string
	LastSeen  int64 // unix time in seconds
	// contains filtered or unexported fields
}

func NewDataNode

func NewDataNode(id string) *DataNode

func (*DataNode) AddOrUpdateEcShard

func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo)

func (*DataNode) AddOrUpdateVolume

func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool)

func (*DataNode) DeleteEcShard

func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo)

func (*DataNode) DeltaUpdateEcShards

func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo)

func (*DataNode) DeltaUpdateVolumes

func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.VolumeInfo)

func (*DataNode) GetDataCenter

func (dn *DataNode) GetDataCenter() *DataCenter

func (*DataNode) GetEcShards

func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo)

func (*DataNode) GetRack

func (dn *DataNode) GetRack() *Rack

func (*DataNode) GetTopology

func (dn *DataNode) GetTopology() *Topology

func (*DataNode) GetVolumes

func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo)

func (*DataNode) GetVolumesById

func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error)

func (*DataNode) HasVolumesById

func (dn *DataNode) HasVolumesById(id needle.VolumeId) (hasVolumeId bool)

func (*DataNode) MatchLocation

func (dn *DataNode) MatchLocation(ip string, port int) bool

func (*DataNode) String

func (dn *DataNode) String() string

func (*DataNode) ToDataNodeInfo

func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo

func (*DataNode) ToMap

func (dn *DataNode) ToMap() interface{}

func (*DataNode) UpdateEcShards

func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo)

func (*DataNode) UpdateVolumes

func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes []storage.VolumeInfo)

func (*DataNode) Url

func (dn *DataNode) Url() string

type DistributedOperationResult

type DistributedOperationResult map[string]error

func (DistributedOperationResult) Error

func (dr DistributedOperationResult) Error() error

type EcShardLocations

type EcShardLocations struct {
	Collection string
	Locations  [erasure_coding.TotalShardsCount][]*DataNode
}

func NewEcShardLocations

func NewEcShardLocations(collection string) *EcShardLocations

func (*EcShardLocations) AddShard

func (loc *EcShardLocations) AddShard(shardId erasure_coding.ShardId, dn *DataNode) (added bool)

func (*EcShardLocations) DeleteShard

func (loc *EcShardLocations) DeleteShard(shardId erasure_coding.ShardId, dn *DataNode) (deleted bool)

type MaxVolumeIdCommand

type MaxVolumeIdCommand struct {
	MaxVolumeId needle.VolumeId `json:"maxVolumeId"`
}

func NewMaxVolumeIdCommand

func NewMaxVolumeIdCommand(value needle.VolumeId) *MaxVolumeIdCommand

func (*MaxVolumeIdCommand) Apply

func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error)

func (*MaxVolumeIdCommand) CommandName

func (c *MaxVolumeIdCommand) CommandName() string

type Node

type Node interface {
	Id() NodeId
	String() string
	FreeSpace() int64
	ReserveOneVolume(r int64) (*DataNode, error)
	UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64)
	UpAdjustVolumeCountDelta(volumeCountDelta int64)
	UpAdjustEcShardCountDelta(ecShardCountDelta int64)
	UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)
	UpAdjustMaxVolumeId(vid needle.VolumeId)

	GetVolumeCount() int64
	GetEcShardCount() int64
	GetActiveVolumeCount() int64
	GetMaxVolumeCount() int64
	GetMaxVolumeId() needle.VolumeId
	SetParent(Node)
	LinkChildNode(node Node)
	UnlinkChildNode(nodeId NodeId)
	CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)

	IsDataNode() bool
	IsRack() bool
	IsDataCenter() bool
	Children() []Node
	Parent() Node

	GetValue() interface{} //get reference to the topology,dc,rack,datanode
}

type NodeId

type NodeId string

type NodeImpl

type NodeImpl struct {
	sync.RWMutex // lock children
	// contains filtered or unexported fields
}

func (*NodeImpl) Children

func (n *NodeImpl) Children() (ret []Node)

func (*NodeImpl) CollectDeadNodeAndFullVolumes

func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)

func (*NodeImpl) FreeSpace

func (n *NodeImpl) FreeSpace() int64

func (*NodeImpl) GetActiveVolumeCount

func (n *NodeImpl) GetActiveVolumeCount() int64

func (*NodeImpl) GetEcShardCount

func (n *NodeImpl) GetEcShardCount() int64

func (*NodeImpl) GetMaxVolumeCount

func (n *NodeImpl) GetMaxVolumeCount() int64

func (*NodeImpl) GetMaxVolumeId

func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId

func (*NodeImpl) GetTopology

func (n *NodeImpl) GetTopology() *Topology

func (*NodeImpl) GetValue

func (n *NodeImpl) GetValue() interface{}

func (*NodeImpl) GetVolumeCount

func (n *NodeImpl) GetVolumeCount() int64

func (*NodeImpl) Id

func (n *NodeImpl) Id() NodeId

func (*NodeImpl) IsDataCenter

func (n *NodeImpl) IsDataCenter() bool

func (*NodeImpl) IsDataNode

func (n *NodeImpl) IsDataNode() bool

func (*NodeImpl) IsRack

func (n *NodeImpl) IsRack() bool

func (*NodeImpl) LinkChildNode

func (n *NodeImpl) LinkChildNode(node Node)

func (*NodeImpl) Parent

func (n *NodeImpl) Parent() Node

func (*NodeImpl) RandomlyPickNodes

func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error)

The first node must satisfy filterFirstNodeFn(); each of the remaining nodes must have one free slot.

func (*NodeImpl) ReserveOneVolume

func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error)

func (*NodeImpl) SetParent

func (n *NodeImpl) SetParent(node Node)

func (*NodeImpl) String

func (n *NodeImpl) String() string

func (*NodeImpl) UnlinkChildNode

func (n *NodeImpl) UnlinkChildNode(nodeId NodeId)

func (*NodeImpl) UpAdjustActiveVolumeCountDelta

func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)

func (*NodeImpl) UpAdjustEcShardCountDelta

func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64)

func (*NodeImpl) UpAdjustMaxVolumeCountDelta

func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64)

func (*NodeImpl) UpAdjustMaxVolumeId

func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId)

func (*NodeImpl) UpAdjustVolumeCountDelta

func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64)

type Rack

type Rack struct {
	NodeImpl
}

func NewRack

func NewRack(id string) *Rack

func (*Rack) FindDataNode

func (r *Rack) FindDataNode(ip string, port int) *DataNode

func (*Rack) GetOrCreateDataNode

func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int64) *DataNode

func (*Rack) ToMap

func (r *Rack) ToMap() interface{}

func (*Rack) ToRackInfo

func (r *Rack) ToRackInfo() *master_pb.RackInfo

type RemoteResult

type RemoteResult struct {
	Host  string
	Error error
}

type Topology

type Topology struct {
	NodeImpl

	Sequence sequence.Sequencer

	Configuration *Configuration

	RaftServer raft.Server
	// contains filtered or unexported fields
}

func NewTopology

func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) *Topology

func (*Topology) DeleteCollection

func (t *Topology) DeleteCollection(collectionName string)

func (*Topology) DeleteEcCollection

func (t *Topology) DeleteEcCollection(collection string)

func (*Topology) FindCollection

func (t *Topology) FindCollection(collectionName string) (*Collection, bool)

func (*Topology) GetOrCreateDataCenter

func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter

func (*Topology) GetVolumeLayout

func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout

func (*Topology) HasWritableVolume

func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool

func (*Topology) IncrementalSyncDataNodeEcShards

func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards []*master_pb.VolumeEcShardInformationMessage, dn *DataNode)

func (*Topology) IncrementalSyncDataNodeRegistration

func (t *Topology) IncrementalSyncDataNodeRegistration(newVolumes, deletedVolumes []*master_pb.VolumeShortInformationMessage, dn *DataNode)

func (*Topology) IsLeader

func (t *Topology) IsLeader() bool

func (*Topology) Leader

func (t *Topology) Leader() (string, error)

func (*Topology) ListCollections

func (t *Topology) ListCollections(includeNormalVolumes, includeEcVolumes bool) (ret []string)

func (*Topology) ListEcServersByCollection

func (t *Topology) ListEcServersByCollection(collection string) (dataNodes []string)

func (*Topology) Lookup

func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*DataNode)

func (*Topology) LookupEcShards

func (t *Topology) LookupEcShards(vid needle.VolumeId) (locations *EcShardLocations, found bool)

func (*Topology) NextVolumeId

func (t *Topology) NextVolumeId() (needle.VolumeId, error)

func (*Topology) PickForWrite

func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error)

func (*Topology) RegisterEcShards

func (t *Topology) RegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode)

func (*Topology) RegisterVolumeLayout

func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode)

func (*Topology) SetVolumeCapacityFull

func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool

func (*Topology) StartRefreshWritableVolumes

func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64)

func (*Topology) SyncDataNodeEcShards

func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo)

func (*Topology) SyncDataNodeRegistration

func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) (newVolumes, deletedVolumes []storage.VolumeInfo)

func (*Topology) ToMap

func (t *Topology) ToMap() interface{}

func (*Topology) ToTopologyInfo

func (t *Topology) ToTopologyInfo() *master_pb.TopologyInfo

func (*Topology) ToVolumeLocations

func (t *Topology) ToVolumeLocations() (volumeLocations []*master_pb.VolumeLocation)

func (*Topology) ToVolumeMap

func (t *Topology) ToVolumeMap() interface{}

func (*Topology) UnRegisterDataNode

func (t *Topology) UnRegisterDataNode(dn *DataNode)

func (*Topology) UnRegisterEcShards

func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode)

func (*Topology) UnRegisterVolumeLayout

func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode)

func (*Topology) Vacuum

func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) int

type VolumeGrowOption

type VolumeGrowOption struct {
	Collection         string
	ReplicaPlacement   *storage.ReplicaPlacement
	Ttl                *needle.TTL
	Prealloacte        int64
	DataCenter         string
	Rack               string
	DataNode           string
	MemoryMapMaxSizeMB uint32
}

func (*VolumeGrowOption) String

func (o *VolumeGrowOption) String() string

type VolumeGrowth

type VolumeGrowth struct {
	// contains filtered or unexported fields
}

func NewDefaultVolumeGrowth

func NewDefaultVolumeGrowth() *VolumeGrowth

func (*VolumeGrowth) AutomaticGrowByType

func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology) (count int, err error)

func (*VolumeGrowth) GrowByCountAndType

func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error)

type VolumeLayout

type VolumeLayout struct {
	// contains filtered or unexported fields
}

VolumeLayout is a mapping from each volume to its locations, i.e. the server-to-volume mapping inverted.

func NewVolumeLayout

func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64) *VolumeLayout

func (*VolumeLayout) GetActiveVolumeCount

func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int

func (*VolumeLayout) ListVolumeServers

func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode)

func (*VolumeLayout) Lookup

func (vl *VolumeLayout) Lookup(vid needle.VolumeId) []*DataNode

func (*VolumeLayout) PickForWrite

func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*needle.VolumeId, uint64, *VolumeLocationList, error)

func (*VolumeLayout) RegisterVolume

func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode)

func (*VolumeLayout) SetVolumeAvailable

func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId) bool

func (*VolumeLayout) SetVolumeCapacityFull

func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool

func (*VolumeLayout) SetVolumeUnavailable

func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId) bool

func (*VolumeLayout) Stats

func (vl *VolumeLayout) Stats() *VolumeLayoutStats

func (*VolumeLayout) String

func (vl *VolumeLayout) String() string

func (*VolumeLayout) ToMap

func (vl *VolumeLayout) ToMap() map[string]interface{}

func (*VolumeLayout) UnRegisterVolume

func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode)

type VolumeLayoutStats

type VolumeLayoutStats struct {
	TotalSize uint64
	UsedSize  uint64
	FileCount uint64
}

type VolumeLocationList

type VolumeLocationList struct {
	// contains filtered or unexported fields
}

func NewVolumeLocationList

func NewVolumeLocationList() *VolumeLocationList

func (*VolumeLocationList) Head

func (dnll *VolumeLocationList) Head() *DataNode

func (*VolumeLocationList) Length

func (dnll *VolumeLocationList) Length() int

func (*VolumeLocationList) Refresh

func (dnll *VolumeLocationList) Refresh(freshThreshHold int64)

func (*VolumeLocationList) Remove

func (dnll *VolumeLocationList) Remove(loc *DataNode) bool

func (*VolumeLocationList) Set

func (dnll *VolumeLocationList) Set(loc *DataNode)

func (*VolumeLocationList) Stats

func (dnll *VolumeLocationList) Stats(vid needle.VolumeId, freshThreshHold int64) (size uint64, fileCount int)

func (*VolumeLocationList) String

func (dnll *VolumeLocationList) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL