model

package
v0.0.0-...-08aec53
Warning

This package is not in the latest version of its module.

Published: May 17, 2024 License: Apache-2.0 Imports: 34 Imported by: 25

Documentation

Constants

const (
	OperDispatched uint64 = iota
	OperProcessed
	OperFinished
)

All TableOperation status

const (
	// HandleIndexPKIsHandle represents that the handle index is the pk and the pk is the handle
	HandleIndexPKIsHandle = -1
	// HandleIndexTableIneligible represents that the table is ineligible
	HandleIndexTableIneligible = -2
)
const (
	// DefaultNamespace is the default namespace value,
	// all the old changefeed will be put into default namespace
	DefaultNamespace = "default"
)
const (
	// OperFlagMoveTable means that after the delete operation, the table will be re-added.
	// This field is necessary since we must persist enough information to
	// restore complete table operation in case of processor or owner crashes.
	OperFlagMoveTable uint64 = 1 << iota
)

All TableOperation flags

Variables

This section is empty.

Functions

func AddExtraColumnInfo

func AddExtraColumnInfo(tableInfo *model.TableInfo, extraColInfos []rowcodec.ColInfo)

AddExtraColumnInfo is used to add some extra column info to the table info. Use it only in tests.

func BuildTiDBTableInfo

func BuildTiDBTableInfo(tableName string, columns []*Column, indexColumns [][]int) *model.TableInfo

BuildTiDBTableInfo is a simple wrapper over BuildTiDBTableInfoImpl which creates a default ColumnIDAllocator.

func BuildTiDBTableInfoImpl

func BuildTiDBTableInfoImpl(
	tableName string,
	columns []*Column,
	indexColumns [][]int,
	columnIDAllocator ColumnIDAllocator,
) *model.TableInfo

BuildTiDBTableInfoImpl builds a TiDB TableInfo from the given information. Note that the resulting TableInfo may not be the same as the original TableInfo in TiDB. The only guarantee is that you can restore the `Name`, `Type`, `Charset`, `Collation` and `Flag` fields of `Column` using the resulting TableInfo. The preconditions required for calling this function are:

  1. There must be at least one handle key in `columns`;
  2. The handle key must either be a primary key or a non null unique key;
  3. The index that is selected as the handle must be provided in `indexColumns`;

func BuildTiDBTableInfoWithoutVirtualColumns

func BuildTiDBTableInfoWithoutVirtualColumns(source *model.TableInfo) *model.TableInfo

BuildTiDBTableInfoWithoutVirtualColumns builds a TableInfo without virtual columns from the source table info

func ColumnValueString

func ColumnValueString(c interface{}) string

ColumnValueString returns the string representation of the column value

func ComparePolymorphicEvents

func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool

ComparePolymorphicEvents compares two events by CRTs, Resolved, StartTs, Delete/Put order. It returns true if and only if i should precede j.
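
The ordering described above can be sketched with a small comparator. This is only an illustration under stated assumptions: `event` is a local stand-in rather than the real `PolymorphicEvent`, and the tie-break directions (data events before resolved markers, deletes before puts) are assumptions read from the comment, not taken from the package source.

```go
package main

import (
	"fmt"
	"sort"
)

// event is a hypothetical stand-in for PolymorphicEvent with only the
// fields the comparison needs.
type event struct {
	StartTs  uint64
	CRTs     uint64
	Resolved bool // true for a resolved-ts marker
	Delete   bool // true for a delete, false for a put
}

// less reports whether i should precede j.
// Assumed tie-breaks: data before resolved markers, deletes before puts.
func less(i, j event) bool {
	if i.CRTs != j.CRTs {
		return i.CRTs < j.CRTs
	}
	if i.Resolved != j.Resolved {
		return !i.Resolved
	}
	if i.StartTs != j.StartTs {
		return i.StartTs < j.StartTs
	}
	return i.Delete && !j.Delete
}

// sortEvents orders the slice in place using less.
func sortEvents(evs []event) {
	sort.SliceStable(evs, func(a, b int) bool { return less(evs[a], evs[b]) })
}

func main() {
	evs := []event{
		{CRTs: 5, Resolved: true},
		{StartTs: 3, CRTs: 5},
		{StartTs: 3, CRTs: 5, Delete: true},
		{StartTs: 1, CRTs: 2},
	}
	sortEvents(evs)
	for _, e := range evs {
		fmt.Printf("%+v\n", e)
	}
}
```

A comparator of this shape can be passed to any sort routine, since it returns false for two events that tie on every key.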

func GetColumnDefaultValue

func GetColumnDefaultValue(col *model.ColumnInfo) interface{}

GetColumnDefaultValue returns the default definition of a column.

func GetHandleAndUniqueIndexOffsets4Test

func GetHandleAndUniqueIndexOffsets4Test(cols []*Column) [][]int

GetHandleAndUniqueIndexOffsets4Test is used to get the offsets of handle columns and other unique index columns in test

func IsColCDCVisible

func IsColCDCVisible(col *model.ColumnInfo) bool

IsColCDCVisible returns whether the col is visible for CDC

func ListVersionsFromCaptureInfos

func ListVersionsFromCaptureInfos(captureInfos []*CaptureInfo) []string

ListVersionsFromCaptureInfos returns the version list of the CaptureInfo list.

func ShouldSplitUpdateEvent

func ShouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool

ShouldSplitUpdateEvent determines whether the update event needs to be split to align with the old format, based on whether the handle key column or a unique key has been modified. If so, we need to use SplitUpdateEvent to split the update event into a delete and an insert event.

func SplitUpdateEvent

func SplitUpdateEvent(
	updateEvent *RowChangedEvent,
) (*RowChangedEvent, *RowChangedEvent, error)

SplitUpdateEvent splits an update event into a delete and an insert event.
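
As a rough illustration of the split, the sketch below uses a local `row` type instead of the real `RowChangedEvent`. The field names, the key-column list, and the error case are all assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// row is a hypothetical stand-in for RowChangedEvent.
type row struct {
	CommitTs   uint64
	PreColumns map[string]interface{} // values before the change; nil for an insert
	Columns    map[string]interface{} // values after the change; nil for a delete
}

// shouldSplit reports whether any key column differs between the old and
// new row. Values must be comparable (ints, strings, ...) in this sketch.
func shouldSplit(r *row, keyCols []string) bool {
	if r == nil || r.PreColumns == nil || r.Columns == nil {
		return false
	}
	for _, k := range keyCols {
		if r.PreColumns[k] != r.Columns[k] {
			return true
		}
	}
	return false
}

// splitUpdate returns a delete carrying the old values and an insert
// carrying the new values.
func splitUpdate(r *row) (*row, *row, error) {
	if r == nil || r.PreColumns == nil || r.Columns == nil {
		return nil, nil, errors.New("not an update event")
	}
	del := &row{CommitTs: r.CommitTs, PreColumns: r.PreColumns}
	ins := &row{CommitTs: r.CommitTs, Columns: r.Columns}
	return del, ins, nil
}

func main() {
	upd := &row{
		CommitTs:   10,
		PreColumns: map[string]interface{}{"id": 1, "v": "a"},
		Columns:    map[string]interface{}{"id": 2, "v": "a"},
	}
	if shouldSplit(upd, []string{"id"}) {
		del, ins, err := splitUpdate(upd)
		if err == nil {
			fmt.Println("delete:", del.PreColumns, "insert:", ins.Columns)
		}
	}
}
```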

func ValidateChangefeedID

func ValidateChangefeedID(changefeedID string) error

ValidateChangefeedID returns nil if the changefeed ID matches the pattern "^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$" and its length is no more than "changeFeedIDMaxLen", e.g. "simple-changefeed-task"; otherwise it returns an error.

func ValidateNamespace

func ValidateNamespace(namespace string) error

ValidateNamespace returns nil if the namespace matches the pattern "^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$" and its length is no more than "changeFeedIDMaxLen", e.g. "simple-changefeed-task"; otherwise it returns an error.
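
A minimal sketch of this kind of check, using the pattern quoted above. The value of `changeFeedIDMaxLen` is not shown on this page, so `maxIDLen` below is a placeholder, and `validateID` is a hypothetical helper rather than the package's function.

```go
package main

import (
	"fmt"
	"regexp"
)

// maxIDLen is a placeholder; the real changeFeedIDMaxLen is not documented here.
const maxIDLen = 128

// idPattern is the pattern quoted in the documentation.
var idPattern = regexp.MustCompile(`^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$`)

// validateID returns nil when id is short enough and matches the pattern.
func validateID(id string) error {
	if len(id) > maxIDLen || !idPattern.MatchString(id) {
		return fmt.Errorf("invalid ID %q", id)
	}
	return nil
}

func main() {
	for _, id := range []string{"simple-changefeed-task", "bad_id", "-leading", "trailing-"} {
		fmt.Printf("%q valid: %v\n", id, validateID(id) == nil)
	}
}
```

The pattern accepts alphanumeric runs joined by single hyphens, so empty strings, underscores, doubled hyphens, and leading or trailing hyphens are all rejected.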

Types

type AdminJob

type AdminJob struct {
	CfID                  ChangeFeedID
	Type                  AdminJobType
	Error                 *RunningError
	OverwriteCheckpointTs uint64
}

AdminJob holds an admin job

type AdminJobType

type AdminJobType int

AdminJobType represents the admin job type. It is used in both the owner and the processor.

const (
	AdminNone AdminJobType = iota
	AdminStop
	AdminResume
	AdminRemove
	AdminFinish
)

All AdminJob types

func (AdminJobType) IsStopState

func (t AdminJobType) IsStopState() bool

IsStopState returns whether the changefeed is in a stop state with the given admin job

func (AdminJobType) String

func (t AdminJobType) String() string

String implements fmt.Stringer interface.

type Capture

type Capture struct {
	ID            string `json:"id"`
	IsOwner       bool   `json:"is_owner"`
	AdvertiseAddr string `json:"address"`
	ClusterID     string `json:"cluster_id"`
}

Capture holds common information of a capture in cdc

type CaptureID

type CaptureID = string

CaptureID is the type for capture ID

type CaptureInfo

type CaptureInfo struct {
	ID            CaptureID `json:"id"`
	AdvertiseAddr string    `json:"address"`

	Version        string `json:"version"`
	GitHash        string `json:"git-hash"`
	DeployPath     string `json:"deploy-path"`
	StartTimestamp int64  `json:"start-timestamp"`
}

CaptureInfo is stored in etcd.

func (*CaptureInfo) Marshal

func (c *CaptureInfo) Marshal() ([]byte, error)

Marshal using json.Marshal.

func (*CaptureInfo) Unmarshal

func (c *CaptureInfo) Unmarshal(data []byte) error

Unmarshal from binary data.
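
The round trip can be illustrated with a local copy of the struct. `captureInfo` below only mirrors the JSON tags listed above. Decoding with `json.Unmarshal` is an assumption, since the documentation only says that `Marshal` uses `json.Marshal`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// captureInfo is a local copy of the documented fields and tags,
// not the real CaptureInfo type.
type captureInfo struct {
	ID             string `json:"id"`
	AdvertiseAddr  string `json:"address"`
	Version        string `json:"version"`
	GitHash        string `json:"git-hash"`
	DeployPath     string `json:"deploy-path"`
	StartTimestamp int64  `json:"start-timestamp"`
}

// Marshal encodes the value as JSON.
func (c *captureInfo) Marshal() ([]byte, error) { return json.Marshal(c) }

// Unmarshal decodes JSON into the receiver (assumed to be the inverse of Marshal).
func (c *captureInfo) Unmarshal(data []byte) error { return json.Unmarshal(data, c) }

// roundTrip encodes in, decodes the bytes into a fresh value, and returns both.
func roundTrip(in captureInfo) (captureInfo, string, error) {
	data, err := in.Marshal()
	if err != nil {
		return captureInfo{}, "", err
	}
	var out captureInfo
	if err := out.Unmarshal(data); err != nil {
		return captureInfo{}, "", err
	}
	return out, string(data), nil
}

func main() {
	out, encoded, err := roundTrip(captureInfo{ID: "c1", AdvertiseAddr: "127.0.0.1:8300"})
	fmt.Println(encoded, out.ID, err)
}
```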

type CaptureTaskStatus

type CaptureTaskStatus struct {
	CaptureID string `json:"capture_id"`
	// Table list, containing tables that processor should process
	Tables    []int64                     `json:"table_ids,omitempty"`
	Operation map[TableID]*TableOperation `json:"table_operations,omitempty"`
}

CaptureTaskStatus holds TaskStatus of a capture

type ChangeFeedID

type ChangeFeedID struct {
	// Namespace and ID pair is unique in one ticdc cluster
	// the default value of Namespace is "default"
	Namespace string
	ID        string
}

ChangeFeedID is the type for change feed ID

func ChangeFeedID4Test

func ChangeFeedID4Test(namespace, id string) ChangeFeedID

ChangeFeedID4Test returns `ChangeFeedID` with given namespace and id

func DefaultChangeFeedID

func DefaultChangeFeedID(id string) ChangeFeedID

DefaultChangeFeedID returns `ChangeFeedID` with default namespace

func (ChangeFeedID) String

func (c ChangeFeedID) String() string

String implements fmt.Stringer interface

type ChangeFeedInfo

type ChangeFeedInfo struct {
	UpstreamID uint64    `json:"upstream-id"`
	Namespace  string    `json:"namespace"`
	ID         string    `json:"changefeed-id"`
	SinkURI    string    `json:"sink-uri"`
	CreateTime time.Time `json:"create-time"`
	// Start sync at this commit ts if `StartTs` is specified; otherwise use the CreateTime of the changefeed.
	StartTs uint64 `json:"start-ts"`
	// The ChangeFeed will exit once it has synced to timestamp TargetTs
	TargetTs uint64 `json:"target-ts"`
	// used for admin job notification, trigger watch event in capture
	AdminJobType AdminJobType `json:"admin-job-type"`
	Engine       SortEngine   `json:"sort-engine"`
	// SortDir is deprecated
	// it cannot be set by user in changefeed level, any assignment to it should be ignored.
	// but can be fetched for backward compatibility
	SortDir string `json:"sort-dir"`

	Config  *config.ReplicaConfig `json:"config"`
	State   FeedState             `json:"state"`
	Error   *RunningError         `json:"error"`
	Warning *RunningError         `json:"warning"`

	CreatorVersion string `json:"creator-version"`
	// Epoch is the epoch of a changefeed, changes on every restart.
	Epoch uint64 `json:"epoch"`
}

ChangeFeedInfo describes the detail of a ChangeFeed

func (*ChangeFeedInfo) Clone

func (info *ChangeFeedInfo) Clone() (*ChangeFeedInfo, error)

Clone returns a cloned ChangeFeedInfo

func (*ChangeFeedInfo) FixIncompatible

func (info *ChangeFeedInfo) FixIncompatible()

FixIncompatible fixes incompatible changefeed meta info.

func (*ChangeFeedInfo) GetCheckpointTs

func (info *ChangeFeedInfo) GetCheckpointTs(status *ChangeFeedStatus) uint64

GetCheckpointTs returns CheckpointTs if it's specified in ChangeFeedStatus, otherwise StartTs is returned.

func (*ChangeFeedInfo) GetStartTs

func (info *ChangeFeedInfo) GetStartTs() uint64

GetStartTs returns StartTs if it's specified; otherwise it uses the CreateTime of the changefeed.

func (*ChangeFeedInfo) GetTargetTs

func (info *ChangeFeedInfo) GetTargetTs() uint64

GetTargetTs returns TargetTs if it's specified, otherwise MaxUint64 is returned.
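
The fallback rules for `GetTargetTs` and `GetCheckpointTs` can be sketched as below. The types are local stand-ins, and treating a zero value as "not specified" is an assumption. The sketch also falls back to the raw `StartTs` field and leaves out the CreateTime-based fallback that `GetStartTs` describes.

```go
package main

import (
	"fmt"
	"math"
)

// feedInfo and feedStatus are hypothetical stand-ins for ChangeFeedInfo
// and ChangeFeedStatus.
type feedInfo struct {
	StartTs  uint64
	TargetTs uint64
}

type feedStatus struct {
	CheckpointTs uint64
}

// targetTs returns TargetTs when set (assumed: non-zero), else MaxUint64.
func targetTs(info *feedInfo) uint64 {
	if info.TargetTs > 0 {
		return info.TargetTs
	}
	return math.MaxUint64
}

// checkpointTs returns the status checkpoint when set (assumed: non-zero),
// else the start timestamp.
func checkpointTs(info *feedInfo, status *feedStatus) uint64 {
	if status != nil && status.CheckpointTs > 0 {
		return status.CheckpointTs
	}
	return info.StartTs
}

func main() {
	info := &feedInfo{StartTs: 100}
	fmt.Println(targetTs(info) == math.MaxUint64)
	fmt.Println(checkpointTs(info, nil))
	fmt.Println(checkpointTs(info, &feedStatus{CheckpointTs: 250}))
}
```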

func (*ChangeFeedInfo) Marshal

func (info *ChangeFeedInfo) Marshal() (string, error)

Marshal returns the json marshal format of a ChangeFeedInfo

func (*ChangeFeedInfo) NeedBlockGC

func (info *ChangeFeedInfo) NeedBlockGC() bool

NeedBlockGC returns true if the changefeed needs to block the GC safepoint. Note: if the changefeed has failed because of GC, it should not block the GC safepoint.

func (*ChangeFeedInfo) RmUnusedFields

func (info *ChangeFeedInfo) RmUnusedFields()

RmUnusedFields removes unnecessary fields based on the downstream type and the protocol. Since we utilize a common changefeed configuration template, certain fields may not be utilized for certain protocols.

func (*ChangeFeedInfo) String

func (info *ChangeFeedInfo) String() (str string)

String implements fmt.Stringer interface, but hides some sensitive information

func (*ChangeFeedInfo) Unmarshal

func (info *ChangeFeedInfo) Unmarshal(data []byte) error

Unmarshal unmarshals into *ChangeFeedInfo from json marshal byte slice

func (*ChangeFeedInfo) VerifyAndComplete

func (info *ChangeFeedInfo) VerifyAndComplete()

VerifyAndComplete verifies changefeed info and may fill in some fields. If a required field is not provided, return an error. If some necessary field is missing but can use a default value, fill it in.

type ChangeFeedStatus

type ChangeFeedStatus struct {
	CheckpointTs uint64 `json:"checkpoint-ts"`
	// minTableBarrierTs is the minimum commitTs of all DDL events and is only
	// used to check whether there is a pending DDL job at the checkpointTs when
	// initializing the changefeed.
	MinTableBarrierTs uint64 `json:"min-table-barrier-ts"`
	// TODO: remove this field after we don't use ChangeFeedStatus to
	// control processor. This is too ambiguous.
	AdminJobType AdminJobType `json:"admin-job-type"`
}

ChangeFeedStatus stores information about a ChangeFeed. It is stored in etcd.

func (*ChangeFeedStatus) Marshal

func (status *ChangeFeedStatus) Marshal() (string, error)

Marshal returns json encoded string of ChangeFeedStatus, only contains necessary fields stored in storage

func (*ChangeFeedStatus) Unmarshal

func (status *ChangeFeedStatus) Unmarshal(data []byte) error

Unmarshal unmarshals into *ChangeFeedStatus from json marshal byte slice

type ChangeFeedStatusForAPI

type ChangeFeedStatusForAPI struct {
	ResolvedTs   uint64 `json:"resolved-ts"`
	CheckpointTs uint64 `json:"checkpoint-ts"`
}

ChangeFeedStatusForAPI is used to transfer the status of a changefeed for the API.

type ChangeFeedSyncedStatusForAPI

type ChangeFeedSyncedStatusForAPI struct {
	CheckpointTs        uint64 `json:"checkpoint-ts"`
	LastSyncedTs        uint64 `json:"last-sync-time"`
	PullerResolvedTs    uint64 `json:"puller-resolved-ts"`
	SyncedCheckInterval int64  `json:"synced-check-interval"`
	CheckpointInterval  int64  `json:"checkpoint-interval"`
}

ChangeFeedSyncedStatusForAPI is used to transfer the synced status of a changefeed for the API.

type ChangefeedCommonInfo

type ChangefeedCommonInfo struct {
	UpstreamID     uint64        `json:"upstream_id"`
	Namespace      string        `json:"namespace"`
	ID             string        `json:"id"`
	FeedState      FeedState     `json:"state"`
	CheckpointTSO  uint64        `json:"checkpoint_tso"`
	CheckpointTime JSONTime      `json:"checkpoint_time"`
	RunningError   *RunningError `json:"error"`
}

ChangefeedCommonInfo holds some common usage information of a changefeed

func (ChangefeedCommonInfo) MarshalJSON

func (c ChangefeedCommonInfo) MarshalJSON() ([]byte, error)

MarshalJSON is used to marshal ChangefeedCommonInfo

type ChangefeedConfig

type ChangefeedConfig struct {
	Namespace string `json:"namespace"`
	ID        string `json:"changefeed_id"`
	StartTS   uint64 `json:"start_ts"`
	TargetTS  uint64 `json:"target_ts"`
	SinkURI   string `json:"sink_uri"`
	// timezone used when checking sink uri
	TimeZone string `json:"timezone" default:"system"`
	// if true, force to replicate some ineligible tables
	ForceReplicate        bool               `json:"force_replicate" default:"false"`
	IgnoreIneligibleTable bool               `json:"ignore_ineligible_table" default:"false"`
	FilterRules           []string           `json:"filter_rules"`
	IgnoreTxnStartTs      []uint64           `json:"ignore_txn_start_ts"`
	MounterWorkerNum      int                `json:"mounter_worker_num" default:"16"`
	SinkConfig            *config.SinkConfig `json:"sink_config"`
}

ChangefeedConfig is used to create a changefeed

type ChangefeedDetail

type ChangefeedDetail struct {
	UpstreamID     uint64              `json:"upstream_id"`
	Namespace      string              `json:"namespace"`
	ID             string              `json:"id"`
	SinkURI        string              `json:"sink_uri"`
	CreateTime     JSONTime            `json:"create_time"`
	StartTs        uint64              `json:"start_ts"`
	ResolvedTs     uint64              `json:"resolved_ts"`
	TargetTs       uint64              `json:"target_ts"`
	CheckpointTSO  uint64              `json:"checkpoint_tso"`
	CheckpointTime JSONTime            `json:"checkpoint_time"`
	Engine         SortEngine          `json:"sort_engine,omitempty"`
	FeedState      FeedState           `json:"state"`
	RunningError   *RunningError       `json:"error"`
	ErrorHis       []int64             `json:"error_history"`
	CreatorVersion string              `json:"creator_version"`
	TaskStatus     []CaptureTaskStatus `json:"task_status,omitempty"`
}

ChangefeedDetail holds detail info of a changefeed

func (ChangefeedDetail) MarshalJSON

func (c ChangefeedDetail) MarshalJSON() ([]byte, error)

MarshalJSON is used to marshal ChangefeedDetail

type Column

type Column struct {
	Name      string         `msg:"name"`
	Type      byte           `msg:"type"`
	Charset   string         `msg:"charset"`
	Collation string         `msg:"collation"`
	Flag      ColumnFlagType `msg:"-"`
	Value     interface{}    `msg:"-"`
	Default   interface{}    `msg:"-"`

	// ApproximateBytes is approximate bytes consumed by the column.
	ApproximateBytes int `msg:"-"`
}

Column represents a column value and its schema info

func (*Column) DecodeMsg

func (z *Column) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*Column) EncodeMsg

func (z *Column) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*Column) MarshalMsg

func (z *Column) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Column) Msgsize

func (z *Column) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Column) UnmarshalMsg

func (z *Column) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type ColumnData

type ColumnData struct {
	// ColumnID may be just a mock id, because we don't store it in redo log.
	// So after restoring from the redo log, we need to give every column a mock id.
	// The only guarantee is that the column id is unique in a RowChangedEvent
	ColumnID int64       `json:"column_id" msg:"column_id"`
	Value    interface{} `json:"value" msg:"-"`

	// ApproximateBytes is approximate bytes consumed by the column.
	ApproximateBytes int `json:"-" msg:"-"`
}

ColumnData represents a column value in row changed event

func Columns2ColumnDatas

func Columns2ColumnDatas(cols []*Column, tableInfo *TableInfo) []*ColumnData

Columns2ColumnDatas converts `Column`s to `ColumnData`s

func (*ColumnData) DecodeMsg

func (z *ColumnData) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (ColumnData) EncodeMsg

func (z ColumnData) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (ColumnData) MarshalMsg

func (z ColumnData) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (ColumnData) Msgsize

func (z ColumnData) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*ColumnData) UnmarshalMsg

func (z *ColumnData) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type ColumnFlagType

type ColumnFlagType util.Flag

ColumnFlagType is for encapsulating the flag operations for different flags.

const (
	// BinaryFlag means the column charset is binary
	BinaryFlag ColumnFlagType = 1 << ColumnFlagType(iota)
	// HandleKeyFlag means the column is selected as the handle key
	// The handleKey is chosen by the following rules in the order:
	// 1. if the table has primary key, it's the handle key.
	// 2. If the table has not null unique key, it's the handle key.
	// 3. If the table has no primary key and no not null unique key, it has no handleKey.
	HandleKeyFlag
	// GeneratedColumnFlag means the column is a generated column
	GeneratedColumnFlag
	// PrimaryKeyFlag means the column is primary key
	PrimaryKeyFlag
	// UniqueKeyFlag means the column is unique key
	UniqueKeyFlag
	// MultipleKeyFlag means the column is multiple key
	MultipleKeyFlag
	// NullableFlag means the column is nullable
	NullableFlag
	// UnsignedFlag means the column stores an unsigned integer
	UnsignedFlag
)

func (*ColumnFlagType) IsBinary

func (b *ColumnFlagType) IsBinary() bool

IsBinary shows whether BinaryFlag is set

func (*ColumnFlagType) IsGeneratedColumn

func (b *ColumnFlagType) IsGeneratedColumn() bool

IsGeneratedColumn shows whether GeneratedColumn is set

func (*ColumnFlagType) IsHandleKey

func (b *ColumnFlagType) IsHandleKey() bool

IsHandleKey shows whether HandleKey is set

func (*ColumnFlagType) IsMultipleKey

func (b *ColumnFlagType) IsMultipleKey() bool

IsMultipleKey shows whether MultipleKeyFlag is set

func (*ColumnFlagType) IsNullable

func (b *ColumnFlagType) IsNullable() bool

IsNullable shows whether NullableFlag is set

func (*ColumnFlagType) IsPrimaryKey

func (b *ColumnFlagType) IsPrimaryKey() bool

IsPrimaryKey shows whether PrimaryKeyFlag is set

func (*ColumnFlagType) IsUniqueKey

func (b *ColumnFlagType) IsUniqueKey() bool

IsUniqueKey shows whether UniqueKeyFlag is set

func (*ColumnFlagType) IsUnsigned

func (b *ColumnFlagType) IsUnsigned() bool

IsUnsigned shows whether UnsignedFlag is set

func (*ColumnFlagType) SetIsBinary

func (b *ColumnFlagType) SetIsBinary()

SetIsBinary sets BinaryFlag

func (*ColumnFlagType) SetIsGeneratedColumn

func (b *ColumnFlagType) SetIsGeneratedColumn()

SetIsGeneratedColumn sets GeneratedColumn

func (*ColumnFlagType) SetIsHandleKey

func (b *ColumnFlagType) SetIsHandleKey()

SetIsHandleKey sets HandleKey

func (*ColumnFlagType) SetIsMultipleKey

func (b *ColumnFlagType) SetIsMultipleKey()

SetIsMultipleKey sets MultipleKeyFlag

func (*ColumnFlagType) SetIsNullable

func (b *ColumnFlagType) SetIsNullable()

SetIsNullable sets NullableFlag

func (*ColumnFlagType) SetIsPrimaryKey

func (b *ColumnFlagType) SetIsPrimaryKey()

SetIsPrimaryKey sets PrimaryKeyFlag

func (*ColumnFlagType) SetIsUniqueKey

func (b *ColumnFlagType) SetIsUniqueKey()

SetIsUniqueKey sets UniqueKeyFlag

func (*ColumnFlagType) SetIsUnsigned

func (b *ColumnFlagType) SetIsUnsigned()

SetIsUnsigned sets UnsignedFlag

func (*ColumnFlagType) UnsetIsBinary

func (b *ColumnFlagType) UnsetIsBinary()

UnsetIsBinary unsets BinaryFlag

func (*ColumnFlagType) UnsetIsGeneratedColumn

func (b *ColumnFlagType) UnsetIsGeneratedColumn()

UnsetIsGeneratedColumn unsets GeneratedColumn

func (*ColumnFlagType) UnsetIsHandleKey

func (b *ColumnFlagType) UnsetIsHandleKey()

UnsetIsHandleKey unsets HandleKey

func (*ColumnFlagType) UnsetIsMultipleKey

func (b *ColumnFlagType) UnsetIsMultipleKey()

UnsetIsMultipleKey unsets MultipleKeyFlag

func (*ColumnFlagType) UnsetIsNullable

func (b *ColumnFlagType) UnsetIsNullable()

UnsetIsNullable unsets NullableFlag

func (*ColumnFlagType) UnsetIsPrimaryKey

func (b *ColumnFlagType) UnsetIsPrimaryKey()

UnsetIsPrimaryKey unsets PrimaryKeyFlag

func (*ColumnFlagType) UnsetIsUniqueKey

func (b *ColumnFlagType) UnsetIsUniqueKey()

UnsetIsUniqueKey unsets UniqueKeyFlag

func (*ColumnFlagType) UnsetIsUnsigned

func (b *ColumnFlagType) UnsetIsUnsigned()

UnsetIsUnsigned unsets UnsignedFlag
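
The Is/Set/Unset methods above follow the usual bit-set pattern. The sketch below reproduces it on a local `flag` type, assuming a `uint64` underlying type. The names `set`, `unset` and `has` are illustrative helpers and not part of the package.

```go
package main

import "fmt"

// flag is a hypothetical stand-in for ColumnFlagType, assumed to be a uint64 bit set.
type flag uint64

const (
	binaryFlag flag = 1 << iota
	handleKeyFlag
	generatedColumnFlag
	primaryKeyFlag
	uniqueKeyFlag
	multipleKeyFlag
	nullableFlag
	unsignedFlag
)

// set turns on every bit in bits.
func (f *flag) set(bits flag) { *f |= bits }

// unset clears every bit in bits.
func (f *flag) unset(bits flag) { *f &^= bits }

// has reports whether all bits in bits are on.
func (f flag) has(bits flag) bool { return f&bits == bits }

func main() {
	var f flag
	f.set(primaryKeyFlag | handleKeyFlag)
	fmt.Println(f.has(primaryKeyFlag), f.has(nullableFlag))
	f.unset(handleKeyFlag)
	fmt.Println(f.has(handleKeyFlag), f == primaryKeyFlag)
}
```

Because each constant occupies its own bit, several properties of a column fit in one integer and can be tested independently.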

type ColumnIDAllocator

type ColumnIDAllocator interface {
	// GetColumnID returns the column id according to the column name
	GetColumnID(name string) int64
}

ColumnIDAllocator represents the interface to allocate column id for tableInfo

type DDLEvent

type DDLEvent struct {
	StartTs      uint64           `msg:"start-ts"`
	CommitTs     uint64           `msg:"commit-ts"`
	Query        string           `msg:"query"`
	TableInfo    *TableInfo       `msg:"-"`
	PreTableInfo *TableInfo       `msg:"-"`
	Type         model.ActionType `msg:"-"`
	Done         atomic.Bool      `msg:"-"`
	Charset      string           `msg:"-"`
	Collate      string           `msg:"-"`
	IsBootstrap  bool             `msg:"-"`
	// BDRRole is the role of the TiDB cluster, it is used to determine whether
	// the DDL is executed by the primary cluster.
	BDRRole string        `msg:"-"`
	SQLMode mysql.SQLMode `msg:"-"`
}

DDLEvent stores DDL event

func NewBootstrapDDLEvent

func NewBootstrapDDLEvent(tableInfo *TableInfo) *DDLEvent

NewBootstrapDDLEvent returns a bootstrap DDL event. We set the bootstrap DDL event's startTs and commitTs to 0 because it is generated by TiCDC, not by the upstream TiDB, and they are useless for a bootstrap DDL event.

func (*DDLEvent) DecodeMsg

func (z *DDLEvent) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (DDLEvent) EncodeMsg

func (z DDLEvent) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*DDLEvent) FromJob

func (d *DDLEvent) FromJob(job *model.Job, preTableInfo *TableInfo, tableInfo *TableInfo)

FromJob fills the values with DDLEvent from DDL job

func (*DDLEvent) FromJobWithArgs

func (d *DDLEvent) FromJobWithArgs(
	job *model.Job,
	preTableInfo, tableInfo *TableInfo,
	oldSchemaName, newSchemaName string,
)

FromJobWithArgs fills the values with DDLEvent from DDL job

func (DDLEvent) MarshalMsg

func (z DDLEvent) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (DDLEvent) Msgsize

func (z DDLEvent) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*DDLEvent) ToRedoLog

func (d *DDLEvent) ToRedoLog() *RedoLog

ToRedoLog converts ddl event to redo log

func (*DDLEvent) UnmarshalMsg

func (z *DDLEvent) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type DDLJobEntry

type DDLJobEntry struct {
	Job    *timodel.Job
	OpType OpType
	CRTs   uint64
}

DDLJobEntry is the DDL job entry.

type DrainCaptureRequest

type DrainCaptureRequest struct {
	CaptureID string `json:"capture_id"`
}

DrainCaptureRequest is request for manual `DrainCapture`

type DrainCaptureResp

type DrainCaptureResp struct {
	CurrentTableCount int `json:"current_table_count"`
}

DrainCaptureResp is response for manual `DrainCapture`

type FeedState

type FeedState string

FeedState represents the running state of a changefeed

const (
	StateNormal   FeedState = "normal"
	StatePending  FeedState = "pending"
	StateFailed   FeedState = "failed"
	StateStopped  FeedState = "stopped"
	StateRemoved  FeedState = "removed"
	StateFinished FeedState = "finished"
	StateWarning  FeedState = "warning"
	// StateUnInitialized is used for the changefeed that has not been initialized
	// it only exists in memory for a short time and will not be persisted to storage
	StateUnInitialized FeedState = ""
)

All FeedStates. Only `StateNormal` and `StatePending` changefeeds are running; the others are stopped.

func (FeedState) IsNeeded

func (s FeedState) IsNeeded(need string) bool

IsNeeded returns true if the given feedState matches the listState.

func (FeedState) IsRunning

func (s FeedState) IsRunning() bool

IsRunning returns true if the feedState represents a running state.

func (FeedState) ToInt

func (s FeedState) ToInt() int

ToInt returns an int for each `FeedState`; use this only for metrics.

type HTTPError

type HTTPError struct {
	Error string `json:"error_msg"`
	Code  string `json:"error_code"`
}

HTTPError of cdc http api

func NewHTTPError

func NewHTTPError(err error) HTTPError

NewHTTPError wraps an error into an HTTPError

type IncrementalColumnIDAllocator

type IncrementalColumnIDAllocator struct {
	// contains filtered or unexported fields
}

IncrementalColumnIDAllocator allocates column ids in an incremental way. Most of the time, it is the default implementation when you don't care about the concrete value of the column id.

func NewIncrementalColumnIDAllocator

func NewIncrementalColumnIDAllocator() *IncrementalColumnIDAllocator

NewIncrementalColumnIDAllocator creates a new IncrementalColumnIDAllocator

func (*IncrementalColumnIDAllocator) GetColumnID

func (d *IncrementalColumnIDAllocator) GetColumnID(name string) int64

GetColumnID returns the next mock column id

type JSONTime

type JSONTime time.Time

JSONTime is used to wrap time into json format

func (JSONTime) MarshalJSON

func (t JSONTime) MarshalJSON() ([]byte, error)

MarshalJSON is used to specify the time format

func (*JSONTime) UnmarshalJSON

func (t *JSONTime) UnmarshalJSON(data []byte) error

UnmarshalJSON is used to parse time.Time from bytes. The builtin json.Unmarshal function cannot unmarshal a date string formatted as "2006-01-02 15:04:05.000", so we must implement a customized unmarshal function.

type Liveness

type Liveness int32

Liveness is the liveness status of a capture. Liveness can only be changed from alive to stopping, and there is no way back.

const (
	// LivenessCaptureAlive means the capture is alive, and ready to serve.
	LivenessCaptureAlive Liveness = 0
	// LivenessCaptureStopping means the capture is in the process of graceful shutdown.
	LivenessCaptureStopping Liveness = 1
)

func (*Liveness) Load

func (l *Liveness) Load() Liveness

Load the liveness.

func (*Liveness) Store

func (l *Liveness) Store(v Liveness) bool

Store the given liveness. Returns true if it succeeds.
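
One way to enforce a one-way alive-to-stopping transition is a compare-and-swap, sketched below on a local `liveness` type. This is an assumption about how such a rule could be implemented. In particular, returning false when the value is already stopping is a choice made for the example.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// liveness is a hypothetical stand-in for Liveness.
type liveness int32

const (
	alive    liveness = 0
	stopping liveness = 1
)

// Load reads the current value atomically.
func (l *liveness) Load() liveness {
	return liveness(atomic.LoadInt32((*int32)(l)))
}

// Store permits only the alive -> stopping transition and reports
// whether the value was changed.
func (l *liveness) Store(v liveness) bool {
	if v != stopping {
		return false
	}
	return atomic.CompareAndSwapInt32((*int32)(l), int32(alive), int32(stopping))
}

func main() {
	var l liveness
	fmt.Println(l.Load() == alive)
	fmt.Println(l.Store(stopping))
	fmt.Println(l.Store(alive))
	fmt.Println(l.Load() == stopping)
}
```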

func (*Liveness) String

func (l *Liveness) String() string

type MessageType

type MessageType int

MessageType is the type of message, which is used by MqSink and RedoLog.

const (
	// MessageTypeUnknown is unknown type of message key
	MessageTypeUnknown MessageType = iota
	// MessageTypeRow is row type of message key
	MessageTypeRow
	// MessageTypeDDL is ddl type of message key
	MessageTypeDDL
	// MessageTypeResolved is resolved type of message key
	MessageTypeResolved
)

func (*MessageType) DecodeMsg

func (z *MessageType) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (MessageType) EncodeMsg

func (z MessageType) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (MessageType) MarshalMsg

func (z MessageType) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (MessageType) Msgsize

func (z MessageType) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*MessageType) UnmarshalMsg

func (z *MessageType) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type MoveTableReq

type MoveTableReq struct {
	CaptureID string `json:"capture_id"`
	TableID   int64  `json:"table_id"`
}

MoveTableReq is the request for `MoveTable`

type NameBasedColumnIDAllocator

type NameBasedColumnIDAllocator struct {
	// contains filtered or unexported fields
}

NameBasedColumnIDAllocator allocates column ids using a predefined map from column name to id

func NewNameBasedColumnIDAllocator

func NewNameBasedColumnIDAllocator(nameToIDMap map[string]int64) *NameBasedColumnIDAllocator

NewNameBasedColumnIDAllocator creates a new NameBasedColumnIDAllocator

func (*NameBasedColumnIDAllocator) GetColumnID

func (n *NameBasedColumnIDAllocator) GetColumnID(name string) int64

GetColumnID returns the column id of the name
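
The two allocator styles described in this package can be sketched against a local copy of the interface. The starting id of the incremental allocator and the `-1` result for unknown names are assumptions, since neither detail is documented here.

```go
package main

import "fmt"

// columnIDAllocator mirrors the documented ColumnIDAllocator interface.
type columnIDAllocator interface {
	GetColumnID(name string) int64
}

// incrementalAllocator hands out consecutive ids and ignores the name.
type incrementalAllocator struct{ next int64 }

func (a *incrementalAllocator) GetColumnID(string) int64 {
	id := a.next
	a.next++
	return id
}

// nameBasedAllocator looks ids up in a predefined map.
type nameBasedAllocator struct{ ids map[string]int64 }

// GetColumnID returns -1 for unknown names (an assumption of this sketch).
func (a *nameBasedAllocator) GetColumnID(name string) int64 {
	if id, ok := a.ids[name]; ok {
		return id
	}
	return -1
}

// assign collects an id for every column name, in order.
func assign(alloc columnIDAllocator, names []string) []int64 {
	out := make([]int64, 0, len(names))
	for _, n := range names {
		out = append(out, alloc.GetColumnID(n))
	}
	return out
}

func main() {
	names := []string{"id", "name", "age"}
	fmt.Println(assign(&incrementalAllocator{next: 1}, names))
	fmt.Println(assign(&nameBasedAllocator{ids: map[string]int64{"id": 7, "name": 9}}, names))
}
```

Hiding both behind one interface lets a builder such as `BuildTiDBTableInfoImpl` stay unaware of whether ids are mock values or restored from a known mapping.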

type OpType

type OpType int

OpType for the kv, delete or put

const (
	OpTypeUnknown OpType = iota
	OpTypePut
	OpTypeDelete
	OpTypeResolved
)

OpType for kv

func (*OpType) DecodeMsg

func (z *OpType) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (OpType) EncodeMsg

func (z OpType) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (OpType) MarshalMsg

func (z OpType) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (OpType) Msgsize

func (z OpType) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*OpType) UnmarshalMsg

func (z *OpType) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type PolymorphicEvent

type PolymorphicEvent struct {
	StartTs  uint64
	CRTs     uint64
	Resolved *ResolvedTs

	RawKV *RawKVEntry
	Row   *RowChangedEvent
	// contains filtered or unexported fields
}

PolymorphicEvent describes an event that can be in multiple states.

func NewEmptyPolymorphicEvent

func NewEmptyPolymorphicEvent(ts uint64) *PolymorphicEvent

NewEmptyPolymorphicEvent creates a new empty PolymorphicEvent.

func NewPolymorphicEvent

func NewPolymorphicEvent(rawKV *RawKVEntry) *PolymorphicEvent

NewPolymorphicEvent creates a new PolymorphicEvent with a raw KV.

func NewResolvedPolymorphicEvent

func NewResolvedPolymorphicEvent(regionID uint64, resolvedTs uint64) *PolymorphicEvent

NewResolvedPolymorphicEvent creates a new PolymorphicEvent with the resolved ts.

func (*PolymorphicEvent) IsResolved

func (e *PolymorphicEvent) IsResolved() bool

IsResolved returns true if the event is resolved. Note that this function can only be called when `RawKV != nil`.

func (*PolymorphicEvent) MarkFinished

func (e *PolymorphicEvent) MarkFinished()

MarkFinished is called to indicate that mount is finished.

func (*PolymorphicEvent) RegionID

func (e *PolymorphicEvent) RegionID() uint64

RegionID returns the region ID where the event comes from.

func (*PolymorphicEvent) SetUpFinishedCh

func (e *PolymorphicEvent) SetUpFinishedCh()

SetUpFinishedCh sets up the finished chan. It should be called before mounting the event.

func (*PolymorphicEvent) WaitFinished

func (e *PolymorphicEvent) WaitFinished(ctx context.Context) error

WaitFinished is called by caller to wait for the mount finished.
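
The three methods above describe a common completion-signal pattern: create a channel, close it when the work is done, and wait on it with a context. The sketch below shows that pattern on a local `event` type. The unexported field and the nil-channel behavior are assumptions, and `mountAndWait` is a hypothetical driver.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// event is a hypothetical stand-in for PolymorphicEvent.
type event struct {
	finished chan struct{}
}

// SetUpFinishedCh creates the channel; call it before handing the event to a mounter.
func (e *event) SetUpFinishedCh() {
	if e.finished == nil {
		e.finished = make(chan struct{})
	}
}

// MarkFinished signals completion by closing the channel.
func (e *event) MarkFinished() {
	if e.finished != nil {
		close(e.finished)
	}
}

// WaitFinished blocks until the event is finished or ctx is done.
func (e *event) WaitFinished(ctx context.Context) error {
	if e.finished == nil {
		return nil
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-e.finished:
		return nil
	}
}

// mountAndWait simulates a mounter that finishes after delayMs and a caller
// that waits at most timeoutMs.
func mountAndWait(delayMs, timeoutMs int) error {
	e := &event{}
	e.SetUpFinishedCh()
	go func() {
		time.Sleep(time.Duration(delayMs) * time.Millisecond)
		e.MarkFinished()
	}()
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	return e.WaitFinished(ctx)
}

func main() {
	fmt.Println(mountAndWait(1, 1000))
	fmt.Println(mountAndWait(200, 1))
}
```

Closing the channel, rather than sending on it, lets any number of waiters observe completion without coordination.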

type ProcInfoSnap

type ProcInfoSnap struct {
	CfID      ChangeFeedID `json:"changefeed-id"`
	CaptureID string       `json:"capture-id"`
}

ProcInfoSnap holds the most important replication information of a processor

type ProcessorCommonInfo

type ProcessorCommonInfo struct {
	Namespace string `json:"namespace"`
	CfID      string `json:"changefeed_id"`
	CaptureID string `json:"capture_id"`
}

ProcessorCommonInfo holds the common info of a processor

type ProcessorDetail

type ProcessorDetail struct {
	// All table IDs that this processor is replicating.
	Tables []int64 `json:"table_ids"`
}

ProcessorDetail holds the detail info of a processor

type ProcessorsInfos

type ProcessorsInfos map[CaptureID]*TaskStatus

ProcessorsInfos maps from capture IDs to TaskStatus

func (ProcessorsInfos) String

func (p ProcessorsInfos) String() string

String implements fmt.Stringer interface.

type RawKVEntry

type RawKVEntry struct {
	OpType OpType `msg:"op_type"`
	Key    []byte `msg:"key"`
	// nil for delete type
	Value []byte `msg:"value"`
	// nil for insert type
	OldValue []byte `msg:"old_value"`
	StartTs  uint64 `msg:"start_ts"`
	// Commit or resolved TS
	CRTs uint64 `msg:"crts"`

	// Additional debug info
	RegionID uint64 `msg:"region_id"`
}

RawKVEntry notifies the KV operator

func (*RawKVEntry) ApproximateDataSize

func (v *RawKVEntry) ApproximateDataSize() int64

ApproximateDataSize calculates the approximate size of the protobuf binary representation of this event.

func (*RawKVEntry) DecodeMsg

func (z *RawKVEntry) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*RawKVEntry) EncodeMsg

func (z *RawKVEntry) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*RawKVEntry) IsUpdate

func (v *RawKVEntry) IsUpdate() bool

IsUpdate checks if the event is an update event.
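The field comments on RawKVEntry (`Value` is nil for a delete, `OldValue` is nil for an insert) suggest how an entry can be classified. The sketch below is an illustration under assumptions: the `rawKV` type, the `opPut`/`opDelete` constants, and the `kind` helper are hypothetical, and the exact condition used by the real `IsUpdate` is not stated in this documentation.

```go
package main

import "fmt"

// opType and its constants are hypothetical stand-ins for OpType.
type opType int

const (
	opPut opType = iota + 1
	opDelete
)

// rawKV mirrors only the fields of RawKVEntry needed for classification.
type rawKV struct {
	op       opType
	value    []byte // nil for a delete
	oldValue []byte // nil for an insert
}

// isUpdate assumes an update is a put that carries both the new and old value.
func (v rawKV) isUpdate() bool {
	return v.op == opPut && v.value != nil && v.oldValue != nil
}

// kind names the operation based on the op type and which values are present.
func kind(v rawKV) string {
	switch {
	case v.op == opDelete:
		return "delete"
	case v.isUpdate():
		return "update"
	case v.op == opPut:
		return "insert"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(kind(rawKV{op: opPut, value: []byte("v")}))
	fmt.Println(kind(rawKV{op: opPut, value: []byte("v2"), oldValue: []byte("v1")}))
	fmt.Println(kind(rawKV{op: opDelete, oldValue: []byte("v1")}))
}
```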

func (*RawKVEntry) MarshalMsg

func (z *RawKVEntry) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*RawKVEntry) Msgsize

func (z *RawKVEntry) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*RawKVEntry) String

func (v *RawKVEntry) String() string

func (*RawKVEntry) UnmarshalMsg

func (z *RawKVEntry) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type RedoColumn

type RedoColumn struct {
	// Fields from Column and can't be marshaled directly in Column.
	Value interface{} `msg:"column"`
	// msgp transforms empty byte slice into nil, PTAL msgp#247.
	ValueIsEmptyBytes bool   `msg:"value-is-empty-bytes"`
	Flag              uint64 `msg:"flag"`
}

RedoColumn stores a Column change
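The `ValueIsEmptyBytes` field exists because, per the struct comment, msgp turns an empty byte slice into nil. A minimal sketch of how such a flag can preserve the distinction across a round trip follows. It does not use msgp: `simulateCodec` merely imitates the documented behavior, and `redoColumn`, `toRedo`, and `fromRedo` are hypothetical names.

```go
package main

import "fmt"

// redoColumn mirrors the two RedoColumn fields relevant to this sketch.
type redoColumn struct {
	value             interface{}
	valueIsEmptyBytes bool
}

// toRedo records whether the value is a non-nil, zero-length byte slice.
func toRedo(v interface{}) redoColumn {
	b, ok := v.([]byte)
	return redoColumn{value: v, valueIsEmptyBytes: ok && b != nil && len(b) == 0}
}

// simulateCodec imitates the documented codec behavior: an empty []byte
// comes back as nil after encoding and decoding.
func simulateCodec(c redoColumn) redoColumn {
	if b, ok := c.value.([]byte); ok && len(b) == 0 {
		c.value = nil
	}
	return c
}

// fromRedo restores the empty slice when the flag says it was one.
func fromRedo(c redoColumn) interface{} {
	if c.valueIsEmptyBytes {
		return []byte{}
	}
	return c.value
}

func main() {
	restored := fromRedo(simulateCodec(toRedo([]byte{})))
	b, ok := restored.([]byte)
	fmt.Println(ok, b != nil, len(b))
}
```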

func (*RedoColumn) DecodeMsg

func (z *RedoColumn) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (RedoColumn) EncodeMsg

func (z RedoColumn) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (RedoColumn) MarshalMsg

func (z RedoColumn) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (RedoColumn) Msgsize

func (z RedoColumn) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*RedoColumn) UnmarshalMsg

func (z *RedoColumn) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type RedoDDLEvent

type RedoDDLEvent struct {
	DDL       *DDLEvent `msg:"ddl"`
	Type      byte      `msg:"type"`
	TableName TableName `msg:"table-name"`
}

RedoDDLEvent represents the DDL event persisted in the redo log

func (*RedoDDLEvent) DecodeMsg

func (z *RedoDDLEvent) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*RedoDDLEvent) EncodeMsg

func (z *RedoDDLEvent) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*RedoDDLEvent) MarshalMsg

func (z *RedoDDLEvent) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*RedoDDLEvent) Msgsize

func (z *RedoDDLEvent) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*RedoDDLEvent) UnmarshalMsg

func (z *RedoDDLEvent) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type RedoLog

type RedoLog struct {
	RedoRow RedoRowChangedEvent `msg:"row"`
	RedoDDL RedoDDLEvent        `msg:"ddl"`
	Type    RedoLogType         `msg:"type"`
}

RedoLog defines the persistent structure of the redo log. Because MsgPack does not support types that are defined in another package (see https://github.com/tinylib/msgp/issues/158 and https://github.com/tinylib/msgp/issues/149), RedoColumn and RedoDDLEvent are defined here instead of using Column and DDLEvent directly.

func (*RedoLog) DecodeMsg

func (z *RedoLog) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*RedoLog) EncodeMsg

func (z *RedoLog) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*RedoLog) GetCommitTs

func (r *RedoLog) GetCommitTs() Ts

GetCommitTs returns the commit ts of the redo log.

func (*RedoLog) MarshalMsg

func (z *RedoLog) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*RedoLog) Msgsize

func (z *RedoLog) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*RedoLog) TrySplitAndSortUpdateEvent

func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string) error

TrySplitAndSortUpdateEvent does nothing for a redo log.

func (*RedoLog) UnmarshalMsg

func (z *RedoLog) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type RedoLogType

type RedoLogType int

RedoLogType is the type of log

const (
	// RedoLogTypeUnknown is unknown type of log
	RedoLogTypeUnknown RedoLogType = iota
	// RedoLogTypeRow is row type of log
	RedoLogTypeRow
	// RedoLogTypeDDL is ddl type of log
	RedoLogTypeDDL
)

func (*RedoLogType) DecodeMsg

func (z *RedoLogType) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (RedoLogType) EncodeMsg

func (z RedoLogType) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (RedoLogType) MarshalMsg

func (z RedoLogType) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (RedoLogType) Msgsize

func (z RedoLogType) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*RedoLogType) UnmarshalMsg

func (z *RedoLogType) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type RedoRowChangedEvent

type RedoRowChangedEvent struct {
	Row        *RowChangedEventInRedoLog `msg:"row"`
	Columns    []RedoColumn              `msg:"columns"`
	PreColumns []RedoColumn              `msg:"pre-columns"`
}

RedoRowChangedEvent represents the DML event used in RedoLog

func (*RedoRowChangedEvent) DecodeMsg

func (z *RedoRowChangedEvent) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*RedoRowChangedEvent) EncodeMsg

func (z *RedoRowChangedEvent) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*RedoRowChangedEvent) MarshalMsg

func (z *RedoRowChangedEvent) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*RedoRowChangedEvent) Msgsize

func (z *RedoRowChangedEvent) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*RedoRowChangedEvent) UnmarshalMsg

func (z *RedoRowChangedEvent) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type RegionComparableSpan

type RegionComparableSpan struct {
	Span   tablepb.Span
	Region uint64
}

RegionComparableSpan contains a comparable span and a region id of that span

type RegionFeedEvent

type RegionFeedEvent struct {
	Val      *RawKVEntry
	Resolved *ResolvedSpans

	// Additional debug info, not used
	RegionID uint64
}

RegionFeedEvent from the kv layer. Only one of the event will be set.

func (*RegionFeedEvent) GetValue

func (e *RegionFeedEvent) GetValue() interface{}

GetValue returns the underlying value
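Since only one of the fields is set, a caller typically dispatches on the dynamic type of the returned value. The sketch below illustrates that usage with hypothetical local types (`rawKV`, `resolvedSpans`, `regionFeedEvent`). The precedence of `val` over `resolved` is an assumption.

```go
package main

import "fmt"

// Hypothetical, trimmed-down stand-ins for RawKVEntry and ResolvedSpans.
type rawKV struct{ key string }
type resolvedSpans struct{ resolvedTs uint64 }

// regionFeedEvent carries either a KV entry or a resolved notification.
type regionFeedEvent struct {
	val      *rawKV
	resolved *resolvedSpans
}

// getValue returns whichever field is set, or an untyped nil if neither is.
func (e *regionFeedEvent) getValue() interface{} {
	switch {
	case e.val != nil:
		return e.val
	case e.resolved != nil:
		return e.resolved
	default:
		return nil
	}
}

// describe shows how a consumer can branch on the underlying value.
func describe(e *regionFeedEvent) string {
	switch v := e.getValue().(type) {
	case *rawKV:
		return "kv " + v.key
	case *resolvedSpans:
		return fmt.Sprintf("resolved %d", v.resolvedTs)
	default:
		return "empty"
	}
}

func main() {
	fmt.Println(describe(&regionFeedEvent{val: &rawKV{key: "k1"}}))
	fmt.Println(describe(&regionFeedEvent{resolved: &resolvedSpans{resolvedTs: 42}}))
	fmt.Println(describe(&regionFeedEvent{}))
}
```

Returning an explicit `nil` in the default branch matters: returning a nil `*rawKV` through the interface would produce a non-nil interface value.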

type ResolvedMode

type ResolvedMode int

ResolvedMode describes the batch type of a resolved event.

const (
	// NormalResolvedMode means that all events whose commitTs is less than or equal to
	// `resolved.Ts` are sent to Sink.
	NormalResolvedMode ResolvedMode = iota
	// BatchResolvedMode means that all events whose commitTs is less than
	// 'resolved.Ts' are sent to Sink.
	BatchResolvedMode
)

type ResolvedSpans

type ResolvedSpans struct {
	Spans      []RegionComparableSpan
	ResolvedTs uint64
}

ResolvedSpans guarantees that all KV value events with a commit ts less than ResolvedTs have been emitted.

func (*ResolvedSpans) String

func (rs *ResolvedSpans) String() string

String implements fmt.Stringer interface.

type ResolvedTs

type ResolvedTs struct {
	Mode    ResolvedMode
	Ts      uint64
	BatchID uint64
}

ResolvedTs is the resolved timestamp of sink module.

func NewResolvedTs

func NewResolvedTs(t uint64) ResolvedTs

NewResolvedTs creates a normal ResolvedTs.

func (ResolvedTs) AdvanceBatch

func (r ResolvedTs) AdvanceBatch() ResolvedTs

AdvanceBatch advances the batch id of the resolved ts.

func (ResolvedTs) Equal

func (r ResolvedTs) Equal(r1 ResolvedTs) bool

Equal reports whether the resolved ts is equal to the given ts.

func (ResolvedTs) EqualOrGreater

func (r ResolvedTs) EqualOrGreater(r1 ResolvedTs) bool

EqualOrGreater reports whether the resolved ts is equal to or greater than the given ts.

func (ResolvedTs) Greater

func (r ResolvedTs) Greater(r1 ResolvedTs) bool

Greater reports whether the resolved ts is greater than the given ts.

func (ResolvedTs) IsBatchMode

func (r ResolvedTs) IsBatchMode() bool

IsBatchMode returns true if the resolved ts is BatchResolvedMode.

func (ResolvedTs) Less

func (r ResolvedTs) Less(r1 ResolvedTs) bool

Less reports whether the resolved ts is less than the given ts.
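The documentation does not spell out how two ResolvedTs values are ordered. One plausible reading, sketched here as an assumption, is that `Ts` is compared first and `BatchID` breaks ties, with `AdvanceBatch` keeping `Ts` and incrementing `BatchID`. The `resolvedTs` type and its methods are hypothetical mirrors, and the mode field is left out.

```go
package main

import "fmt"

// resolvedTs is a hypothetical mirror of ResolvedTs with only the fields
// that matter for ordering.
type resolvedTs struct {
	ts      uint64
	batchID uint64
}

// less orders by ts first and uses batchID as a tie-breaker (assumed).
func (r resolvedTs) less(o resolvedTs) bool {
	return r.ts < o.ts || (r.ts == o.ts && r.batchID < o.batchID)
}

func (r resolvedTs) equal(o resolvedTs) bool { return r == o }

func (r resolvedTs) greater(o resolvedTs) bool { return o.less(r) }

func (r resolvedTs) equalOrGreater(o resolvedTs) bool { return !r.less(o) }

// advanceBatch keeps the timestamp and moves on to the next batch.
func (r resolvedTs) advanceBatch() resolvedTs {
	return resolvedTs{ts: r.ts, batchID: r.batchID + 1}
}

func main() {
	a := resolvedTs{ts: 100, batchID: 1}
	b := a.advanceBatch()
	fmt.Println(a.less(b), b.greater(a), b.equalOrGreater(b))
}
```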

func (ResolvedTs) ResolvedMark

func (r ResolvedTs) ResolvedMark() uint64

ResolvedMark returns a timestamp `ts` based on `r.Mode`, which marks that all events whose commitTs is less than or equal to `ts` are sent to Sink.
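Combining this description with the ResolvedMode constants (normal mode covers commitTs less than or equal to `Ts`, batch mode covers commitTs strictly less than `Ts`), the mark can be derived as follows. This is a sketch inferred from those comments rather than the package's code. The `resolvedTs` type and constant names are hypothetical.

```go
package main

import "fmt"

type resolvedMode int

const (
	normalMode resolvedMode = iota
	batchMode
)

// resolvedTs is a hypothetical mirror of ResolvedTs without the batch ID.
type resolvedTs struct {
	mode resolvedMode
	ts   uint64
}

// resolvedMark returns the largest commit ts that is guaranteed to be fully
// sent. A ts of 0 in batch mode would wrap around; this sketch does not
// guard against that.
func (r resolvedTs) resolvedMark() uint64 {
	if r.mode == batchMode {
		// Batch mode only covers commitTs < ts, so step back by one.
		return r.ts - 1
	}
	// Normal mode already covers commitTs <= ts.
	return r.ts
}

func main() {
	fmt.Println(resolvedTs{mode: normalMode, ts: 100}.resolvedMark()) // 100
	fmt.Println(resolvedTs{mode: batchMode, ts: 100}.resolvedMark())  // 99
}
```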

type RowChangedDatums

type RowChangedDatums struct {
	RowDatums    []types.Datum
	PreRowDatums []types.Datum
}

RowChangedDatums is used to store the changed datums of a row.

func (RowChangedDatums) IsEmpty

func (r RowChangedDatums) IsEmpty() bool

IsEmpty returns true if the RowChangedDatums is empty.

type RowChangedEvent

type RowChangedEvent struct {
	StartTs  uint64
	CommitTs uint64

	RowID int64 // Deprecated. It is empty when the RowID comes from clustered index table.

	PhysicalTableID int64

	// NOTICE: We probably store the logical ID inside TableInfo's TableName,
	// not the physical ID.
	// For normal table, there is only one ID, which is the physical ID.
	// AKA TIDB_TABLE_ID.
	// For partitioned table, there are two kinds of ID:
	// 1. TIDB_PARTITION_ID is the physical ID of the partition.
	// 2. TIDB_TABLE_ID is the logical ID of the table.
	// In general, we always use the physical ID to represent a table, but we
	// record the logical ID from the DDL event(job.BinlogInfo.TableInfo).
	// So be careful when using the TableInfo.
	TableInfo *TableInfo

	Columns    []*ColumnData
	PreColumns []*ColumnData

	// Checksum for the event. It is non-nil only if the upstream TiDB enables the row level checksum
	// and TiCDC sets the integrity check level to correctness.
	Checksum *integrity.Checksum

	// ApproximateDataSize is the approximate size of protobuf binary
	// representation of this event.
	ApproximateDataSize int64

	// SplitTxn marks this RowChangedEvent as the first line of a new txn.
	SplitTxn bool
	// ReplicatingTs is ts when a table starts replicating events to downstream.
	ReplicatingTs Ts
	// HandleKey is the key of the row changed event.
	// It can be used to identify the row changed event.
	// It can be one of three : common_handle, int_handle or _tidb_rowid based on the table definitions
	// 1. primary key is the clustered index, and key is not int type, then we use `CommonHandle`
	// 2. primary key is int type(including different types of int, such as bigint, TINYINT), then we use IntHandle
	// 3. when the table doesn't have the primary key and clustered index,
	//    tidb will make a hidden column called "_tidb_rowid" as the handle.
	//    due to the type of "_tidb_rowid" is int, so we also use IntHandle to represent.
	HandleKey kv.Handle
}

RowChangedEvent represents a row changed event

func (*RowChangedEvent) ApproximateBytes

func (r *RowChangedEvent) ApproximateBytes() int

ApproximateBytes returns approximate bytes in memory consumed by the event.

func (*RowChangedEvent) GetColumns

func (r *RowChangedEvent) GetColumns() []*Column

GetColumns returns the columns of the event

func (*RowChangedEvent) GetCommitTs

func (r *RowChangedEvent) GetCommitTs() uint64

GetCommitTs returns the commit timestamp of this event.

func (*RowChangedEvent) GetHandleKeyColumnValues

func (r *RowChangedEvent) GetHandleKeyColumnValues() []string

GetHandleKeyColumnValues returns all handle key's column values

func (*RowChangedEvent) GetPreColumns

func (r *RowChangedEvent) GetPreColumns() []*Column

GetPreColumns returns the pre columns of the event

func (*RowChangedEvent) HandleKeyColInfos

func (r *RowChangedEvent) HandleKeyColInfos() ([]*Column, []rowcodec.ColInfo)

HandleKeyColInfos returns the column(s) and colInfo(s) corresponding to the handle key(s)

func (*RowChangedEvent) IsDelete

func (r *RowChangedEvent) IsDelete() bool

IsDelete returns true if the row is a delete event

func (*RowChangedEvent) IsInsert

func (r *RowChangedEvent) IsInsert() bool

IsInsert returns true if the row is an insert event

func (*RowChangedEvent) IsUpdate

func (r *RowChangedEvent) IsUpdate() bool

IsUpdate returns true if the row is an update event
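A row event carries the new image in `Columns` and the old image in `PreColumns`, so the three predicates above can be expressed in terms of which of the two is populated. The sketch below assumes that reading. The `rowEvent` type uses plain strings in place of `*ColumnData` and is hypothetical.

```go
package main

import "fmt"

// rowEvent is a hypothetical stand-in for RowChangedEvent that keeps only
// the new and old column images.
type rowEvent struct {
	columns    []string
	preColumns []string
}

// isInsert: there is a new image but no old image.
func (r rowEvent) isInsert() bool { return len(r.preColumns) == 0 && len(r.columns) != 0 }

// isDelete: there is an old image but no new image.
func (r rowEvent) isDelete() bool { return len(r.preColumns) != 0 && len(r.columns) == 0 }

// isUpdate: both images are present.
func (r rowEvent) isUpdate() bool { return len(r.preColumns) != 0 && len(r.columns) != 0 }

func main() {
	ins := rowEvent{columns: []string{"a=1"}}
	upd := rowEvent{columns: []string{"a=2"}, preColumns: []string{"a=1"}}
	del := rowEvent{preColumns: []string{"a=2"}}
	fmt.Println(ins.isInsert(), upd.isUpdate(), del.isDelete())
}
```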

func (*RowChangedEvent) PrimaryKeyColumnNames

func (r *RowChangedEvent) PrimaryKeyColumnNames() []string

PrimaryKeyColumnNames returns the names of all primary key columns

func (*RowChangedEvent) ToRedoLog

func (r *RowChangedEvent) ToRedoLog() *RedoLog

ToRedoLog converts row changed event to redo log

func (*RowChangedEvent) TrySplitAndSortUpdateEvent

func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string) error

TrySplitAndSortUpdateEvent does nothing

type RowChangedEventInRedoLog

type RowChangedEventInRedoLog struct {
	StartTs  uint64 `msg:"start-ts"`
	CommitTs uint64 `msg:"commit-ts"`

	// Table contains the table name and table ID.
	// NOTICE: We store the physical table ID here, not the logical table ID.
	Table *TableName `msg:"table"`

	Columns      []*Column `msg:"columns"`
	PreColumns   []*Column `msg:"pre-columns"`
	IndexColumns [][]int   `msg:"index-columns"`
}

RowChangedEventInRedoLog is used to store RowChangedEvent in redo log v2 format

func (*RowChangedEventInRedoLog) DecodeMsg

func (z *RowChangedEventInRedoLog) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*RowChangedEventInRedoLog) EncodeMsg

func (z *RowChangedEventInRedoLog) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*RowChangedEventInRedoLog) MarshalMsg

func (z *RowChangedEventInRedoLog) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*RowChangedEventInRedoLog) Msgsize

func (z *RowChangedEventInRedoLog) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*RowChangedEventInRedoLog) ToRowChangedEvent

func (r *RowChangedEventInRedoLog) ToRowChangedEvent() *RowChangedEvent

ToRowChangedEvent converts RowChangedEventInRedoLog to RowChangedEvent

func (*RowChangedEventInRedoLog) UnmarshalMsg

func (z *RowChangedEventInRedoLog) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type RunningError

type RunningError struct {
	Time    time.Time `json:"time"`
	Addr    string    `json:"addr"`
	Code    string    `json:"code"`
	Message string    `json:"message"`
}

RunningError represents some running error from cdc components, such as processor.

func (*RunningError) Scan

func (e *RunningError) Scan(value interface{}) error

Scan implements the sql.Scanner interface

func (RunningError) ShouldFailChangefeed

func (e RunningError) ShouldFailChangefeed() bool

ShouldFailChangefeed returns true if the running error contains a non-retryable changefeed error.

func (RunningError) Value

func (e RunningError) Value() (driver.Value, error)

Value implements the driver.Valuer interface
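Implementing both `sql.Scanner` and `driver.Valuer` lets a struct be stored in and read from a single database column. A common way to do this is to serialize the struct as JSON, which the sketch below assumes. The `runningError` type is a hypothetical copy with the `Time` field omitted for brevity, and the error message is illustrative.

```go
package main

import (
	"database/sql/driver"
	"encoding/json"
	"errors"
	"fmt"
)

// runningError is a hypothetical, trimmed-down copy of RunningError.
type runningError struct {
	Addr    string `json:"addr"`
	Code    string `json:"code"`
	Message string `json:"message"`
}

// Value implements driver.Valuer by encoding the struct as JSON.
func (e runningError) Value() (driver.Value, error) {
	return json.Marshal(e)
}

// Scan implements sql.Scanner by decoding a JSON blob read from the database.
func (e *runningError) Scan(value interface{}) error {
	b, ok := value.([]byte)
	if !ok {
		return errors.New("runningError: expected []byte")
	}
	return json.Unmarshal(b, e)
}

func main() {
	in := runningError{Addr: "127.0.0.1:8300", Code: "CDC:ErrExample", Message: "example"}
	v, err := in.Value()
	if err != nil {
		panic(err)
	}
	var out runningError
	if err := out.Scan(v); err != nil {
		panic(err)
	}
	fmt.Println(out == in)
}
```

`Value` uses a value receiver so that both `runningError` and `*runningError` satisfy `driver.Valuer`, while `Scan` needs a pointer receiver because it mutates the target.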

type ServerStatus

type ServerStatus struct {
	Version   string   `json:"version"`
	GitHash   string   `json:"git_hash"`
	ID        string   `json:"id"`
	ClusterID string   `json:"cluster_id"`
	Pid       int      `json:"pid"`
	IsOwner   bool     `json:"is_owner"`
	Liveness  Liveness `json:"liveness"`
}

ServerStatus holds some common information of a server

type SingleTableTxn

type SingleTableTxn struct {
	PhysicalTableID int64
	TableInfo       *TableInfo
	// TableInfoVersion is the version of the table info, it is used to generate data path
	// in storage sink. Generally, TableInfoVersion equals to `SingleTableTxn.TableInfo.Version`.
	// Besides, if one table is just scheduled to a new processor, the TableInfoVersion should be
	// greater than or equal to the startTs of table sink.
	TableInfoVersion uint64

	StartTs  uint64
	CommitTs uint64
	Rows     []*RowChangedEvent
}

SingleTableTxn represents a transaction which includes many row events in a single table

func (*SingleTableTxn) Append

func (t *SingleTableTxn) Append(row *RowChangedEvent)

Append adds a row changed event into SingleTableTxn

func (*SingleTableTxn) GetCommitTs

func (t *SingleTableTxn) GetCommitTs() uint64

GetCommitTs returns the commit timestamp of the transaction.

func (*SingleTableTxn) GetPhysicalTableID

func (t *SingleTableTxn) GetPhysicalTableID() int64

GetPhysicalTableID returns the physical table id of the table in the transaction

func (*SingleTableTxn) TrySplitAndSortUpdateEvent

func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error

TrySplitAndSortUpdateEvent split update events if unique key is updated

type SortEngine

type SortEngine = string

SortEngine is the sorter engine

const (
	SortInMemory SortEngine = "memory"
	SortInFile   SortEngine = "file"
	SortUnified  SortEngine = "unified"
)

sort engines

type TableID

type TableID = tablepb.TableID

TableID is the ID of the table

type TableInfo

type TableInfo struct {
	*model.TableInfo
	SchemaID int64
	// NOTICE: We probably store the logical ID inside TableName,
	// not the physical ID.
	// For normal table, there is only one ID, which is the physical ID.
	// AKA TIDB_TABLE_ID.
	// For partitioned table, there are two kinds of ID:
	// 1. TIDB_PARTITION_ID is the physical ID of the partition.
	// 2. TIDB_TABLE_ID is the logical ID of the table.
	// In general, we always use the physical ID to represent a table, but we
	// record the logical ID from the DDL event(job.BinlogInfo.TableInfo).
	// So be careful when using the TableInfo.
	TableName TableName
	// Version records the TSO at which the table info was created.
	Version uint64

	// ColumnID -> offset in RowChangedEvents.Columns.
	RowColumnsOffset map[int64]int

	ColumnsFlag map[int64]*ColumnFlagType

	// the mounter will choose this index to output delete events
	// special value:
	// HandleIndexPKIsHandle(-1) : pk is handle
	// HandleIndexTableIneligible(-2) : the table is not eligible
	HandleIndexID int64

	// IndexColumnsOffset store the offset of the columns in row changed events for
	// unique index and primary key
	// The reason why we need this is that the Indexes in TableInfo
	// will not contain the PK if it is created in a statement like:
	// create table t (a int primary key, b int unique key);
	// Every element in the first dimension is an index, and the second dimension holds the column offsets
	// for example:
	// table has 3 columns: a, b, c
	// pk: a
	// index1: a, b
	// index2: a, c
	// indexColumnsOffset: [[0], [0, 1], [0, 2]]
	IndexColumnsOffset [][]int
	// contains filtered or unexported fields
}

TableInfo provides meta data describing a DB table.

func BuildTableInfo

func BuildTableInfo(schemaName, tableName string, columns []*Column, indexColumns [][]int) *TableInfo

BuildTableInfo builds a table info from given information. Note that some fields of the result TableInfo may just be mocked. The only guarantee is that we can use the result to reconstruct the information in `Column`. The main use cases of this function are to build TableInfo from redo log and in tests.

func BuildTableInfoWithPKNames4Test

func BuildTableInfoWithPKNames4Test(schemaName, tableName string, columns []*Column, pkNames map[string]struct{}) *TableInfo

BuildTableInfoWithPKNames4Test builds a table info from given information.

func WrapTableInfo

func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *model.TableInfo) *TableInfo

WrapTableInfo creates a TableInfo from a timodel.TableInfo

func (*TableInfo) Clone

func (ti *TableInfo) Clone() *TableInfo

Clone clones the TableInfo

func (*TableInfo) ForceGetColumnFlagType

func (ti *TableInfo) ForceGetColumnFlagType(colID int64) *ColumnFlagType

ForceGetColumnFlagType returns the column flag type by ID. The caller must ensure `colID` exists.

func (*TableInfo) ForceGetColumnIDByName

func (ti *TableInfo) ForceGetColumnIDByName(name string) int64

ForceGetColumnIDByName returns the column ID by column name. The caller must ensure the column exists.

func (*TableInfo) ForceGetColumnInfo

func (ti *TableInfo) ForceGetColumnInfo(colID int64) *model.ColumnInfo

ForceGetColumnInfo returns the column info by ID. The caller must ensure `colID` exists.

func (*TableInfo) ForceGetColumnName

func (ti *TableInfo) ForceGetColumnName(colID int64) string

ForceGetColumnName returns the column name by ID. The caller must ensure `colID` exists.

func (*TableInfo) GetColInfosForRowChangedEvent

func (ti *TableInfo) GetColInfosForRowChangedEvent() []rowcodec.ColInfo

GetColInfosForRowChangedEvent returns column infos for non-virtual columns. The column order in the result is the same as the order in its corresponding RowChangedEvent.

func (*TableInfo) GetColumnInfo

func (ti *TableInfo) GetColumnInfo(colID int64) (info *model.ColumnInfo, exist bool)

GetColumnInfo returns the column info by ID

func (*TableInfo) GetIndex

func (ti *TableInfo) GetIndex(name string) *model.IndexInfo

GetIndex returns the corresponding index by the given name.

func (*TableInfo) GetPrimaryKeyColumnNames

func (ti *TableInfo) GetPrimaryKeyColumnNames() []string

GetPrimaryKeyColumnNames returns the primary key column names

func (*TableInfo) GetRowColInfos

func (ti *TableInfo) GetRowColInfos() ([]int64, map[int64]*types.FieldType, []rowcodec.ColInfo)

GetRowColInfos returns all column infos for rowcodec

func (*TableInfo) GetSchemaName

func (ti *TableInfo) GetSchemaName() string

GetSchemaName returns the schema name of the table

func (*TableInfo) GetSchemaNamePtr

func (ti *TableInfo) GetSchemaNamePtr() *string

GetSchemaNamePtr returns the pointer to the schema name of the table

func (*TableInfo) GetTableName

func (ti *TableInfo) GetTableName() string

GetTableName returns the table name of the table

func (*TableInfo) GetTableNamePtr

func (ti *TableInfo) GetTableNamePtr() *string

GetTableNamePtr returns the pointer to the table name of the table

func (*TableInfo) HasUniqueColumn

func (ti *TableInfo) HasUniqueColumn() bool

HasUniqueColumn returns whether the table has a unique column

func (*TableInfo) HasVirtualColumns

func (ti *TableInfo) HasVirtualColumns() bool

HasVirtualColumns returns whether the table has virtual columns

func (*TableInfo) IndexByName

func (ti *TableInfo) IndexByName(name string) ([]string, []int, bool)

IndexByName returns the index columns and offsets of the corresponding index by name

func (*TableInfo) IsEligible

func (ti *TableInfo) IsEligible(forceReplicate bool) bool

IsEligible returns whether the table is an eligible table

func (*TableInfo) IsIndexUnique

func (ti *TableInfo) IsIndexUnique(indexInfo *model.IndexInfo) bool

IsIndexUnique returns whether the index is unique

func (*TableInfo) IsPartitionTable

func (ti *TableInfo) IsPartitionTable() bool

IsPartitionTable returns whether the table is a partitioned table

func (*TableInfo) OffsetsByNames

func (ti *TableInfo) OffsetsByNames(names []string) ([]int, bool)

OffsetsByNames returns the column offsets of the corresponding columns by names. If any column does not exist, it returns false.

func (*TableInfo) String

func (ti *TableInfo) String() string

type TableName

type TableName struct {
	Schema      string `toml:"db-name" msg:"db-name"`
	Table       string `toml:"tbl-name" msg:"tbl-name"`
	TableID     int64  `toml:"tbl-id" msg:"tbl-id"`
	IsPartition bool   `toml:"is-partition" msg:"is-partition"`
}

TableName represents name of a table, includes table name and schema name.

func (*TableName) DecodeMsg

func (z *TableName) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*TableName) EncodeMsg

func (z *TableName) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*TableName) GetSchema

func (t *TableName) GetSchema() string

GetSchema returns schema name.

func (*TableName) GetTable

func (t *TableName) GetTable() string

GetTable returns table name.

func (*TableName) GetTableID

func (t *TableName) GetTableID() int64

GetTableID returns table ID.

func (*TableName) MarshalMsg

func (z *TableName) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*TableName) Msgsize

func (z *TableName) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (TableName) QuoteString

func (t TableName) QuoteString() string

QuoteString returns quoted full table name

func (TableName) String

func (t TableName) String() string

String implements fmt.Stringer interface.
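To illustrate the difference between the plain and quoted forms of a table name, the sketch below assumes `String` joins schema and table with a dot and `QuoteString` wraps each part in backticks, doubling any embedded backtick. Both formats are assumptions, and `tableName` and `quote` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// tableName is a hypothetical mirror of TableName with only the name fields.
type tableName struct {
	schema string
	table  string
}

// String returns the unquoted "schema.table" form (assumed format).
func (t tableName) String() string { return t.schema + "." + t.table }

// quote wraps an identifier in backticks and doubles any embedded backtick.
func quote(s string) string { return "`" + strings.ReplaceAll(s, "`", "``") + "`" }

// quoteString returns the quoted full name (assumed format).
func (t tableName) quoteString() string { return quote(t.schema) + "." + quote(t.table) }

func main() {
	t := tableName{schema: "test", table: "t1"}
	fmt.Println(t.String())
	fmt.Println(t.quoteString())
}
```

The quoted form is the one that is safe to embed in a SQL statement when a name contains characters such as dots or backticks.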

func (*TableName) UnmarshalMsg

func (z *TableName) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type TableOperation

type TableOperation struct {
	Delete bool   `json:"delete"`
	Flag   uint64 `json:"flag,omitempty"`
	// if the operation is a delete operation, BoundaryTs is checkpoint ts
	// if the operation is an add operation, BoundaryTs is start ts
	BoundaryTs uint64 `json:"boundary_ts"`
	Status     uint64 `json:"status,omitempty"`
}

TableOperation records the current information of a table migration

func (*TableOperation) Clone

func (o *TableOperation) Clone() *TableOperation

Clone returns a deep-clone of the struct

func (*TableOperation) TableApplied

func (o *TableOperation) TableApplied() bool

TableApplied returns whether the table has finished the startup procedure. Returns true if table has been processed by processor and resolved ts reaches global resolved ts.

func (*TableOperation) TableProcessed

func (o *TableOperation) TableProcessed() bool

TableProcessed returns whether the table has been processed by processor
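The `Status` field takes one of the `OperDispatched`, `OperProcessed`, or `OperFinished` constants listed at the top of the package. The sketch below shows one way the two predicates can be read in terms of those states. The mapping (processed covers both later states, applied means finished) is an assumption, and the lower-case names are hypothetical mirrors.

```go
package main

import "fmt"

// Local mirrors of the OperDispatched / OperProcessed / OperFinished constants.
const (
	operDispatched uint64 = iota
	operProcessed
	operFinished
)

// tableOperation keeps only the status of a TableOperation.
type tableOperation struct {
	status uint64
}

// tableProcessed: the processor has picked up the operation (assumed to
// include the finished state).
func (o *tableOperation) tableProcessed() bool {
	return o.status == operProcessed || o.status == operFinished
}

// tableApplied: the startup procedure has completed (assumed to mean finished).
func (o *tableOperation) tableApplied() bool {
	return o.status == operFinished
}

func main() {
	for _, s := range []uint64{operDispatched, operProcessed, operFinished} {
		op := &tableOperation{status: s}
		fmt.Println(s, op.tableProcessed(), op.tableApplied())
	}
}
```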

type TableReplicaInfo

type TableReplicaInfo struct {
	StartTs Ts `json:"start-ts"`
}

TableReplicaInfo records the table replica info

func (*TableReplicaInfo) Clone

func (i *TableReplicaInfo) Clone() *TableReplicaInfo

Clone clones a TableReplicaInfo

type TaskPosition

type TaskPosition struct {
	// The maximum event CommitTs that has been synchronized. This is updated by corresponding processor.
	//
	// Deprecated: only used in API. TODO: remove API usage.
	CheckPointTs uint64 `json:"checkpoint-ts"`
	// The event that satisfies CommitTs <= ResolvedTs can be synchronized. This is updated by corresponding processor.
	//
	// Deprecated: only used in API. TODO: remove API usage.
	ResolvedTs uint64 `json:"resolved-ts"`
	// The count of events were synchronized. This is updated by corresponding processor.
	//
	// Deprecated: only used in API. TODO: remove API usage.
	Count uint64 `json:"count"`

	// Error when changefeed error happens
	Error *RunningError `json:"error"`
	// Warning when module error happens
	Warning *RunningError `json:"warning"`
}

TaskPosition records the process information of a capture

func (*TaskPosition) Clone

func (tp *TaskPosition) Clone() *TaskPosition

Clone returns a deep clone of TaskPosition

func (*TaskPosition) Marshal

func (tp *TaskPosition) Marshal() (string, error)

Marshal returns the JSON marshal format of a TaskPosition

func (*TaskPosition) String

func (tp *TaskPosition) String() string

String implements fmt.Stringer interface.

func (*TaskPosition) Unmarshal

func (tp *TaskPosition) Unmarshal(data []byte) error

Unmarshal unmarshals into *TaskPosition from a JSON-marshaled byte slice

type TaskStatus deprecated

type TaskStatus struct {
	Tables       map[TableID]*TableReplicaInfo `json:"tables"`
	Operation    map[TableID]*TableOperation   `json:"operation"`
	AdminJobType AdminJobType                  `json:"admin-job-type"`
	ModRevision  int64                         `json:"-"`
}

TaskStatus records the task information of a capture.

Deprecated: only used in API. TODO: remove API usage.

func (*TaskStatus) Clone

func (ts *TaskStatus) Clone() *TaskStatus

Clone returns a deep-clone of the struct

func (*TaskStatus) Marshal

func (ts *TaskStatus) Marshal() (string, error)

Marshal returns the json marshal format of a TaskStatus

func (*TaskStatus) String

func (ts *TaskStatus) String() string

String implements fmt.Stringer interface.

func (*TaskStatus) Unmarshal

func (ts *TaskStatus) Unmarshal(data []byte) error

Unmarshal unmarshals into *TaskStatus from json marshal byte slice

type TopicPartitionKey

type TopicPartitionKey struct {
	Topic          string
	Partition      int32
	PartitionKey   string
	TotalPartition int32
}

TopicPartitionKey contains the topic and partition key of the message.

func (*TopicPartitionKey) DecodeMsg

func (z *TopicPartitionKey) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*TopicPartitionKey) EncodeMsg

func (z *TopicPartitionKey) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*TopicPartitionKey) MarshalMsg

func (z *TopicPartitionKey) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*TopicPartitionKey) Msgsize

func (z *TopicPartitionKey) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*TopicPartitionKey) UnmarshalMsg

func (z *TopicPartitionKey) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type Ts

type Ts = tablepb.Ts

Ts is the timestamp with a logical count

type UpstreamID

type UpstreamID = uint64

UpstreamID is the type for upstream ID

type UpstreamInfo

type UpstreamInfo struct {
	ID            uint64   `json:"id"`
	PDEndpoints   string   `json:"pd-endpoints"`
	KeyPath       string   `json:"key-path"`
	CertPath      string   `json:"cert-path"`
	CAPath        string   `json:"ca-path"`
	CertAllowedCN []string `json:"cert-allowed-cn"`
}

UpstreamInfo is the upstream information stored in etcd.

func (*UpstreamInfo) Clone

func (c *UpstreamInfo) Clone() (*UpstreamInfo, error)

Clone returns a cloned UpstreamInfo

func (*UpstreamInfo) Marshal

func (c *UpstreamInfo) Marshal() ([]byte, error)

Marshal using json.Marshal.

func (*UpstreamInfo) Unmarshal

func (c *UpstreamInfo) Unmarshal(data []byte) error

Unmarshal from binary data.

Directories

Path Synopsis
v1
