Documentation ¶
Index ¶
- func BuildInsertQuery(query *bytes.Buffer, tableName string, columns []string)
- func NewRepository(conn *sql.DB, metrics RepositoryMetrics, logger *zap.Logger) *repository
- type Query
- type Repository
- type RepositoryMetrics
- type RepositoryMock
- func (r *RepositoryMock) FetchColumnNames(ctx context.Context, tableName string) ([]string, error)
- func (r *RepositoryMock) FetchTable(ctx context.Context, tableName string, indexFields []string, signField string) (TableState, []string, error)
- func (r *RepositoryMock) Insert(ctx context.Context, query string, rows []storageRow, tableName string) (int, error)
- type TableBuffer
- func (tb *TableBuffer) Close()
- func (tb *TableBuffer) Delete(queries ...Query) error
- func (tb *TableBuffer) Errors() <-chan error
- func (tb *TableBuffer) Flush()
- func (tb *TableBuffer) Insert(queries ...Query) error
- func (tb *TableBuffer) InsertOrUpdate(queries ...Query) error
- func (tb *TableBuffer) Log(queries ...Query) error
- func (tb *TableBuffer) SetColumnDictionary(columns []string)
- func (tb *TableBuffer) SetStorage(storage TableState)
- func (tb *TableBuffer) Update(queries ...Query) error
- type TableBufferMetrics
- type TableMeta
- type TableState
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildInsertQuery ¶
func NewRepository ¶
func NewRepository(conn *sql.DB, metrics RepositoryMetrics, logger *zap.Logger) *repository
Types ¶
type Query ¶
type Query map[string]interface{} // columnName->data
Query for modifying the data. updates Storage in RAM and makes 1 or 2 clickhouse inserts.
type Repository ¶
type Repository interface { Insert(ctx context.Context, query string, rows []storageRow, tableName string) (int, error) FetchTable( ctx context.Context, tableName string, indexFields []string, signField string, ) ( TableState, []string, error, ) FetchColumnNames(ctx context.Context, tableName string) ([]string, error) }
type RepositoryMetrics ¶
type RepositoryMock ¶
type RepositoryMock struct {
// contains filtered or unexported fields
}
func NewRepositoryMock ¶
func NewRepositoryMock() *RepositoryMock
func (*RepositoryMock) FetchColumnNames ¶
func (*RepositoryMock) FetchTable ¶
func (r *RepositoryMock) FetchTable(ctx context.Context, tableName string, indexFields []string, signField string) (TableState, []string, error)
type TableBuffer ¶
type TableBuffer struct {
// contains filtered or unexported fields
}
TableBuffer contains the actual state of CollapsingMergeTree table, batches clickhouse inserts.
func NewTableBuffer ¶
func NewTableBuffer( rowsFlushPeriod time.Duration, bufferSize uint64, tableName string, signColumnName string, repo Repository, insertTimeout time.Duration, indexes []string, metrics TableBufferMetrics, logger *zap.Logger, ) *TableBuffer
func (*TableBuffer) Close ¶
func (tb *TableBuffer) Close()
func (*TableBuffer) Delete ¶
func (tb *TableBuffer) Delete(queries ...Query) error
Delete deletes rows from memory storage, insert cancelling row to mainBuffer returns the error if such primary key is missed.
func (*TableBuffer) Errors ¶
func (tb *TableBuffer) Errors() <-chan error
func (*TableBuffer) Flush ¶
func (tb *TableBuffer) Flush()
func (*TableBuffer) Insert ¶
func (tb *TableBuffer) Insert(queries ...Query) error
Insert inserts queries to mainBuffer, with storage state updating.
func (*TableBuffer) InsertOrUpdate ¶
func (tb *TableBuffer) InsertOrUpdate(queries ...Query) error
InsertOrUpdate inserts rows to memory storage, updates the row if such primary key is already presented.
func (*TableBuffer) Log ¶
func (tb *TableBuffer) Log(queries ...Query) error
Log inserts a simple rows to mainBuffer, doesn't update a storage state.
func (*TableBuffer) SetColumnDictionary ¶
func (tb *TableBuffer) SetColumnDictionary(columns []string)
SetColumnDictionary sets the set of columns for building insert queries.
func (*TableBuffer) SetStorage ¶
func (tb *TableBuffer) SetStorage(storage TableState)
SetStorage sets the state of CollapsingMergeTree table.
func (*TableBuffer) Update ¶
func (tb *TableBuffer) Update(queries ...Query) error
Update updates rows in memory storage, insert cancelling and new row to mainBuffer returns the error if such primary key is missed.