database

package
v0.0.0-...-f0e227c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2024 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CantPerformQuery

func CantPerformQuery(err error, q string) error

CantPerformQuery wraps the given error with the specified query that cannot be executed.

func SplitOnDupId

func SplitOnDupId[T IDer]() com.BulkChunkSplitPolicy[T]

SplitOnDupId returns a state machine which tracks the inputs' IDs. Once an already seen input arrives, it demands splitting.

func TableName

func TableName(t interface{}) string

TableName returns the table of t.

Types

type CleanupStmt

type CleanupStmt struct {
	Table  string
	PK     string
	Column string
}

CleanupStmt defines information needed to compose cleanup statements.

func (*CleanupStmt) Build

func (stmt *CleanupStmt) Build(driverName string, limit uint64) string

Build assembles the cleanup statement for the specified database driver with the given limit.

type ColumnMap

type ColumnMap interface {
	// Columns returns database column names for a struct's exported fields in a cached manner.
	// Thus, the returned slice MUST NOT be modified directly.
	// By default, all exported struct fields are mapped to database column names using snake case notation.
	// The - (hyphen) directive for the db tag can be used to exclude certain fields.
	Columns(any) []string
}

ColumnMap provides a cached mapping of structs exported fields to their database column names.

func NewColumnMap

func NewColumnMap(mapper *reflectx.Mapper) ColumnMap

NewColumnMap returns a new ColumnMap.

type Config

type Config struct {
	Type       string     `yaml:"type" default:"mysql"`
	Host       string     `yaml:"host"`
	Port       int        `yaml:"port"`
	Database   string     `yaml:"database"`
	User       string     `yaml:"user"`
	Password   string     `yaml:"password"`
	TlsOptions config.TLS `yaml:",inline"`
	Options    Options    `yaml:"options"`
}

Config defines database client configuration.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks constraints in the supplied database configuration and returns an error if they are violated.

type DB

type DB struct {
	*sqlx.DB

	Options *Options
	// contains filtered or unexported fields
}

DB is a wrapper around sqlx.DB with bulk execution, statement building, streaming and logging capabilities.

func NewDb

func NewDb(db *sqlx.DB, logger *logging.Logger, options *Options) *DB

NewDb returns a new DB wrapper for a pre-existing sqlx.DB.

func NewDbFromConfig

func NewDbFromConfig(c *Config, logger *logging.Logger) (*DB, error)

NewDbFromConfig returns a new DB from Config.

func (*DB) BatchSizeByPlaceholders

func (db *DB) BatchSizeByPlaceholders(n int) int

BatchSizeByPlaceholders returns how often the specified number of placeholders fits into Options.MaxPlaceholdersPerStatement, but at least 1.

func (*DB) BuildColumns

func (db *DB) BuildColumns(subject interface{}) []string

BuildColumns returns all columns of the given struct.

func (*DB) BuildDeleteStmt

func (db *DB) BuildDeleteStmt(from interface{}) string

BuildDeleteStmt returns a DELETE statement for the given struct.

func (*DB) BuildInsertIgnoreStmt

func (db *DB) BuildInsertIgnoreStmt(into interface{}) (string, int)

BuildInsertIgnoreStmt returns an INSERT statement for the specified struct for which the database ignores rows that have already been inserted.

func (*DB) BuildInsertStmt

func (db *DB) BuildInsertStmt(into interface{}) (string, int)

BuildInsertStmt returns an INSERT INTO statement for the given struct.

func (*DB) BuildSelectStmt

func (db *DB) BuildSelectStmt(table interface{}, columns interface{}) string

BuildSelectStmt returns a SELECT query that creates the FROM part from the given table struct and the column list from the specified columns struct.

func (*DB) BuildUpdateStmt

func (db *DB) BuildUpdateStmt(update interface{}) (string, int)

BuildUpdateStmt returns an UPDATE statement for the given struct.

func (*DB) BuildUpsertStmt

func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int)

BuildUpsertStmt returns an upsert statement for the given struct.

func (*DB) BuildWhere

func (db *DB) BuildWhere(subject interface{}) (string, int)

BuildWhere returns a WHERE clause with named placeholder conditions built from the specified struct combined with the AND operator.

func (*DB) BulkExec

func (db *DB) BulkExec(
	ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan any, onSuccess ...OnSuccess[any],
) error

BulkExec bulk executes queries with a single slice placeholder in the form of `IN (?)`. Takes in up to the number of arguments specified in count from the arg stream, derives and expands a query and executes it with this set of arguments until the arg stream has been processed. The derived queries are executed in a separate goroutine with a weighting of 1 and can be executed concurrently to the extent allowed by the semaphore passed in sem. Arguments for which the query ran successfully will be passed to onSuccess.

func (*DB) CleanupOlderThan

func (db *DB) CleanupOlderThan(
	ctx context.Context, stmt CleanupStmt, envId types.Binary,
	count uint64, olderThan time.Time, onSuccess ...OnSuccess[struct{}],
) (uint64, error)

CleanupOlderThan deletes all rows with the specified statement that are older than the given time. Deletes a maximum of as many rows per round as defined in count. Actually deleted rows will be passed to onSuccess. Returns the total number of rows deleted.

func (*DB) CreateIgnoreStreamed

func (db *DB) CreateIgnoreStreamed(
	ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity],
) error

CreateIgnoreStreamed bulk creates the specified entities via NamedBulkExec. The insert statement is created using BuildInsertIgnoreStmt with the first entity from the entities stream. Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via Options.MaxConnectionsPerTable. Entities for which the query ran successfully will be passed to onSuccess.

func (*DB) CreateStreamed

func (db *DB) CreateStreamed(
	ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity],
) error

CreateStreamed bulk creates the specified entities via NamedBulkExec. The insert statement is created using BuildInsertStmt with the first entity from the entities stream. Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via Options.MaxConnectionsPerTable. Entities for which the query ran successfully will be passed to onSuccess.

func (*DB) Delete

func (db *DB) Delete(
	ctx context.Context, entityType Entity, ids []interface{}, onSuccess ...OnSuccess[any],
) error

Delete creates a channel from the specified ids and bulk deletes them by passing the channel along with the entityType to DeleteStreamed. IDs for which the query ran successfully will be passed to onSuccess.

func (*DB) DeleteStreamed

func (db *DB) DeleteStreamed(
	ctx context.Context, entityType Entity, ids <-chan interface{}, onSuccess ...OnSuccess[any],
) error

DeleteStreamed bulk deletes the specified ids via BulkExec. The delete statement is created using BuildDeleteStmt with the passed entityType. Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via Options.MaxConnectionsPerTable. IDs for which the query ran successfully will be passed to onSuccess.

func (*DB) GetSemaphoreForTable

func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted

func (*DB) NamedBulkExec

func (db *DB) NamedBulkExec(
	ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan Entity,
	splitPolicyFactory com.BulkChunkSplitPolicyFactory[Entity], onSuccess ...OnSuccess[Entity],
) error

NamedBulkExec bulk executes queries with named placeholders in a VALUES clause most likely in the format INSERT ... VALUES. Takes in up to the number of entities specified in count from the arg stream, derives and executes a new query with the VALUES clause expanded to this set of arguments, until the arg stream has been processed. The queries are executed in a separate goroutine with a weighting of 1 and can be executed concurrently to the extent allowed by the semaphore passed in sem. Entities for which the query ran successfully will be passed to onSuccess.

func (*DB) NamedBulkExecTx

func (db *DB) NamedBulkExecTx(
	ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan Entity,
) error

NamedBulkExecTx bulk executes queries with named placeholders in separate transactions. Takes in up to the number of entities specified in count from the arg stream and executes a new transaction that runs a new query for each entity in this set of arguments, until the arg stream has been processed. The transactions are executed in a separate goroutine with a weighting of 1 and can be executed concurrently to the extent allowed by the semaphore passed in sem.

func (*DB) UpdateStreamed

func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan Entity) error

UpdateStreamed bulk updates the specified entities via NamedBulkExecTx. The update statement is created using BuildUpdateStmt with the first entity from the entities stream. Bulk size is controlled via Options.MaxRowsPerTransaction and concurrency is controlled via Options.MaxConnectionsPerTable.

func (*DB) UpsertStreamed

func (db *DB) UpsertStreamed(
	ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity],
) error

UpsertStreamed bulk upserts the specified entities via NamedBulkExec. The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream. Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via Options.MaxConnectionsPerTable. Entities for which the query ran successfully will be passed to onSuccess.

func (*DB) YieldAll

func (db *DB) YieldAll(ctx context.Context, factoryFunc EntityFactoryFunc, query string, scope interface{}) (<-chan Entity, <-chan error)

YieldAll executes the query with the supplied scope, scans each resulting row into an entity returned by the factory function, and streams them into a returned channel.

type Entity

type Entity interface {
	Fingerprinter
	IDer
}

Entity is implemented by each type that works with the database package.

type EntityFactoryFunc

type EntityFactoryFunc func() Entity

EntityFactoryFunc knows how to create an Entity.

type Fingerprinter

type Fingerprinter interface {
	// Fingerprint returns the value that uniquely identifies the entity.
	Fingerprint() Fingerprinter
}

Fingerprinter is implemented by every entity that uniquely identifies itself.

type ID

type ID interface {
	// String returns the string representation form of the ID.
	// The String method is used to use the ID in functions
	// where it needs to be compared or hashed.
	String() string
}

ID is a unique identifier of an entity.

type IDer

type IDer interface {
	ID() ID   // ID returns the ID.
	SetID(ID) // SetID sets the ID.
}

IDer is implemented by every entity that uniquely identifies itself.

type OnSuccess

type OnSuccess[T any] func(ctx context.Context, affectedRows []T) (err error)

OnSuccess is a callback for successful (bulk) DML operations.

func OnSuccessIncrement

func OnSuccessIncrement[T any](counter *com.Counter) OnSuccess[T]

func OnSuccessSendTo

func OnSuccessSendTo[T any](ch chan<- T) OnSuccess[T]

type Options

type Options struct {
	// Maximum number of open connections to the database.
	MaxConnections int `yaml:"max_connections" default:"16"`

	// Maximum number of connections per table,
	// regardless of what the connection is actually doing,
	// e.g. INSERT, UPDATE, DELETE.
	MaxConnectionsPerTable int `yaml:"max_connections_per_table" default:"8"`

	// MaxPlaceholdersPerStatement defines the maximum number of placeholders in an
	// INSERT, UPDATE or DELETE statement. Theoretically, MySQL can handle up to 2^16-1 placeholders,
	// but this increases the execution time of queries and thus reduces the number of queries
	// that can be executed in parallel in a given time.
	// The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism.
	MaxPlaceholdersPerStatement int `yaml:"max_placeholders_per_statement" default:"8192"`

	// MaxRowsPerTransaction defines the maximum number of rows per transaction.
	// The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism.
	MaxRowsPerTransaction int `yaml:"max_rows_per_transaction" default:"8192"`
}

Options define user configurable database options.

func (*Options) Validate

func (o *Options) Validate() error

Validate checks constraints in the supplied database options and returns an error if they are violated.

type PgsqlOnConflictConstrainter

type PgsqlOnConflictConstrainter interface {
	// PgsqlOnConflictConstraint returns the primary or unique key constraint name of the PostgreSQL table.
	PgsqlOnConflictConstraint() string
}

PgsqlOnConflictConstrainter implements the PgsqlOnConflictConstraint method, which returns the primary or unique key constraint name of the PostgreSQL table.

type Scoper

type Scoper interface {
	Scope() any
}

Scoper implements the Scope method, which returns a struct specifying the WHERE conditions that entities must satisfy in order to be SELECTed.

type TableNamer

type TableNamer interface {
	TableName() string // TableName tells the table.
}

TableNamer implements the TableName method, which returns the table of the object.

type Upserter

type Upserter interface {
	Upsert() any // Upsert partitions the object.
}

Upserter implements the Upsert method, which returns a part of the object for ON DUPLICATE KEY UPDATE.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL