store

package
v1.25.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2024 License: BSD-3-Clause Imports: 32 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNotLeader is returned when an operation can't be completed on a
	// follower or candidate node.
	ErrNotLeader      = errors.New("node is not the leader")
	ErrLeaderNotFound = errors.New("leader not found")
	ErrNotOpen        = errors.New("store not open")
	ErrUnknownCommand = errors.New("unknown command")
	// ErrDeadlineExceeded represents an error returned when the deadline for waiting for a specific update is exceeded.
	ErrDeadlineExceeded = errors.New("deadline exceeded for waiting for update")
)

Functions

func LegacySnapshot

func LegacySnapshot(nodeID string, m map[string]ClassState) (*raft.SnapshotMeta, io.ReadCloser, error)

LegacySnapshot returns a ready-to-use in-memory Raft snapshot based on the provided legacy schema

func MergeProps

func MergeProps(old, new []*models.Property) []*models.Property

MergeProps makes sure duplicates are not created by ignoring new props with the same names as old props. If property of nested type is present in both new and old slices, final property is created by merging new property into copy of old one

func NewHCLogrusLogger

func NewHCLogrusLogger(name string, logger *logrus.Logger) hclog.Logger

func NewSchema

func NewSchema(nodeID string, shardReader shardReader) *schema

Types

type Bootstrapper

type Bootstrapper struct {
	// contains filtered or unexported fields
}

Bootstrapper is used to bootstrap this node by attempting to join it to a RAFT cluster.

func NewBootstrapper

func NewBootstrapper(joiner joiner, raftID, raftAddr string, r addressResolver, isStoreReady func() bool) *Bootstrapper

NewBootstrapper constructs a new bootsrapper

func (*Bootstrapper) Do

func (b *Bootstrapper) Do(ctx context.Context, serverPortMap map[string]int, lg *logrus.Logger, voter bool, close chan struct{}) error

Do iterates over a list of servers in an attempt to join this node to a cluster.

type ClassInfo

type ClassInfo struct {
	Exists            bool
	MultiTenancy      models.MultiTenancyConfig
	ReplicationFactor int
	Tenants           int
	Properties        int
	ClassVersion      uint64
	ShardVersion      uint64
}

func (*ClassInfo) Version

func (ci *ClassInfo) Version() uint64

type ClassState

type ClassState struct {
	Class  models.Class
	Shards sharding.State
}

type Config

type Config struct {
	WorkDir               string // raft working directory
	NodeID                string
	Host                  string
	RaftPort              int
	RPCPort               int
	RaftRPCMessageMaxSize int

	// ServerName2PortMap maps server names to port numbers
	ServerName2PortMap map[string]int
	BootstrapExpect    int

	HeartbeatTimeout time.Duration
	ElectionTimeout  time.Duration
	RecoveryTimeout  time.Duration
	SnapshotInterval time.Duration
	BootstrapTimeout time.Duration
	// ConsistencyWaitTimeout is the duration we will wait for a schema version to land on that node
	ConsistencyWaitTimeout time.Duration
	SnapshotThreshold      uint64

	DB            Indexer
	Parser        Parser
	AddrResolver  addressResolver
	Logger        *logrus.Logger
	LogLevel      string
	LogJSONFormat bool
	Voter         bool

	// MetadataOnlyVoters  configures the voters to store metadata exclusively, without storing any other data
	MetadataOnlyVoters bool

	// LoadLegacySchema is responsible for loading old schema from boltDB
	LoadLegacySchema LoadLegacySchema
	// SaveLegacySchema is responsible for loading new schema into boltDB
	SaveLegacySchema SaveLegacySchema
	// IsLocalHost only required when running Weaviate from the console in localhost
	IsLocalHost bool
}

type Indexer

type Indexer interface {
	AddClass(api.AddClassRequest) error
	UpdateClass(api.UpdateClassRequest) error
	DeleteClass(string) error
	AddProperty(class string, req api.AddPropertyRequest) error
	AddTenants(class string, req *api.AddTenantsRequest) error
	UpdateTenants(class string, req *api.UpdateTenantsRequest) error
	DeleteTenants(class string, req *api.DeleteTenantsRequest) error
	UpdateShardStatus(*api.UpdateShardStatusRequest) error
	GetShardsStatus(class, tenant string) (models.ShardStatusList, error)
	UpdateIndex(api.UpdateClassRequest) error

	TriggerSchemaUpdateCallbacks()

	// ReloadLocalDB reloads the local database using the latest schema.
	ReloadLocalDB(ctx context.Context, all []api.UpdateClassRequest) error

	// RestoreClassDir restores classes on the filesystem directly from the temporary class backup stored on disk.
	RestoreClassDir(class string) error
	Open(context.Context) error
	Close(context.Context) error
}

Indexer interface updates both the collection and its indices in the filesystem. This is distinct from updating metadata, which is handled through a different interface.

type LoadLegacySchema

type LoadLegacySchema func() (map[string]ClassState, error)

type Parser

type Parser interface {
	// ParseClassUpdate parses a class after unmarshaling by setting concrete types for the fields
	ParseClass(class *models.Class) error

	// ParseClass parses new updates by providing the current class data.
	ParseClassUpdate(class, update *models.Class) (*models.Class, error)
}

Parser parses concrete class fields after deserialization

type Response

type Response struct {
	Error   error
	Version uint64
	Data    interface{}
}

type SaveLegacySchema

type SaveLegacySchema func(map[string]ClassState) error

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service abstracts away the Raft store, providing clients with an interface that encompasses all write operations. It ensures that these operations are executed on the current leader, regardless of the specific leader in the cluster.

func NewService

func NewService(store *Store, client client) *Service

func (*Service) AddClass

func (s *Service) AddClass(cls *models.Class, ss *sharding.State) (uint64, error)

func (*Service) AddProperty

func (s *Service) AddProperty(class string, props ...*models.Property) (uint64, error)

func (*Service) AddTenants

func (s *Service) AddTenants(class string, req *cmd.AddTenantsRequest) (uint64, error)

func (*Service) Close

func (s *Service) Close(ctx context.Context) (err error)

func (*Service) DeleteClass

func (s *Service) DeleteClass(name string) (uint64, error)

func (*Service) DeleteTenants

func (s *Service) DeleteTenants(class string, req *cmd.DeleteTenantsRequest) (uint64, error)

func (*Service) Execute

func (s *Service) Execute(req *cmd.ApplyRequest) (uint64, error)

func (*Service) Join

func (s *Service) Join(ctx context.Context, id, addr string, voter bool) error

func (*Service) LeaderWithID

func (s *Service) LeaderWithID() (string, string)

LeaderWithID is used to return the current leader address and ID of the cluster. It may return empty strings if there is no current leader or the leader is unknown.

func (*Service) Open

func (s *Service) Open(ctx context.Context, db Indexer) error

Open opens this store service and marked as such. It constructs a new Raft node using the provided configuration. If there is any old state, such as snapshots, logs, peers, etc., all of those will be restored

func (*Service) Query

func (s *Service) Query(ctx context.Context, req *cmd.QueryRequest) (*cmd.QueryResponse, error)

Query receives a QueryRequest and ensure it is executed on the leader and returns the related QueryResponse If any error happens it returns it

func (*Service) QueryReadOnlyClasses

func (s *Service) QueryReadOnlyClasses(classes ...string) (map[string]versioned.Class, error)

QueryReadOnlyClass will verify that class is non empty and then build a Query that will be directed to the leader to ensure we will read the class with strong consistency

func (*Service) QuerySchema

func (s *Service) QuerySchema() (models.Schema, error)

QuerySchema build a Query to read the schema that will be directed to the leader to ensure we will read the class with strong consistency

func (*Service) QueryShardOwner

func (s *Service) QueryShardOwner(class, shard string) (string, uint64, error)

QueryShardOwner build a Query to read the tenants of a given class that will be directed to the leader to ensure we will read the tenant with strong consistency and return the shard owner node

func (*Service) QueryShardingState

func (s *Service) QueryShardingState(class string) (*sharding.State, uint64, error)

QueryShardingState build a Query to read the sharding state of a given class. The request will be directed to the leader to ensure we will read the shard state with strong consistency and return the state and it's version.

func (*Service) QueryTenants

func (s *Service) QueryTenants(class string, tenants []string) ([]*models.Tenant, uint64, error)

QueryTenants build a Query to read the tenants of a given class that will be directed to the leader to ensure we will read the class with strong consistency

func (*Service) QueryTenantsShards

func (s *Service) QueryTenantsShards(class string, tenants ...string) (map[string]string, uint64, error)

QueryTenantsShards build a Query to read the tenants and their activity status of a given class. The request will be directed to the leader to ensure we will read the tenant with strong consistency and return the shard owner node

func (*Service) Ready

func (s *Service) Ready() bool

func (*Service) Remove

func (s *Service) Remove(ctx context.Context, id string) error

func (*Service) RestoreClass

func (s *Service) RestoreClass(cls *models.Class, ss *sharding.State) (uint64, error)

func (*Service) SchemaReader

func (s *Service) SchemaReader() retrySchema

func (*Service) Stats

func (s *Service) Stats() map[string]any

func (*Service) StoreSchemaV1

func (s *Service) StoreSchemaV1() error

func (*Service) UpdateClass

func (s *Service) UpdateClass(cls *models.Class, ss *sharding.State) (uint64, error)

func (*Service) UpdateShardStatus

func (s *Service) UpdateShardStatus(class, shard, status string) (uint64, error)

func (*Service) UpdateTenants

func (s *Service) UpdateTenants(class string, req *cmd.UpdateTenantsRequest) (uint64, error)

func (*Service) WaitUntilDBRestored

func (s *Service) WaitUntilDBRestored(ctx context.Context, period time.Duration, close chan struct{}) error

type Store

type Store struct {
	// contains filtered or unexported fields
}

func New

func New(cfg Config) Store

func (*Store) Apply

func (st *Store) Apply(l *raft.Log) interface{}

Apply is called once a log entry is committed by a majority of the cluster. Apply should apply the log to the FSM. Apply must be deterministic and produce the same result on all peers in the cluster. The returned value is returned to the client as the ApplyFuture.Response.

func (*Store) Close

func (st *Store) Close(ctx context.Context) error

func (*Store) Execute

func (st *Store) Execute(req *api.ApplyRequest) (uint64, error)

func (*Store) FindSimilarClass

func (f *Store) FindSimilarClass(name string) string

FindSimilarClass returns the name of an existing class with a similar name, and "" otherwise

func (*Store) ID

func (st *Store) ID() string

func (*Store) IsLeader

func (st *Store) IsLeader() bool

IsLeader returns whether this node is the leader of the cluster

func (*Store) IsVoter

func (st *Store) IsVoter() bool

func (*Store) Join

func (st *Store) Join(id, addr string, voter bool) error

Join adds the given peer to the cluster. This operation must be executed on the leader, otherwise, it will fail with ErrNotLeader. If the cluster has not been opened yet, it will return ErrNotOpen.

func (*Store) Leader

func (st *Store) Leader() string

Leader is used to return the current leader address. It may return empty strings if there is no current leader or the leader is unknown.

func (*Store) LeaderWithID

func (st *Store) LeaderWithID() (raft.ServerAddress, raft.ServerID)

func (*Store) Notify

func (st *Store) Notify(id, addr string) (err error)

Notify signals this Store that a node is ready for bootstrapping at the specified address. Bootstrapping will be initiated once the number of known nodes reaches the expected level, which includes this node.

func (*Store) Open

func (st *Store) Open(ctx context.Context) (err error)

Open opens this store and marked as such. It constructs a new Raft node using the provided configuration. If there is any old state, such as snapshots, logs, peers, etc., all of those will be restored.

func (*Store) Query

func (st *Store) Query(req *cmd.QueryRequest) (*cmd.QueryResponse, error)

func (*Store) QueryReadOnlyClasses

func (st *Store) QueryReadOnlyClasses(req *cmd.QueryRequest) ([]byte, error)

func (*Store) QuerySchema

func (st *Store) QuerySchema() ([]byte, error)

func (*Store) QueryShardOwner

func (st *Store) QueryShardOwner(req *cmd.QueryRequest) ([]byte, error)

func (*Store) QueryShardingState

func (st *Store) QueryShardingState(req *cmd.QueryRequest) ([]byte, error)

func (*Store) QueryTenants

func (st *Store) QueryTenants(req *cmd.QueryRequest) ([]byte, error)

func (*Store) QueryTenantsShards

func (st *Store) QueryTenantsShards(req *cmd.QueryRequest) ([]byte, error)

func (*Store) Ready

func (st *Store) Ready() bool

func (*Store) Remove

func (st *Store) Remove(id string) error

Remove removes this peer from the cluster

func (*Store) Restore

func (st *Store) Restore(rc io.ReadCloser) error

Restore is used to restore an FSM from a snapshot. It is not called concurrently with any other command. The FSM must discard all previous state before restoring the snapshot.

func (*Store) SchemaReader

func (st *Store) SchemaReader() retrySchema

func (*Store) SetDB

func (st *Store) SetDB(db Indexer)

func (*Store) Snapshot

func (st *Store) Snapshot() (raft.FSMSnapshot, error)

Snapshot returns an FSMSnapshot used to: support log compaction, to restore the FSM to a previous state, or to bring out-of-date followers up to a recent log index.

The Snapshot implementation should return quickly, because Apply can not be called while Snapshot is running. Generally this means Snapshot should only capture a pointer to the state, and any expensive IO should happen as part of FSMSnapshot.Persist.

Apply and Snapshot are always called from the same thread, but Apply will be called concurrently with FSMSnapshot.Persist. This means the FSM should be implemented to allow for concurrent updates while a snapshot is happening.

func (*Store) Stats

func (st *Store) Stats() map[string]any

Stats returns internal statistics from this store, for informational/debugging purposes only.

The statistics directly from raft are nested under the "raft" key. If the raft statistics are not yet available, then the "raft" key will not exist. See https://pkg.golang.ir/github.com/hashicorp/raft#Raft.Stats for the default raft stats.

The values of "leader_address" and "leader_id" are the respective address/ID for the current leader of the cluster. They may be empty strings if there is no current leader or the leader is unknown.

The value of "ready" indicates whether this store is ready, see Store.Ready.

The value of "is_voter" indicates whether this store is a voter, see Store.IsVoter.

The value of "open" indicates whether this store is open, see Store.open.

The value of "bootstrapped" indicates whether this store has completed bootstrapping, see Store.bootstrapped.

The value of "candidates" is a map[string]string of the current candidates IDs/addresses, see Store.candidates.

The value of "last_store_log_applied_index" is the index of the last applied command found when the store was opened, see Store.lastAppliedIndexOnStart.

The value of "last_applied_index" is the index of the latest update to the store, see Store.lastAppliedIndex.

The value of "db_loaded" indicates whether the DB has finished loading, see Store.dbLoaded.

Since this is for information/debugging we want to avoid enforcing unnecessary restrictions on what can go in these stats, thus we're returning map[string]any. However, any values added to this map should be able to be JSON encoded.

func (*Store) StoreSchemaV1

func (st *Store) StoreSchemaV1() error

StoreSchemaV1() is responsible for saving new schema (RAFT) to boltDB

func (*Store) VersionedSchemaReader

func (st *Store) VersionedSchemaReader() versionedSchema

func (*Store) WaitForAppliedIndex

func (st *Store) WaitForAppliedIndex(ctx context.Context, period time.Duration, version uint64) error

WaitForAppliedIndex waits until the update with the given version is propagated to this follower node

func (*Store) WaitToRestoreDB

func (st *Store) WaitToRestoreDB(ctx context.Context, period time.Duration, close chan struct{}) error

WaitToLoadDB waits for the DB to be loaded. The DB might be first loaded after RAFT is in a healthy state, which is when the leader has been elected and there is consensus on the log.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL