extractor

package
v0.2.1-0...-d023f04 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 17, 2023 License: MPL-2.0 Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
const (
	OperationCodeInsert   = 1
	OperationCodeDelete   = 2
	OperationCodeUpdate   = 3
	OperationCodeDDL      = 5
	OperationCodeStart    = 6
	OperationCodeCommit   = 7
	OperationCodeMissScn  = 34
	OperationCodeRollback = 36
)
View Source
const (
	NullValue              = "NULL"
	EmptyCLOBFunction      = "EMPTY_CLOB()"
	EmptyBLOBFunction      = "EMPTY_BLOB()"
	FunctionHEXTORAWStart  = `HEXTORAW(`
	CommonFunctionEnd      = `)`
	InfValue               = `Inf`
	NInfValue              = `-Inf`
	NanValue               = `Nan`
	ToDSintervalStart      = "TO_DSINTERVAL("
	ToYMintervalStart      = "TO_YMINTERVAL("
	FunctionUNITSTRStart   = "UNISTR("
	ToDateFuncStart        = "TO_DATE("
	ToDateFuncEnd          = ", SYYYY-MM-DD HH24:MI:SS)"
	ToTimestampFuncStart   = "TO_TIMESTAMP("
	ToTimestampTzFuncStart = "TO_TIMESTAMP_TZ("
)

Variables

View Source
var CONCATENATIONPATTERN = "\\|\\|"

Functions

func HandlingForSpecialCharacters

func HandlingForSpecialCharacters(i *oracle_element.Identifier) string

func IdentifierToString

func IdentifierToString(i *oracle_element.Identifier) string

func LimitSize

func LimitSize(dec int) int

MySQL DEC/DECIMAL/NUMERIC type max scale is 30

func NewDumper

func NewDumper(ctx context.Context, db *config.OracleDB, table *common.Table, chunkSize int64,
	logger g.LoggerType, memory *int64, scn int64) *dumper

func Query

func Query(db *gosql.DB, querySQL string, args ...interface{}) ([]map[string]string, error)

func ReplaceQuotesString

func ReplaceQuotesString(s string) string

替换字符串引号字符

func ReplaceSpecifiedString

func ReplaceSpecifiedString(s string, oldStr, newStr string) string

替换指定字符

func StringsBuilder

func StringsBuilder(str ...string) string

字符串拼接

func UnitstrConvert

func UnitstrConvert(data string) string

func UnitstrDecode

func UnitstrDecode(value string) string

Types

type ColumnDefinition

type ColumnDefinition struct {
	Name          string
	Datatype      string
	DataLength    uint32
	DataPrecision sql.NullInt32
	DataScale     sql.NullInt32
	DefaultLength sql.NullInt32
	CharLength    sql.NullInt32
	Nullable      OracleBoolean
	DataDefault   sql.NullString
}

func (*ColumnDefinition) String

func (c *ColumnDefinition) String() string

type ExtractorOracle

type ExtractorOracle struct {
	MySQLVersion          string
	TotalTransferredBytes int
	// Original comment: TotalRowsCopied returns the accurate number of rows being copied (affected)
	// This is not exactly the same as the rows being iterated via chunks, but potentially close enough.
	// TODO What is the difference between mysqlContext.RowsEstimate ?
	TotalRowsCopied int64

	LogMinerStream *LogMinerStream

	OracleContext *OracleContext
	// contains filtered or unexported fields
}

ExtractorOracle is the main schema extract flow manager.

func NewExtractorOracle

func NewExtractorOracle(execCtx *common.ExecContext, cfg *common.MySQLDriverConfig, logger g.LoggerType, storeManager *common.StoreManager, waitCh chan *drivers.ExitResult, ctx context.Context) (*ExtractorOracle, error)

func (*ExtractorOracle) CheckAndApplyLowerCaseTableNames

func (e *ExtractorOracle) CheckAndApplyLowerCaseTableNames()

func (*ExtractorOracle) DataStreamEvents

func (e *ExtractorOracle) DataStreamEvents(entriesChannel chan<- *common.EntryContext) error

func (*ExtractorOracle) Finish1

func (e *ExtractorOracle) Finish1() (err error)

func (*ExtractorOracle) GetLogMinerRecord

func (e *ExtractorOracle) GetLogMinerRecord(startScn, endScn int64, records chan *LogMinerRecord) error

func (*ExtractorOracle) LockTablesForSchema

func (e *ExtractorOracle) LockTablesForSchema(tx *sql.Tx, ctx context.Context) error

func (*ExtractorOracle) LoopLogminerRecord

func (e *ExtractorOracle) LoopLogminerRecord() error

func (*ExtractorOracle) Run

func (e *ExtractorOracle) Run()

Run executes the complete extract logic.

func (*ExtractorOracle) Shutdown

func (e *ExtractorOracle) Shutdown() error

Shutdown is used to tear down the ExtractorOracle

func (*ExtractorOracle) Stats

func (e *ExtractorOracle) Stats() (*common.TaskStatistics, error)

func (*ExtractorOracle) StreamEvents

func (e *ExtractorOracle) StreamEvents() error

StreamEvents will begin streaming events. It will be blocking, so should be executed by a goroutine

type LogFile

type LogFile struct {
	Name        string
	FirstChange int64
}

type LogMinerRecord

type LogMinerRecord struct {
	SCN       int64
	SegOwner  string
	TableName string
	SQLRedo   string
	SQLUndo   string
	Operation int
	XId       []byte
	Csf       int
	RowId     string
	Rollback  int
	RsId      string
	StartTime string
	Username  string
}

func (*LogMinerRecord) String

func (r *LogMinerRecord) String() string

func (*LogMinerRecord) TxId

func (r *LogMinerRecord) TxId() string

type LogMinerStream

type LogMinerStream struct {
	OracleTxNum uint32
	// contains filtered or unexported fields
}

func NewLogMinerStream

func NewLogMinerStream(ctx context.Context, db *config.OracleDB, logger g.LoggerType, replicateDB, ignoreReplicateDB []*common.DataSource, startScn, committedScn, interval int64) *LogMinerStream

func (*LogMinerStream) AddLogMinerFile

func (l *LogMinerStream) AddLogMinerFile(fs []*LogFile) error

func (*LogMinerStream) BuildLogMiner

func (l *LogMinerStream) BuildLogMiner() error

func (*LogMinerStream) EndLogMiner

func (l *LogMinerStream) EndLogMiner() error

func (*LogMinerStream) GetLogFileBySCN

func (l *LogMinerStream) GetLogFileBySCN(scn int64) ([]*LogFile, error)

func (*LogMinerStream) StartLogMinerBySCN

func (l *LogMinerStream) StartLogMinerBySCN(scn int64) error

func (*LogMinerStream) StartLogMinerBySCN2

func (l *LogMinerStream) StartLogMinerBySCN2(startScn, endScn int64) error

type LogMinerTx

type LogMinerTx struct {
	// contains filtered or unexported fields
}

func (*LogMinerTx) String

func (l *LogMinerTx) String() string

type LogMinerTxCache

type LogMinerTxCache struct {
	Handler func(tx *LogMinerTx) error
	// contains filtered or unexported fields
}

func NewLogMinerTxCache

func NewLogMinerTxCache() *LogMinerTxCache

type OracleBoolean

type OracleBoolean bool

func (*OracleBoolean) Scan

func (o *OracleBoolean) Scan(value interface{}) error

type OracleContext

type OracleContext struct {
	// contains filtered or unexported fields
}

type OracleSchemaInfo

type OracleSchemaInfo struct {
	Tables map[string]*ast.CreateTableStmt
}

type Stmt

type Stmt struct {
	Schema            string
	Table             string
	Columns           []string
	Operation         int8
	Data              map[string]interface{}
	Before            map[string]interface{}
	WhereExpr         string
	WhereColumnValues []interface{}
	NewColumnValues   []interface{}
	Error             error
	// contains filtered or unexported fields
}

func (*Stmt) Enter

func (v *Stmt) Enter(in ast.Node) (ast.Node, bool)

func (*Stmt) Leave

func (v *Stmt) Leave(in ast.Node) (ast.Node, bool)

func (*Stmt) Marshal

func (v *Stmt) Marshal() string

type TimestampContext

type TimestampContext struct {

	// Do not pass 0 to the chan.
	TimestampCh chan uint32
	// contains filtered or unexported fields
}

func NewTimestampContext

func NewTimestampContext(stopCh chan struct{}, logger g.LoggerType, emptyQueueFunc func() bool) *TimestampContext

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL