database

package
v0.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 31, 2021 License: GPL-3.0 Imports: 44 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// UseBulkExportFlowCSV to use BulkExportFlowCSV
	UseBulkExportFlowCSV = false

	SampleSize = 900
)
View Source
var Debug = false

Debug prints queries when true

Functions

func CleanSQL

func CleanSQL(conn Connection, sql string) string

CleanSQL removes creds from the query

func CommonColumns

func CommonColumns(colNames1 []string, colNames2 []string) (commCols []string)

CommonColumns returns the common columns

func CopyFromAzure

func CopyFromAzure(conn Connection, tableFName, azPath string) (err error)

CopyFromAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html

func CopyFromS3

func CopyFromS3(conn Connection, tableFName, s3Path string) (err error)

func GetSlingEnv

func GetSlingEnv() map[string]string

GetSlingEnv returns sling Env Data

func InsertBatchStream added in v0.0.5

func InsertBatchStream(conn Connection, tx *Transaction, tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertBatchStream inserts a stream into a table in batch

func InsertStream added in v0.0.5

func InsertStream(conn Connection, tx *Transaction, tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertStream inserts a stream

func PK added in v0.0.5

func PK(obj interface{}) (pk []string)

PK returns the primary keys of a model

func ParseSQLMultiStatements added in v0.0.5

func ParseSQLMultiStatements(sql string) (sqls g.Strings)

ParseSQLMultiStatements splits a sql text into statements typically by a ';'

func SQLColumns

func SQLColumns(colTypes []*sql.ColumnType, NativeTypeMap map[string]string) (columns iop.Columns)

SQLColumns returns the columns from database ColumnType

func SplitTableFullName

func SplitTableFullName(tableName string) (string, string)

SplitTableFullName returns the schema / table name

func TestPermissions

func TestPermissions(conn Connection, tableName string) (err error)

TestPermissions tests the needed permissions in a given connection

func UID added in v0.0.5

func UID(obj interface{}) string

UID returns a unique ID for that object

func Upsert added in v0.0.5

func Upsert(conn Connection, tx *Transaction, sourceTable, targetTable string, pkFields []string) (count int64, err error)

Upsert upserts from source table into target table

Types

type BaseConn

type BaseConn struct {
	Connection
	URL  string
	Type dbio.Type // the type of database for sqlx: postgres, mysql, sqlite

	Data iop.Dataset
	// contains filtered or unexported fields
}

BaseConn is a database connection

func (*BaseConn) BaseURL

func (conn *BaseConn) BaseURL() string

BaseURL returns the base URL with default port

func (*BaseConn) Begin

func (conn *BaseConn) Begin(options ...*sql.TxOptions) (err error)

Begin starts a connection wide transaction

func (*BaseConn) BeginContext added in v0.0.5

func (conn *BaseConn) BeginContext(ctx context.Context, options ...*sql.TxOptions) (err error)

BeginContext starts a connection wide transaction

func (*BaseConn) BulkExportFlow

func (conn *BaseConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)

BulkExportFlow creates a dataflow from a sql query

func (*BaseConn) BulkExportFlowCSV

func (conn *BaseConn) BulkExportFlowCSV(sqls ...string) (df *iop.Dataflow, err error)

BulkExportFlowCSV creates a dataflow from a sql query, using CSVs

func (*BaseConn) BulkExportStream

func (conn *BaseConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)

BulkExportStream streams the rows in bulk

func (*BaseConn) BulkImportFlow

func (conn *BaseConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)

BulkImportFlow imports the stream rows in bulk concurrently using channels

func (*BaseConn) BulkImportStream

func (conn *BaseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream imports the stream rows in bulk

func (*BaseConn) CastColumnForSelect

func (conn *BaseConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) string

CastColumnForSelect casts to the correct target column type

func (*BaseConn) CastColumnsForSelect

func (conn *BaseConn) CastColumnsForSelect(srcColumns iop.Columns, tgtColumns iop.Columns) []string

CastColumnsForSelect casts the source columns into the target Column types

func (*BaseConn) Close

func (conn *BaseConn) Close() error

Close closes the connection

func (*BaseConn) Commit

func (conn *BaseConn) Commit() (err error)

Commit commits a connection wide transaction

func (*BaseConn) CompareChecksums

func (conn *BaseConn) CompareChecksums(tableName string, columns iop.Columns) (err error)

CompareChecksums compares the checksum values from the database side to the checksum values from the StreamProcessor

func (*BaseConn) Connect

func (conn *BaseConn) Connect(timeOut ...int) (err error)

Connect connects to the database

func (*BaseConn) Context

func (conn *BaseConn) Context() *g.Context

Context returns the db context

func (*BaseConn) CreateTable added in v0.0.5

func (conn *BaseConn) CreateTable(tableName string, cols iop.Columns, tableDDL string) (err error)

CreateTable creates a new table based on provided columns. `tableName` should have 'schema.table' format.

func (*BaseConn) CreateTemporaryTable added in v0.0.5

func (conn *BaseConn) CreateTemporaryTable(tableName string, cols iop.Columns) (err error)

CreateTemporaryTable creates a temp table based on provided columns

func (*BaseConn) Db

func (conn *BaseConn) Db() *sqlx.DB

Db returns the sqlx db object

func (*BaseConn) DbX added in v0.0.5

func (conn *BaseConn) DbX() *DbX

DbX returns the DbX object

func (*BaseConn) DropTable

func (conn *BaseConn) DropTable(tableNames ...string) (err error)

DropTable drops given table.

func (*BaseConn) DropView

func (conn *BaseConn) DropView(viewNames ...string) (err error)

DropView drops given view.

func (*BaseConn) Exec

func (conn *BaseConn) Exec(sql string, args ...interface{}) (result sql.Result, err error)

Exec runs a sql query, returns `error`

func (*BaseConn) ExecContext

func (conn *BaseConn) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)

ExecContext runs a sql query with context, returns `error`

func (*BaseConn) ExecMulti added in v0.0.5

func (conn *BaseConn) ExecMulti(sql string, args ...interface{}) (result sql.Result, err error)

ExecMulti runs multiple sql queries, returns `error`

func (*BaseConn) ExecMultiContext added in v0.0.5

func (conn *BaseConn) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)

ExecMultiContext runs multiple sql queries with context, returns `error`

func (*BaseConn) GenerateDDL

func (conn *BaseConn) GenerateDDL(tableFName string, data iop.Dataset, temporary bool) (string, error)

GenerateDDL generates a DDL based on a dataset

func (*BaseConn) GenerateInsertStatement

func (conn *BaseConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string

GenerateInsertStatement returns the proper INSERT statement

func (*BaseConn) GenerateUpsertExpressions

func (conn *BaseConn) GenerateUpsertExpressions(srcTable string, tgtTable string, pkFields []string) (exprs map[string]string, err error)

GenerateUpsertExpressions returns a map with needed expressions

func (*BaseConn) GenerateUpsertSQL added in v0.0.5

func (conn *BaseConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

GenerateUpsertSQL returns a sql for upsert

func (*BaseConn) GetAnalysis added in v0.0.5

func (conn *BaseConn) GetAnalysis(analysisName string, values map[string]interface{}) (sql string, err error)

GetAnalysis runs an analysis

func (*BaseConn) GetAnalysisField added in v0.0.5

func (conn *BaseConn) GetAnalysisField(analysisName string, tableFName string, fields ...string) (sql string, err error)

GetAnalysisField runs a field level analysis

func (*BaseConn) GetAnalysisTable added in v0.0.5

func (conn *BaseConn) GetAnalysisTable(analysisName string, tableFNames ...string) (sql string, err error)

GetAnalysisTable runs a table level analysis

func (*BaseConn) GetColumnStats

func (conn *BaseConn) GetColumnStats(tableName string, fields ...string) (columns iop.Columns, err error)

GetColumnStats analyzes the table and returns the column statistics

func (*BaseConn) GetColumns

func (conn *BaseConn) GetColumns(tableFName string, fields ...string) (columns iop.Columns, err error)

GetColumns returns columns for given table. `tableFName` should include schema and table, example: `schema1.table2`. Fields should be `column_name|data_type`.

func (*BaseConn) GetColumnsFull

func (conn *BaseConn) GetColumnsFull(tableFName string) (iop.Dataset, error)

GetColumnsFull returns columns for given table. `tableFName` should include schema and table, example: `schema1.table2`. Fields should be `schema_name|table_name|table_type|column_name|data_type|column_id`.

func (*BaseConn) GetCount

func (conn *BaseConn) GetCount(tableFName string) (uint64, error)

GetCount returns count of records

func (*BaseConn) GetDDL

func (conn *BaseConn) GetDDL(tableFName string) (string, error)

GetDDL returns DDL for given table.

func (*BaseConn) GetGormConn

func (conn *BaseConn) GetGormConn(config *gorm.Config) (*gorm.DB, error)

GetGormConn returns the gorm db connection

func (*BaseConn) GetIndexes

func (conn *BaseConn) GetIndexes(tableFName string) (iop.Dataset, error)

GetIndexes returns indexes for given table.

func (*BaseConn) GetNativeType added in v0.0.5

func (conn *BaseConn) GetNativeType(col iop.Column) (nativeType string, err error)

GetNativeType returns the native column type from generic

func (*BaseConn) GetObjects

func (conn *BaseConn) GetObjects(schema string, objectType string) (iop.Dataset, error)

GetObjects returns objects (tables or views) for given schema. `objectType` can be either 'table', 'view' or 'all'.

func (*BaseConn) GetPrimaryKeys

func (conn *BaseConn) GetPrimaryKeys(tableFName string) (iop.Dataset, error)

GetPrimaryKeys returns primary keys for given table.

func (*BaseConn) GetProp

func (conn *BaseConn) GetProp(key string) string

GetProp returns the value of a property

func (*BaseConn) GetSQLColumns

func (conn *BaseConn) GetSQLColumns(sqls ...string) (columns iop.Columns, err error)

GetSQLColumns returns columns from a sql query result

func (*BaseConn) GetSchemaObjects

func (conn *BaseConn) GetSchemaObjects(schemaName string) (Schema, error)

GetSchemaObjects obtains full schemata info

func (*BaseConn) GetSchemas

func (conn *BaseConn) GetSchemas() (iop.Dataset, error)

GetSchemas returns schemas

func (*BaseConn) GetTables

func (conn *BaseConn) GetTables(schema string) (iop.Dataset, error)

GetTables returns tables for given schema

func (*BaseConn) GetTemplateValue

func (conn *BaseConn) GetTemplateValue(path string) (value string)

GetTemplateValue returns the value of the path

func (*BaseConn) GetType

func (conn *BaseConn) GetType() dbio.Type

GetType returns the type db object

func (*BaseConn) GetURL

func (conn *BaseConn) GetURL(newURL ...string) string

GetURL returns the processed URL

func (*BaseConn) GetViews

func (conn *BaseConn) GetViews(schema string) (iop.Dataset, error)

GetViews returns views for given schema

func (*BaseConn) Import

func (conn *BaseConn) Import(data iop.Dataset, tableName string) error

Import imports `data` into `tableName`

func (*BaseConn) Init

func (conn *BaseConn) Init() (err error)

Init initiates the connection object & add default port if missing

func (*BaseConn) InsertBatchStream

func (conn *BaseConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertBatchStream inserts a stream into a table in batch

func (*BaseConn) InsertStream

func (conn *BaseConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertStream inserts a stream into a table

func (*BaseConn) Kill

func (conn *BaseConn) Kill() error

Kill kills the database connection

func (*BaseConn) LoadTemplates

func (conn *BaseConn) LoadTemplates() error

LoadTemplates loads the appropriate yaml template

func (*BaseConn) MustExec

func (conn *BaseConn) MustExec(sql string, args ...interface{}) (result sql.Result)

MustExec execs the query using e and panics if there was an error. Any placeholder parameters are replaced with supplied args.

func (*BaseConn) NewTransaction added in v0.0.5

func (conn *BaseConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (*Transaction, error)

NewTransaction creates a new transaction

func (*BaseConn) OptimizeTable

func (conn *BaseConn) OptimizeTable(tableName string, newColStats iop.Columns) (err error)

OptimizeTable analyzes the table and alters the table with the columns data type based on its analysis result. If the table is missing, it is created with a new DDL. Hole in this: it will truncate data points, since it is based only on new data being inserted; it would need complete stats of the target table to properly optimize.

func (*BaseConn) Prepare

func (conn *BaseConn) Prepare(query string) (stmt *sql.Stmt, err error)

Prepare prepares the statement

func (*BaseConn) PropArr

func (conn *BaseConn) PropArr() []string

PropArr returns an array of properties

func (*BaseConn) Props

func (conn *BaseConn) Props() map[string]string

Props returns a map of properties

func (*BaseConn) Query

func (conn *BaseConn) Query(sql string, limit ...int) (data iop.Dataset, err error)

Query runs a sql query, returns `result`, `error`

func (*BaseConn) QueryContext

func (conn *BaseConn) QueryContext(ctx context.Context, sql string, limit ...int) (iop.Dataset, error)

QueryContext runs a sql query with ctx, returns `result`, `error`

func (*BaseConn) Quote

func (conn *BaseConn) Quote(field string, normalize ...bool) string

Quote adds quotes to the field name

func (*BaseConn) Rollback

func (conn *BaseConn) Rollback() (err error)

Rollback rolls back a connection wide transaction

func (*BaseConn) RunAnalysis

func (conn *BaseConn) RunAnalysis(analysisName string, values map[string]interface{}) (data iop.Dataset, err error)

RunAnalysis runs an analysis

func (*BaseConn) Schemata

func (conn *BaseConn) Schemata() Schemata

Schemata returns the Schemata object

func (*BaseConn) Self

func (conn *BaseConn) Self() Connection

Self returns the respective connection Instance. This is useful to refer back to a subclass method from the superclass level. (Aka overloading)

func (*BaseConn) SetProp

func (conn *BaseConn) SetProp(key string, val string)

SetProp sets the value of a property

func (*BaseConn) SetTx added in v0.0.5

func (conn *BaseConn) SetTx(tx *Transaction)

SetTx sets the transaction

func (*BaseConn) StreamRecords

func (conn *BaseConn) StreamRecords(sql string) (<-chan map[string]interface{}, error)

StreamRecords streams the records of a sql query, returns `result`, `error`

func (*BaseConn) StreamRows

func (conn *BaseConn) StreamRows(sql string, limit ...int) (ds *iop.Datastream, err error)

StreamRows streams the rows of a sql query, returns `result`, `error`

func (*BaseConn) StreamRowsContext

func (conn *BaseConn) StreamRowsContext(ctx context.Context, sql string, limit ...int) (ds *iop.Datastream, err error)

StreamRowsContext streams the rows of a sql query with context, returns `result`, `error`

func (*BaseConn) SwapTable

func (conn *BaseConn) SwapTable(srcTable string, tgtTable string) (err error)

SwapTable swaps two tables

func (*BaseConn) TableExists

func (conn *BaseConn) TableExists(tableFName string) (exists bool, err error)

TableExists returns true if the table exists

func (*BaseConn) Template

func (conn *BaseConn) Template() Template

Template returns the Template object

func (*BaseConn) Tx

func (conn *BaseConn) Tx() *Transaction

Tx returns the current sqlx tx object

func (*BaseConn) Unquote

func (conn *BaseConn) Unquote(field string) string

Unquote removes quotes from the field name

func (*BaseConn) Upsert

func (conn *BaseConn) Upsert(srcTable string, tgtTable string, primKeys []string) (rowAffCnt int64, err error)

Upsert inserts / updates from a srcTable into a target table. Assuming the srcTable has some or all of the tgtTable fields with matching types

func (*BaseConn) ValidateColumnNames

func (conn *BaseConn) ValidateColumnNames(tgtColNames []string, colNames []string, quote bool) (newColNames []string, err error)

ValidateColumnNames verifies that source fields are present in the target table. It will return quoted field names as `newColNames`, the same length as `colNames`.

type BigQueryConn

type BigQueryConn struct {
	BaseConn
	URL       string
	Client    *bigquery.Client
	ProjectID string
	DatasetID string
	Location  string
	Datasets  []string
}

BigQueryConn is a Google Big Query connection

func (*BigQueryConn) BulkExportFlow

func (conn *BigQueryConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)

BulkExportFlow reads in bulk

func (*BigQueryConn) BulkImportFlow

func (conn *BigQueryConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)

BulkImportFlow inserts a flow of streams into a table. For redshift we need to create CSVs in S3 and then use the COPY command.

func (*BigQueryConn) BulkImportStream

func (conn *BigQueryConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream demonstrates loading data into a BigQuery table using a file on the local filesystem.

func (*BigQueryConn) Close

func (conn *BigQueryConn) Close() error

Close closes the connection

func (*BigQueryConn) Connect

func (conn *BigQueryConn) Connect(timeOut ...int) error

Connect connects to the database

func (*BigQueryConn) CopyFromGCS

func (conn *BigQueryConn) CopyFromGCS(gcsURI string, tableFName string, dsColumns []iop.Column) error

CopyFromGCS into bigquery from google storage

func (*BigQueryConn) CopyToGCS

func (conn *BigQueryConn) CopyToGCS(tableFName string, gcsURI string) error

CopyToGCS Copy table to gc storage

func (*BigQueryConn) ExecContext

func (conn *BigQueryConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)

ExecContext runs a sql query with context, returns `error`

func (*BigQueryConn) GenerateUpsertSQL added in v0.0.5

func (conn *BigQueryConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

GenerateUpsertSQL generates the upsert SQL

func (*BigQueryConn) Init

func (conn *BigQueryConn) Init() error

Init initiates the object

func (*BigQueryConn) InsertBatchStream

func (conn *BigQueryConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertBatchStream inserts a stream into a table in batch

func (*BigQueryConn) InsertStream

func (conn *BigQueryConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertStream demonstrates loading data into a BigQuery table using a file on the local filesystem.

func (*BigQueryConn) StreamRowsContext

func (conn *BigQueryConn) StreamRowsContext(ctx context.Context, sql string, limit ...int) (ds *iop.Datastream, err error)

StreamRowsContext streams the rows of a sql query with context, returns `result`, `error`

func (*BigQueryConn) Unload

func (conn *BigQueryConn) Unload(sqls ...string) (gsPath string, err error)

Unload to Google Cloud Storage

type Connection

type Connection interface {
	BaseURL() string
	Begin(options ...*sql.TxOptions) error
	BeginContext(ctx context.Context, options ...*sql.TxOptions) error
	BulkExportFlow(sqls ...string) (*iop.Dataflow, error)
	BulkExportStream(sql string) (*iop.Datastream, error)
	BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
	BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
	CastColumnForSelect(srcColumn iop.Column, tgtColumn iop.Column) string
	CastColumnsForSelect(srcColumns iop.Columns, tgtColumns iop.Columns) []string
	Close() error
	Commit() error
	CompareChecksums(tableName string, columns iop.Columns) (err error)
	Connect(timeOut ...int) error
	Context() *g.Context
	CreateTemporaryTable(tableName string, cols iop.Columns) (err error)
	CreateTable(tableName string, cols iop.Columns, tableDDL string) (err error)
	Db() *sqlx.DB
	DbX() *DbX
	DropTable(...string) error
	DropView(...string) error
	ExecMulti(sql string, args ...interface{}) (result sql.Result, err error)
	ExecMultiContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
	Exec(sql string, args ...interface{}) (result sql.Result, err error)
	ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
	GenerateDDL(tableFName string, data iop.Dataset, temporary bool) (string, error)
	GenerateInsertStatement(tableName string, fields []string, numRows int) string
	GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
	GetColumns(tableFName string, fields ...string) (iop.Columns, error)
	GetColumnsFull(string) (iop.Dataset, error)
	GetColumnStats(tableName string, fields ...string) (columns iop.Columns, err error)
	GetCount(string) (uint64, error)
	GetDDL(string) (string, error)
	GetGormConn(config *gorm.Config) (*gorm.DB, error)
	GetIndexes(string) (iop.Dataset, error)
	GetNativeType(col iop.Column) (nativeType string, err error)
	GetPrimaryKeys(string) (iop.Dataset, error)
	GetProp(string) string
	GetSchemaObjects(string) (Schema, error)
	GetSchemas() (iop.Dataset, error)
	GetSQLColumns(sqls ...string) (columns iop.Columns, err error)
	GetTables(string) (iop.Dataset, error)
	GetTemplateValue(path string) (value string)
	GetType() dbio.Type
	GetURL(newURL ...string) string
	GetViews(string) (iop.Dataset, error)
	Init() error
	InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
	InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
	Kill() error
	LoadTemplates() error
	MustExec(sql string, args ...interface{}) (result sql.Result)
	NewTransaction(ctx context.Context, options ...*sql.TxOptions) (*Transaction, error)
	OptimizeTable(tableName string, columns iop.Columns) (err error)
	Prepare(query string) (stmt *sql.Stmt, err error)
	Props() map[string]string
	PropsArr() []string
	Query(sql string, limit ...int) (iop.Dataset, error)
	QueryContext(ctx context.Context, sql string, limit ...int) (iop.Dataset, error)
	Quote(field string, normalize ...bool) string
	RenameTable(table string, newTable string) (err error)
	Rollback() error
	RunAnalysis(string, map[string]interface{}) (iop.Dataset, error)
	GetAnalysis(string, map[string]interface{}) (string, error)
	GetAnalysisField(string, string, ...string) (string, error)
	GetAnalysisTable(string, ...string) (string, error)
	Schemata() Schemata
	Self() Connection

	SetProp(string, string)
	SetTx(*Transaction)
	StreamRecords(sql string) (<-chan map[string]interface{}, error)
	StreamRows(sql string, limit ...int) (*iop.Datastream, error)
	StreamRowsContext(ctx context.Context, sql string, limit ...int) (ds *iop.Datastream, err error)
	SwapTable(srcTable string, tgtTable string) (err error)
	TableExists(tableFName string) (exists bool, err error)
	Template() Template
	Tx() *Transaction
	Unquote(string) string
	Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error)
	ValidateColumnNames(tgtColName []string, colNames []string, quote bool) (newColNames []string, err error)
	// contains filtered or unexported methods
}

Connection is the Base interface for Connections

func NewConn

func NewConn(URL string, props ...string) (Connection, error)

NewConn returns the most proper connection for a given database

func NewConnContext

func NewConnContext(ctx context.Context, URL string, props ...string) (Connection, error)

NewConnContext returns the most proper connection for a given database with context. Props are provided as `"Prop1=Value1", "Prop2=Value2", ...`

type DbX added in v0.0.5

type DbX struct {
	// contains filtered or unexported fields
}

DbX is db express

func (*DbX) Delete added in v0.0.5

func (x *DbX) Delete(o interface{}) (cnt int, err error)

Delete deletes from object or slice

func (*DbX) Get added in v0.0.5

func (x *DbX) Get(o interface{}, fields ...string) (err error)

Get retrieves object

func (*DbX) Insert added in v0.0.5

func (x *DbX) Insert(o interface{}, fields ...string) (err error)

Insert inserts object or slice

func (*DbX) Select added in v0.0.5

func (x *DbX) Select(o interface{}, fields ...string) (err error)

Select retrieves objects

func (*DbX) TableName added in v0.0.5

func (x *DbX) TableName(o interface{}) (name string)

TableName returns the table name of object or slice

func (*DbX) Update added in v0.0.5

func (x *DbX) Update(o interface{}, fields ...string) (cnt int, err error)

Update updates from object or slice

func (*DbX) Upsert added in v0.0.5

func (x *DbX) Upsert(o interface{}, fields ...string) (cnt int, err error)

Upsert upserts from object or slice

func (*DbX) Where added in v0.0.5

func (x *DbX) Where(where ...interface{}) *DbX

Where adds a where clause

type ModelDbX added in v0.0.5

type ModelDbX struct {
	Ptr          interface{} `json:"-"`
	RowsAffected int         `json:"-"`
	// contains filtered or unexported fields
}

ModelDbX is the base for any SQL model

func (*ModelDbX) Bind added in v0.0.5

func (m *ModelDbX) Bind(bindFunc func(p interface{}) error, objPtr interface{}) (err error)

Bind extracts values from provided echo context

func (*ModelDbX) Delete added in v0.0.5

func (m *ModelDbX) Delete(db *sqlx.DB) (err error)

Delete deletes a record

func (*ModelDbX) Fields added in v0.0.5

func (m *ModelDbX) Fields() (fields []string)

Fields returns the model fields

func (*ModelDbX) Get added in v0.0.5

func (m *ModelDbX) Get(db *sqlx.DB, fields ...string) (err error)

Get gets the first record

func (*ModelDbX) Insert added in v0.0.5

func (m *ModelDbX) Insert(db *sqlx.DB, fields ...string) (err error)

Insert inserts one record

func (*ModelDbX) Rec added in v0.0.5

func (m *ModelDbX) Rec() map[string]interface{}

Rec returns the record

func (*ModelDbX) Select added in v0.0.5

func (m *ModelDbX) Select(db *sqlx.DB, objPtr interface{}, fields ...string) (err error)

Select returns multiple records

func (*ModelDbX) TableName added in v0.0.5

func (m *ModelDbX) TableName(objPtr interface{}) string

TableName returns the table name of the underlying pointer

func (*ModelDbX) Update added in v0.0.5

func (m *ModelDbX) Update(db *sqlx.DB, fields ...string) (err error)

Update updates one record

func (*ModelDbX) Values added in v0.0.5

func (m *ModelDbX) Values(fields []string) (values []interface{}, err error)

Values returns the values for the provided fields

func (*ModelDbX) Where added in v0.0.5

func (m *ModelDbX) Where(where ...interface{}) *ModelDbX

Where adds a where clause

type MsSQLServerConn

type MsSQLServerConn struct {
	BaseConn
	URL string
	// contains filtered or unexported fields
}

MsSQLServerConn is a Microsoft SQL Server connection

func (*MsSQLServerConn) BcpExport

func (conn *MsSQLServerConn) BcpExport() (err error)

BcpExport exports data to datastream

func (*MsSQLServerConn) BcpImportStream

func (conn *MsSQLServerConn) BcpImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BcpImportStream imports using the bcp tool (https://docs.microsoft.com/en-us/sql/tools/bcp-utility?view=sql-server-ver15). Example: `bcp dbo.test1 in '/tmp/LargeDataset.csv' -S tcp:sqlserver.host,51433 -d master -U sa -P 'password' -c -t ',' -b 5000`. Limitation: if a comma or the delimiter is in a field, it will error; need to use a delimiter not in the field, or do some other transformation.

func (*MsSQLServerConn) BcpImportStreamParrallel

func (conn *MsSQLServerConn) BcpImportStreamParrallel(tableFName string, ds *iop.Datastream) (count uint64, err error)

BcpImportStreamParrallel uses goroutine to import partitioned files

func (*MsSQLServerConn) BulkImportFlow

func (conn *MsSQLServerConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)

BulkImportFlow bulk import flow

func (*MsSQLServerConn) BulkImportStream

func (conn *MsSQLServerConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream bulk import stream

func (*MsSQLServerConn) CopyFromAzure

func (conn *MsSQLServerConn) CopyFromAzure(tableFName, azPath string) (count uint64, err error)

CopyFromAzure uses the COPY INTO Table command from Azure https://docs.microsoft.com/en-us/sql/t-sql/statements/copy-into-transact-sql?view=azure-sqldw-latest

func (*MsSQLServerConn) CopyViaAzure

func (conn *MsSQLServerConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)

CopyViaAzure uses the Azure DWH COPY INTO Table command

func (*MsSQLServerConn) GenerateUpsertSQL added in v0.0.5

func (conn *MsSQLServerConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

GenerateUpsertSQL generates the upsert SQL

func (*MsSQLServerConn) GetURL

func (conn *MsSQLServerConn) GetURL(newURL ...string) string

GetURL returns the processed URL

func (*MsSQLServerConn) Init

func (conn *MsSQLServerConn) Init() error

Init initiates the object

type MySQLConn

type MySQLConn struct {
	BaseConn
	URL string
}

MySQLConn is a MySQL connection

func (*MySQLConn) BulkExportStream

func (conn *MySQLConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)

BulkExportStream bulk Export

func (*MySQLConn) BulkImportStream

func (conn *MySQLConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream bulk import stream

func (*MySQLConn) GenerateUpsertSQL added in v0.0.5

func (conn *MySQLConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

UPSERT https://vladmihalcea.com/how-do-upsert-and-merge-work-in-oracle-sql-server-postgresql-and-mysql/ GenerateUpsertSQL generates the upsert SQL

func (*MySQLConn) GetURL

func (conn *MySQLConn) GetURL(newURL ...string) string

GetURL returns the processed URL

func (*MySQLConn) Init

func (conn *MySQLConn) Init() error

Init initiates the object

func (*MySQLConn) LoadDataInFile

func (conn *MySQLConn) LoadDataInFile(tableFName string, ds *iop.Datastream) (count uint64, err error)

LoadDataInFile Bulk Import

func (*MySQLConn) LoadDataOutFile

func (conn *MySQLConn) LoadDataOutFile(sql string) (stdOutReader io.Reader, err error)

LoadDataOutFile performs a bulk export. Possible error: `ERROR 1227 (42000) at line 1: Access denied; you need (at least one of) the FILE privilege(s) for this operation`. The FILE privilege needs to be granted to the user, and the --secure-file-priv option needs to be set properly for it to work. See https://stackoverflow.com/questions/9819271/why-is-mysql-innodb-insert-so-slow to improve InnoDB insert speed.

type OracleConn

type OracleConn struct {
	BaseConn
	URL string
}

OracleConn is an Oracle connection

func (*OracleConn) BulkImportStream

func (conn *OracleConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream bulk import stream

func (*OracleConn) GenerateInsertStatement

func (conn *OracleConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string

GenerateInsertStatement returns the proper INSERT statement

func (*OracleConn) GenerateUpsertSQL added in v0.0.5

func (conn *OracleConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

GenerateUpsertSQL generates the upsert SQL

func (*OracleConn) Init

func (conn *OracleConn) Init() error

Init initiates the object

func (*OracleConn) SQLLoad

func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count uint64, err error)

SQLLoad uses sqlldr to bulk import, e.g. `cat test1.csv | sqlldr system/<password>@<host>:1521/xe control=sqlldr.ctl log=/dev/stdout bad=/dev/stderr`. It cannot import when a value contains a newline, so values need to be scanned for newlines.

type Pool

type Pool struct {
	Dbs map[string]*sqlx.DB
	Mux sync.Mutex
}

Pool is a pool of connections

type PostgresConn

type PostgresConn struct {
	BaseConn
	URL string
}

PostgresConn is a Postgres connection

func (*PostgresConn) BulkExportStream

func (conn *PostgresConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)

BulkExportStream uses the bulk dumping (COPY)

func (*PostgresConn) BulkImportStream

func (conn *PostgresConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream inserts a stream into a table

func (*PostgresConn) CastColumnForSelect

func (conn *PostgresConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)

CastColumnForSelect casts to the correct target column type

func (*PostgresConn) CopyToStdout

func (conn *PostgresConn) CopyToStdout(sql string) (stdOutReader io.Reader, err error)

CopyToStdout Copy TO STDOUT

func (*PostgresConn) GenerateUpsertSQL added in v0.0.5

func (conn *PostgresConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

GenerateUpsertSQL generates the upsert SQL

func (*PostgresConn) Init

func (conn *PostgresConn) Init() error

Init initiates the object

type RedshiftConn

type RedshiftConn struct {
	BaseConn
	URL string
}

RedshiftConn is a Redshift connection

func (*RedshiftConn) BulkExportFlow

func (conn *RedshiftConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)

BulkExportFlow reads in bulk

func (*RedshiftConn) BulkExportStream

func (conn *RedshiftConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)

BulkExportStream reads in bulk

func (*RedshiftConn) BulkImportFlow

func (conn *RedshiftConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)

BulkImportFlow inserts a flow of streams into a table. For redshift we need to create CSVs in S3 and then use the COPY command.

func (*RedshiftConn) BulkImportStream

func (conn *RedshiftConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream inserts a stream into a table. For redshift we need to create CSVs in S3 and then use the COPY command.

func (*RedshiftConn) CopyFromS3

func (conn *RedshiftConn) CopyFromS3(tableFName, s3Path string) (count uint64, err error)

CopyFromS3 uses the COPY INTO Table command from AWS S3

func (*RedshiftConn) GenerateUpsertSQL added in v0.0.5

func (conn *RedshiftConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

GenerateUpsertSQL generates the upsert SQL

func (*RedshiftConn) Init

func (conn *RedshiftConn) Init() error

Init initiates the object

func (*RedshiftConn) Unload

func (conn *RedshiftConn) Unload(sqls ...string) (s3Path string, err error)

Unload unloads a query to S3

type Result added in v0.0.5

type Result struct {
	// contains filtered or unexported fields
}

func (Result) LastInsertId added in v0.0.5

func (r Result) LastInsertId() (int64, error)

func (Result) RowsAffected added in v0.0.5

func (r Result) RowsAffected() (int64, error)

type SQLiteConn

type SQLiteConn struct {
	BaseConn
	URL string
}

SQLiteConn is a SQLite connection

func (*SQLiteConn) GenerateUpsertSQL added in v0.0.5

func (conn *SQLiteConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

GenerateUpsertSQL generates the upsert SQL

func (*SQLiteConn) Init

func (conn *SQLiteConn) Init() error

Init initiates the object

type Schema

type Schema struct {
	Name   string `json:"name"`
	Tables map[string]Table
}

Schema represents a schemata schema

func (*Schema) ToData

func (schema *Schema) ToData() (data iop.Dataset)

ToData converts schema objects to tabular format

type Schemata

type Schemata struct {
	Schemas map[string]Schema
	Tables  map[string]*Table // all tables with full name lower case (schema.table)
}

Schemata contains the full schema for a connection

type SnowflakeConn

type SnowflakeConn struct {
	BaseConn
	URL        string
	Warehouse  string
	CopyMethod string
}

SnowflakeConn is a Snowflake connection

func (*SnowflakeConn) BulkExportFlow

func (conn *SnowflakeConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)

BulkExportFlow reads in bulk

func (*SnowflakeConn) BulkImportFlow

func (conn *SnowflakeConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)

BulkImportFlow bulk import flow

func (*SnowflakeConn) BulkImportStream

func (conn *SnowflakeConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream bulk import stream

func (*SnowflakeConn) Connect

func (conn *SnowflakeConn) Connect(timeOut ...int) error

Connect connects to the database

func (*SnowflakeConn) CopyFromAzure

func (conn *SnowflakeConn) CopyFromAzure(tableFName, azPath string) (err error)

CopyFromAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html

func (*SnowflakeConn) CopyFromS3

func (conn *SnowflakeConn) CopyFromS3(tableFName, s3Path string) (err error)

CopyFromS3 uses the Snowflake COPY INTO Table command from AWS S3 https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html

func (*SnowflakeConn) CopyToAzure

func (conn *SnowflakeConn) CopyToAzure(sqls ...string) (azPath string, err error)

CopyToAzure exports a query to an Azure location

func (*SnowflakeConn) CopyToS3

func (conn *SnowflakeConn) CopyToS3(sqls ...string) (s3Path string, err error)

CopyToS3 exports a query to an S3 location

func (*SnowflakeConn) CopyViaAWS

func (conn *SnowflakeConn) CopyViaAWS(tableFName string, df *iop.Dataflow) (count uint64, err error)

CopyViaAWS uses the Snowflake COPY INTO Table command from AWS S3 https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html

func (*SnowflakeConn) CopyViaAzure

func (conn *SnowflakeConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)

CopyViaAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html

func (*SnowflakeConn) CopyViaStage

func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (count uint64, err error)

CopyViaStage uses the Snowflake COPY INTO Table command https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html

func (*SnowflakeConn) GenerateUpsertSQL added in v0.0.5

func (conn *SnowflakeConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)

GenerateUpsertSQL generates the upsert SQL

func (*SnowflakeConn) Init

func (conn *SnowflakeConn) Init() error

Init initiates the object

func (*SnowflakeConn) PutFile

func (conn *SnowflakeConn) PutFile(fPath string, internalStagePath string) (err error)

PutFile copies a local file or folder into a staging location.

type Table

type Table struct {
	Name       string `json:"name"`
	FullName   string `json:"full_name"`
	IsView     bool   `json:"is_view"` // whether is a view
	Columns    iop.Columns
	ColumnsMap map[string]*iop.Column
}

Table represents a schemata table

type Template

type Template struct {
	Core           map[string]string
	Metadata       map[string]string
	Analysis       map[string]string
	Function       map[string]string `yaml:"function"`
	GeneralTypeMap map[string]string `yaml:"general_type_map"`
	NativeTypeMap  map[string]string `yaml:"native_type_map"`
	NativeStatsMap map[string]bool   `yaml:"native_stat_map"`
	Variable       map[string]string
}

Template is a database YAML template

func (Template) ToData

func (template Template) ToData() (data iop.Dataset)

ToData converts the template to a dataset

type Transaction added in v0.0.5

type Transaction struct {
	Tx      *sqlx.Tx
	Context *g.Context
	Conn    Connection
	// contains filtered or unexported fields
}

Transaction is a database transaction

func (*Transaction) Commit added in v0.0.5

func (t *Transaction) Commit() (err error)

Commit commits connection wide transaction

func (*Transaction) DisableTrigger added in v0.0.5

func (t *Transaction) DisableTrigger(tableName, triggerName string) (err error)

DisableTrigger disables a trigger

func (*Transaction) EnableTrigger added in v0.0.5

func (t *Transaction) EnableTrigger(tableName, triggerName string) (err error)

EnableTrigger enables a trigger

func (*Transaction) Exec added in v0.0.5

func (t *Transaction) Exec(sql string, args ...interface{}) (result sql.Result, err error)

Exec runs a sql query, returns `error`

func (*Transaction) ExecContext added in v0.0.5

func (t *Transaction) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)

ExecContext runs a sql query with context, returns `error`

func (*Transaction) ExecMultiContext added in v0.0.5

func (t *Transaction) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)

ExecMultiContext runs multiple sql queries with context, returns `error`

func (*Transaction) InsertBatchStream added in v0.0.5

func (t *Transaction) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertBatchStream inserts a stream into a table in batch

func (*Transaction) InsertStream added in v0.0.5

func (t *Transaction) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertStream inserts a stream into a table

func (*Transaction) Prepare added in v0.0.5

func (t *Transaction) Prepare(query string) (stmt *sql.Stmt, err error)

Prepare prepares the statement

func (*Transaction) QueryContext added in v0.0.5

func (t *Transaction) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error)

QueryContext queries rows

func (*Transaction) Rollback added in v0.0.5

func (t *Transaction) Rollback() (err error)

Rollback rolls back connection wide transaction

func (*Transaction) Upsert added in v0.0.5

func (t *Transaction) Upsert(sourceTable, targetTable string, pkFields []string) (count uint64, err error)

Upsert does an upsert from source table into target table

func (*Transaction) UpsertStream added in v0.0.5

func (t *Transaction) UpsertStream(tableFName string, ds *iop.Datastream, pk []string) (count uint64, err error)

UpsertStream upserts a stream into a table in batch

type User added in v0.0.5

type User struct {
	ModelDbX
	Name string
	Age  int
}

func NewUser added in v0.0.5

func NewUser() *User

type WhereClause added in v0.0.5

type WhereClause []interface{}

WhereClause is the where clause

func (WhereClause) Args added in v0.0.5

func (wc WhereClause) Args() []interface{}

Args returns the where clause arguments

func (WhereClause) Clause added in v0.0.5

func (wc WhereClause) Clause() string

Clause returns the string where clause

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL