table_buffer

package
v0.0.0-...-4512317 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 11, 2022 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildInsertQuery

func BuildInsertQuery(query *bytes.Buffer, tableName string, columns []string)

func NewRepository

func NewRepository(conn *sql.DB, metrics RepositoryMetrics, logger *zap.Logger) *repository

Types

type Query

// Query maps a column name to the data stored for that column.
type Query map[string]interface{} // columnName->data

Query is a request to modify the data: it updates the storage in RAM and makes 1 or 2 ClickHouse inserts.

type Repository

// Repository is the database access interface accepted by NewTableBuffer.
// RepositoryMock declares all three methods with matching signatures.
type Repository interface {
	// Insert takes an insert query string, the rows and the target table name,
	// and returns an int and an error.
	// NOTE(review): the int is presumably the number of inserted rows — confirm
	// against the implementation.
	Insert(ctx context.Context, query string, rows []storageRow, tableName string) (int, error)
	// FetchTable takes a table name, the index fields and the sign field, and
	// returns a TableState, a string slice and an error.
	// NOTE(review): the string slice is presumably the table's column names —
	// confirm against the implementation.
	FetchTable(
		ctx context.Context,
		tableName string,
		indexFields []string,
		signField string,
	) (
		TableState,
		[]string,
		error,
	)
	// FetchColumnNames returns a string slice and an error for tableName;
	// judging by the name, the slice holds the table's column names.
	FetchColumnNames(ctx context.Context, tableName string) ([]string, error)
}

type RepositoryMetrics

// RepositoryMetrics is the metrics sink passed to NewRepository.
type RepositoryMetrics interface {
	// ObserveDBLatency reports a latency value for the given table.
	// NOTE(review): the unit of latency is not visible here — confirm.
	ObserveDBLatency(tableName string, latency float64)
}

type RepositoryMock

type RepositoryMock struct {
	// contains filtered or unexported fields
}

func NewRepositoryMock

func NewRepositoryMock() *RepositoryMock

func (*RepositoryMock) FetchColumnNames

func (r *RepositoryMock) FetchColumnNames(ctx context.Context, tableName string) ([]string, error)

func (*RepositoryMock) FetchTable

func (r *RepositoryMock) FetchTable(ctx context.Context, tableName string, indexFields []string, signField string) (TableState, []string, error)

func (*RepositoryMock) Insert

func (r *RepositoryMock) Insert(ctx context.Context, query string, rows []storageRow, tableName string) (int, error)

type TableBuffer

type TableBuffer struct {
	// contains filtered or unexported fields
}

TableBuffer contains the actual state of a CollapsingMergeTree table and batches ClickHouse inserts.

func NewTableBuffer

func NewTableBuffer(
	rowsFlushPeriod time.Duration,
	bufferSize uint64,
	tableName string,
	signColumnName string,
	repo Repository,
	insertTimeout time.Duration,
	indexes []string,
	metrics TableBufferMetrics,
	logger *zap.Logger,
) *TableBuffer

func (*TableBuffer) Close

func (tb *TableBuffer) Close()

func (*TableBuffer) Delete

func (tb *TableBuffer) Delete(queries ...Query) error

Delete deletes rows from the memory storage and inserts a cancelling row into mainBuffer. It returns an error if such a primary key is missing.

func (*TableBuffer) Errors

func (tb *TableBuffer) Errors() <-chan error

func (*TableBuffer) Flush

func (tb *TableBuffer) Flush()

func (*TableBuffer) Insert

func (tb *TableBuffer) Insert(queries ...Query) error

Insert inserts queries into mainBuffer and updates the storage state.

func (*TableBuffer) InsertOrUpdate

func (tb *TableBuffer) InsertOrUpdate(queries ...Query) error

InsertOrUpdate inserts rows into the memory storage, or updates the row if such a primary key is already present.

func (*TableBuffer) Log

func (tb *TableBuffer) Log(queries ...Query) error

Log inserts simple rows into mainBuffer; it doesn't update the storage state.

func (*TableBuffer) SetColumnDictionary

func (tb *TableBuffer) SetColumnDictionary(columns []string)

SetColumnDictionary sets the set of columns for building insert queries.

func (*TableBuffer) SetStorage

func (tb *TableBuffer) SetStorage(storage TableState)

SetStorage sets the state of the CollapsingMergeTree table.

func (*TableBuffer) Update

func (tb *TableBuffer) Update(queries ...Query) error

Update updates rows in the memory storage and inserts a cancelling row and a new row into mainBuffer. It returns an error if such a primary key is missing.

type TableBufferMetrics

// TableBufferMetrics is the metrics sink passed to NewTableBuffer.
type TableBufferMetrics interface {
	// ObserveBufferFlushLatency reports a flush latency value for the given table.
	// NOTE(review): the unit of latency is not visible here — confirm.
	ObserveBufferFlushLatency(tableName string, latency float64)
	// IncBufferInsertCounter increments the insert counter for the given table.
	// NOTE(review): whether this counts rows or insert calls is not visible here.
	IncBufferInsertCounter(tableName string)
	// SetBufferLen records bufferLen as the buffer length for the given table.
	SetBufferLen(tableName string, bufferLen int64)
}

type TableMeta

// TableMeta holds an offset and an LSN value.
// NOTE(review): none of the other declarations in this listing reference this
// type, so the meaning of its fields is inferred from their names only.
type TableMeta struct {
	Offset int64  // presumably a position in some input stream — confirm
	Lsn    uint64 // presumably a log sequence number — confirm
}

type TableState

// TableState maps a primary key to a row, where each row maps a column name
// to its data.
type TableState map[string]map[string]interface{} // pk->column->data

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL