disttae

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2024 License: Apache-2.0 Imports: 73 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PREFETCH_THRESHOLD  = 256
	PREFETCH_ROUNDS     = 24
	SMALLSCAN_THRESHOLD = 100
	LARGESCAN_THRESHOLD = 1500
)
View Source
const (
	INSERT = iota
	DELETE
	COMPACTION_CN
	UPDATE
	ALTER
	INSERT_TXN // Only for CN workspace consumption, not sent to DN
	DELETE_TXN // Only for CN workspace consumption, not sent to DN

	MERGEOBJECT
)
View Source
const (
	SMALL = iota
	NORMAL
	LARGE
)
View Source
const (
	MO_DATABASE_ID_NAME_IDX       = 1
	MO_DATABASE_ID_ACCOUNT_IDX    = 2
	MO_DATABASE_LIST_ACCOUNT_IDX  = 1
	MO_TABLE_ID_NAME_IDX          = 1
	MO_TABLE_ID_DATABASE_ID_IDX   = 2
	MO_TABLE_ID_ACCOUNT_IDX       = 3
	MO_TABLE_LIST_DATABASE_ID_IDX = 1
	MO_TABLE_LIST_ACCOUNT_IDX     = 2
	MO_PRIMARY_OFF                = 2
	INIT_ROWID_OFFSET             = math.MaxUint32
)
View Source
const (
	WorkspaceThreshold uint64 = 1 * mpool.MB
	GCBatchOfFileCount int    = 1000
	GCPoolSize         int    = 5
)
View Source
const (
	AllColumns = "*"
)
View Source
const (

	// MinUpdateInterval is the minimal interval to update stats info as it
	// is necessary to update stats every time.
	MinUpdateInterval = time.Second * 30
)

Variables

View Source
var GcCycle = 10 * time.Second

Functions

func CompileFilterExpr added in v1.2.0

func CompileFilterExpr(
	expr *plan.Expr,
	proc *process.Process,
	tableDef *plan.TableDef,
	fs fileservice.FileService,
) (
	fastFilterOp FastFilterOp,
	loadOp LoadOp,
	objectFilterOp ObjectFilterOp,
	blockFilterOp BlockFilterOp,
	seekOp SeekFirstBlockOp,
	canCompile bool,
)

func CompileFilterExprs added in v1.2.0

func CompileFilterExprs(
	exprs []*plan.Expr,
	proc *process.Process,
	tableDef *plan.TableDef,
	fs fileservice.FileService,
) (
	fastFilterOp FastFilterOp,
	loadOp LoadOp,
	objectFilterOp ObjectFilterOp,
	blockFilterOp BlockFilterOp,
	seekOp SeekFirstBlockOp,
	canCompile bool,
)

func ConstructObjStatsByLoadObjMeta added in v1.1.0

func ConstructObjStatsByLoadObjMeta(
	ctx context.Context, metaLoc objectio.Location,
	fs fileservice.FileService) (stats objectio.ObjectStats, dataMeta objectio.ObjectDataMeta, err error)

func EvalSelectedOnFixedSizeColumnFactory added in v0.8.0

func EvalSelectedOnFixedSizeColumnFactory[T types.FixedSizeTExceptStrType](
	v T,
) func(*vector.Vector, []int32, *[]int32)

2.1 fixedSize type column + non-sorted

func EvalSelectedOnFixedSizeSortedColumnFactory added in v0.8.0

func EvalSelectedOnFixedSizeSortedColumnFactory[T types.FixedSizeTExceptStrType](
	v T, comp func(T, T) int,
) func(*vector.Vector, []int32, *[]int32)

1.2 fixed size column type + sorted column

func EvalSelectedOnOrderedSortedColumnFactory added in v0.8.0

func EvalSelectedOnOrderedSortedColumnFactory[T types.OrderedT](
	v T,
) func(*vector.Vector, []int32, *[]int32)

1.1 ordered column type + sorted column

func EvalSelectedOnVarlenColumnFactory added in v0.8.0

func EvalSelectedOnVarlenColumnFactory(
	v []byte,
) func(*vector.Vector, []int32, *[]int32)

2.2 varlen type column + non-sorted

func EvalSelectedOnVarlenSortedColumnFactory added in v0.8.0

func EvalSelectedOnVarlenSortedColumnFactory(
	v []byte,
) func(*vector.Vector, []int32, *[]int32)

1.3 varlen type column + sorted

func ExecuteBlockFilter added in v1.2.0

func ExecuteBlockFilter(
	snapshotTS timestamp.Timestamp,
	fastFilterOp FastFilterOp,
	loadOp LoadOp,
	objectFilterOp ObjectFilterOp,
	blockFilterOp BlockFilterOp,
	seekOp SeekFirstBlockOp,
	snapshot *logtailreplay.PartitionState,
	uncommittedObjects []objectio.ObjectStats,
	dirtyBlocks map[types.Blockid]struct{},
	outBlocks *objectio.BlockInfoSlice,
	fs fileservice.FileService,
	proc *process.Process,
) (err error)

func ForeachBlkInObjStatsList added in v1.1.0

func ForeachBlkInObjStatsList(
	next bool,
	dataMeta objectio.ObjectDataMeta,
	onBlock func(blk objectio.BlockInfo, blkMeta objectio.BlockObject) bool,
	objects ...objectio.ObjectStats,
)

ForeachBlkInObjStatsList receives an object info list, and visits each blk of these object info by OnBlock, until the onBlock returns false or all blks have been enumerated. when onBlock returns a false, the next argument decides whether continue onBlock on the next stats or exit foreach completely.

func ForeachCommittedObjects added in v1.2.0

func ForeachCommittedObjects(
	createObjs map[objectio.ObjectNameShort]struct{},
	delObjs map[objectio.ObjectNameShort]struct{},
	p *logtailreplay.PartitionState,
	onObj func(info logtailreplay.ObjectInfo) error) (err error)

func ForeachSnapshotObjects added in v1.1.0

func ForeachSnapshotObjects(
	ts timestamp.Timestamp,
	onObject func(obj logtailreplay.ObjectInfo, isCommitted bool) error,
	tableSnapshot *logtailreplay.PartitionState,
	uncommitted ...objectio.ObjectStats,
) (err error)

func ForeachVisibleDataObject added in v1.2.0

func ForeachVisibleDataObject(
	state *logtailreplay.PartitionState,
	ts types.TS,
	fn func(obj logtailreplay.ObjectEntry) error,
) (err error)

func LinearSearchOffsetByValFactory added in v1.2.0

func LinearSearchOffsetByValFactory(pk *vector.Vector) func(*vector.Vector) []int32

func ListTnService added in v1.1.0

func ListTnService(appendFn func(service *metadata.TNService))

ListTnService gets all tn service in the cluster

func MustGetFullCompositePK added in v1.2.0

func MustGetFullCompositePK(
	expr *plan.Expr,
	pkName string,
	keys []string,
	packer *types.Packer,
	proc *process.Process,
) (bool, *plan.Expr)

func MustGetFullCompositePKValue added in v1.2.0

func MustGetFullCompositePKValue(
	expr *plan.Expr,
	pkName string,
	keys []string,
	packer *types.Packer,
	proc *process.Process,
) (canEval, isVec bool, val []byte)

func NewMergeReader added in v0.8.0

func NewMergeReader(readers []engine.Reader) *mergeReader

func TryFastFilterBlocks added in v1.2.0

func TryFastFilterBlocks(
	snapshotTS timestamp.Timestamp,
	tableDef *plan.TableDef,
	exprs []*plan.Expr,
	snapshot *logtailreplay.PartitionState,
	uncommittedObjects []objectio.ObjectStats,
	dirtyBlocks map[types.Blockid]struct{},
	outBlocks *objectio.BlockInfoSlice,
	fs fileservice.FileService,
	proc *process.Process,
) (ok bool, err error)

func UnfoldBlkInfoFromObjStats added in v1.1.0

func UnfoldBlkInfoFromObjStats(stats *objectio.ObjectStats) (blks []objectio.BlockInfo)

UnfoldBlkInfoFromObjStats constructs a block info list from the given object stats. this unfolds all block info at one operation, if an object contains a great many of blocks, this operation is memory sensitive, we recommend another way, StatsBlkIter or ForEach.

func UpdateStats added in v1.0.0

func UpdateStats(ctx context.Context, req *updateStatsRequest) error

UpdateStats is the main function to calculate and update the stats for scan node.

Types

type BlockFilterOp added in v1.2.0

type BlockFilterOp func(int, objectio.BlockObject, objectio.BloomFilter) (bool, bool, error)

type DNStore

type DNStore = metadata.TNService

type Engine

type Engine struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func New

func (*Engine) AllocateIDByKey added in v0.8.0

func (e *Engine) AllocateIDByKey(ctx context.Context, key string) (uint64, error)

func (*Engine) Create

func (e *Engine) Create(ctx context.Context, name string, op client.TxnOperator) error

func (*Engine) Database

func (e *Engine) Database(ctx context.Context, name string,
	op client.TxnOperator) (engine.Database, error)

func (*Engine) DatabaseByAccountID added in v1.2.0

func (e *Engine) DatabaseByAccountID(
	accountID uint32,
	name string,
	op client.TxnOperator) (engine.Database, error)

func (*Engine) Databases

func (e *Engine) Databases(ctx context.Context, op client.TxnOperator) ([]string, error)

func (*Engine) Delete

func (e *Engine) Delete(ctx context.Context, name string, op client.TxnOperator) error

func (*Engine) GetNameById added in v0.7.0

func (e *Engine) GetNameById(ctx context.Context, op client.TxnOperator, tableId uint64) (dbName string, tblName string, err error)

func (*Engine) GetRelationById added in v0.7.0

func (e *Engine) GetRelationById(ctx context.Context, op client.TxnOperator, tableId uint64) (dbName, tableName string, rel engine.Relation, err error)

func (*Engine) Hints

func (e *Engine) Hints() (h engine.Hints)

func (*Engine) InitLogTailPushModel added in v0.8.0

func (e *Engine) InitLogTailPushModel(ctx context.Context, timestampWaiter client.TimestampWaiter) error

func (*Engine) New

func (e *Engine) New(ctx context.Context, op client.TxnOperator) error

func (*Engine) NewBlockReader

func (e *Engine) NewBlockReader(ctx context.Context, num int, ts timestamp.Timestamp,
	expr *plan.Expr, ranges []byte, tblDef *plan.TableDef, proc any) ([]engine.Reader, error)

func (*Engine) Nodes

func (e *Engine) Nodes(
	isInternal bool, tenant string, username string, cnLabel map[string]string,
) (engine.Nodes, error)

func (*Engine) PushClient added in v1.2.0

func (e *Engine) PushClient() *PushClient

func (*Engine) Stats added in v1.2.0

func (e *Engine) Stats(ctx context.Context, key pb.StatsInfoKey, sync bool) *pb.StatsInfo

func (*Engine) TryToSubscribeTable added in v1.2.0

func (e *Engine) TryToSubscribeTable(ctx context.Context, dbID, tbID uint64) error

TryToSubscribeTable implements the LogtailEngine interface.

func (*Engine) UnsubscribeTable added in v1.2.0

func (e *Engine) UnsubscribeTable(ctx context.Context, dbID, tbID uint64) error

UnsubscribeTable implements the LogtailEngine interface.

func (*Engine) UpdateOfPush added in v0.8.0

func (e *Engine) UpdateOfPush(
	ctx context.Context,
	databaseId,
	tableId uint64, ts timestamp.Timestamp) error

type Entry

type Entry struct {
	// contains filtered or unexported fields
}

Entry represents a delete/insert

type FastFilterOp added in v1.2.0

type FastFilterOp func(objectio.ObjectStats) (bool, error)

type GlobalStats added in v1.2.0

type GlobalStats struct {

	// KeyRouter is the router to decides which node should send to.
	KeyRouter client.KeyRouter[pb.StatsInfoKey]
	// contains filtered or unexported fields
}

func NewGlobalStats added in v1.2.0

func NewGlobalStats(
	ctx context.Context, e *Engine, keyRouter client.KeyRouter[pb.StatsInfoKey], opts ...GlobalStatsOption,
) *GlobalStats

func (*GlobalStats) Get added in v1.2.0

func (gs *GlobalStats) Get(ctx context.Context, key pb.StatsInfoKey, sync bool) *pb.StatsInfo

type GlobalStatsConfig added in v1.2.0

type GlobalStatsConfig struct {
	LogtailUpdateStatsThreshold int
}

type GlobalStatsOption added in v1.2.0

type GlobalStatsOption func(s *GlobalStats)

func WithLogtailUpdateStatsThreshold added in v1.2.0

func WithLogtailUpdateStatsThreshold(v int) GlobalStatsOption

type IDGenerator

type IDGenerator interface {
	AllocateID(ctx context.Context) (uint64, error)
	// AllocateIDByKey allocate a globally unique ID by key.
	AllocateIDByKey(ctx context.Context, key string) (uint64, error)
}

type LoadOpFactory added in v1.2.0

type LoadOpFactory func(fileservice.FileService) LoadOp

type ObjectFilterOp added in v1.2.0

type ObjectFilterOp func(objectio.ObjectMeta, objectio.BloomFilter) (bool, error)

type PKFilter added in v1.2.0

type PKFilter struct {
	// contains filtered or unexported fields
}

func (*PKFilter) SetFullData added in v1.2.0

func (f *PKFilter) SetFullData(op uint8, isVec bool, val []byte)

func (*PKFilter) SetNull added in v1.2.0

func (f *PKFilter) SetNull()

func (*PKFilter) SetVal added in v1.2.0

func (f *PKFilter) SetVal(op uint8, isVec bool, val any)

func (*PKFilter) String added in v1.2.0

func (f *PKFilter) String() string

type PartitionReader

type PartitionReader struct {
	// contains filtered or unexported fields
}

func (*PartitionReader) Close

func (p *PartitionReader) Close() error

func (*PartitionReader) GetOrderBy added in v1.2.0

func (p *PartitionReader) GetOrderBy() []*plan.OrderBySpec

func (*PartitionReader) Read

func (p *PartitionReader) Read(
	_ context.Context,
	colNames []string,
	_ *plan.Expr,
	mp *mpool.MPool,
	pool engine.VectorPool) (result *batch.Batch, err error)

func (*PartitionReader) SetFilterZM added in v1.2.0

func (p *PartitionReader) SetFilterZM(objectio.ZoneMap)

func (*PartitionReader) SetOrderBy added in v1.2.0

func (p *PartitionReader) SetOrderBy([]*plan.OrderBySpec)

type Pos added in v0.8.0

type Pos struct {
	// contains filtered or unexported fields
}

type PushClient added in v1.2.0

type PushClient struct {
	// contains filtered or unexported fields
}

PushClient is a structure responsible for all operations related to the log tail push model. It provides the following methods:

	-----------------------------------------------------------------------------------------------------
	 1. checkTxnTimeIsLegal : block the process until we have received enough log tail (T_log >= T_txn)
	 2. TryToSubscribeTable : block the process until we subscribed a table succeed.
	 3. subscribeTable	   : send a table subscribe request to service.
	 4. subSysTables : subscribe mo_databases, mo_tables, mo_columns
	 5. receiveTableLogTailContinuously   : start (1 + consumerNumber) routine to receive log tail from service.

 Watch out for the following points:
	 1. if we want to lock both subscriber and subscribed, we should lock subscriber first.
	-----------------------------------------------------------------------------------------------------

func (*PushClient) GetState added in v1.2.0

func (c *PushClient) GetState() State

func (*PushClient) TryToSubscribeTable added in v1.2.0

func (c *PushClient) TryToSubscribeTable(
	ctx context.Context,
	dbId, tblId uint64) error

TryToSubscribeTable subscribe a table and block until subscribe succeed.

func (*PushClient) UnsubscribeTable added in v1.2.0

func (c *PushClient) UnsubscribeTable(ctx context.Context, dbID, tbID uint64) error

UnsubscribeTable implements the LogtailEngine interface.

type SeekFirstBlockOp added in v1.2.0

type SeekFirstBlockOp func(objectio.ObjectDataMeta) int

type State added in v1.2.0

type State struct {
	LatestTS  timestamp.Timestamp
	SubTables map[SubTableID]SubTableStatus
}

type StatsBlkIter added in v1.1.0

type StatsBlkIter struct {
	// contains filtered or unexported fields
}

func NewStatsBlkIter added in v1.1.0

func NewStatsBlkIter(stats *objectio.ObjectStats, meta objectio.ObjectDataMeta) *StatsBlkIter

func (*StatsBlkIter) Entry added in v1.1.0

func (i *StatsBlkIter) Entry() objectio.BlockInfo

func (*StatsBlkIter) Next added in v1.1.0

func (i *StatsBlkIter) Next() bool

type SubTableID added in v1.2.0

type SubTableID struct {
	DatabaseID uint64
	TableID    uint64
}

type SubTableStatus added in v1.2.0

type SubTableStatus struct {
	IsDeleting bool
	LatestTime time.Time
}

type Transaction

type Transaction struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Transaction represents a transaction

func (*Transaction) Adjust added in v1.0.0

func (txn *Transaction) Adjust(writeOffset uint64) error

Adjust adjust writes order after the current statement finished.

func (*Transaction) BindTxnOp added in v1.2.0

func (txn *Transaction) BindTxnOp(op client.TxnOperator)

func (*Transaction) CloneSnapshotWS added in v1.2.0

func (txn *Transaction) CloneSnapshotWS() client.Workspace

func (*Transaction) Commit added in v0.8.0

func (txn *Transaction) Commit(ctx context.Context) ([]txn.TxnRequest, error)

func (*Transaction) EndStatement added in v1.0.0

func (txn *Transaction) EndStatement()

func (*Transaction) GetSQLCount added in v1.0.0

func (txn *Transaction) GetSQLCount() uint64

func (*Transaction) GetSnapshotWriteOffset added in v1.1.2

func (txn *Transaction) GetSnapshotWriteOffset() int

func (*Transaction) IncrSQLCount added in v1.0.0

func (txn *Transaction) IncrSQLCount()

func (*Transaction) IncrStatementID added in v0.8.0

func (txn *Transaction) IncrStatementID(ctx context.Context, commit bool) error

func (*Transaction) PutCnBlockDeletes added in v0.8.0

func (txn *Transaction) PutCnBlockDeletes(blockId *types.Blockid, offsets []int64)

func (*Transaction) ReadOnly

func (txn *Transaction) ReadOnly() bool

detecting whether a transaction is a read-only transaction

func (*Transaction) Rollback added in v0.8.0

func (txn *Transaction) Rollback(ctx context.Context) error

func (*Transaction) RollbackLastStatement added in v0.8.0

func (txn *Transaction) RollbackLastStatement(ctx context.Context) error

func (*Transaction) StartStatement added in v1.0.0

func (txn *Transaction) StartStatement()

func (*Transaction) UpdateSnapshotWriteOffset added in v1.1.2

func (txn *Transaction) UpdateSnapshotWriteOffset()

func (*Transaction) WriteBatch

func (txn *Transaction) WriteBatch(
	typ int,
	accountId uint32,
	databaseId uint64,
	tableId uint64,
	databaseName string,
	tableName string,
	bat *batch.Batch,
	tnStore DNStore,
	primaryIdx int,
	insertBatchHasRowId bool,
	truncate bool) error

WriteBatch used to write data to the transaction buffer insert/delete/update all use this api insertBatchHasRowId : it denotes the batch has Rowid when the typ is INSERT. if typ is not INSERT, it is always false. truncate : it denotes the batch with typ DELETE on mo_tables is generated when Truncating a table.

func (*Transaction) WriteFile

func (txn *Transaction) WriteFile(
	typ int,
	accountId uint32,
	databaseId,
	tableId uint64,
	databaseName,
	tableName string,
	fileName string,
	bat *batch.Batch,
	tnStore DNStore) error

WriteFile used to add a s3 file information to the transaction buffer insert/delete/update all use this api

func (*Transaction) WriteFileLocked added in v1.0.0

func (txn *Transaction) WriteFileLocked(
	typ int,
	accountId uint32,
	databaseId,
	tableId uint64,
	databaseName,
	tableName string,
	fileName string,
	bat *batch.Batch,
	tnStore DNStore) error

func (*Transaction) WriteOffset added in v1.1.1

func (txn *Transaction) WriteOffset() uint64

writeOffset returns the offset of the first write in the workspace

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL