sources

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 28, 2018 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
// DummyType is the source type name "dummy" (presumably the value matched
// against New's sourceType argument to select the Dummy source — confirm).
const DummyType = "dummy"

DummyType type of source

View Source
// MSSQLQueryType is the source type name for the MSSQL query source.
// Note the value is "MSSQL", without the "Query" suffix carried by the
// MysqlQueryType and PostgreSQLQueryType values.
const MSSQLQueryType = "MSSQL"

MSSQLQueryType type of source

View Source
// MysqlCDCType is the source type name "MysqlCDC" (presumably the value
// matched against New's sourceType argument to select MysqlCDC — confirm).
const MysqlCDCType = "MysqlCDC"

MysqlCDCType type of source

View Source
// MysqlQueryType is the source type name "MysqlQuery" (presumably the value
// matched against New's sourceType argument to select MySQLQuery — confirm).
const MysqlQueryType = "MysqlQuery"

MysqlQueryType type of source

View Source
// PostgreSQLCDCType is the source type name "postgresqlCDC". Unlike the
// Mysql* and MSSQL values, it starts with a lower-case letter.
const PostgreSQLCDCType = "postgresqlCDC"

PostgreSQLCDCType type of source

View Source
// PostgreSQLQueryType is the source type name "postgresqlQuery". Unlike the
// Mysql* and MSSQL values, it starts with a lower-case letter.
const PostgreSQLQueryType = "postgresqlQuery"

PostgreSQLQueryType type of source

View Source
// RandomType is the source type name "random" (presumably the value matched
// against New's sourceType argument to select the Random source — confirm).
const RandomType = "random"

RandomType type of source

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentHeader

// AgentHeader represents the agent header. It exposes no exported fields in
// this listing; Source refers to it through its AgentInfo pointer.
type AgentHeader struct {
	// contains filtered or unexported fields
}

AgentHeader representation of Agent Header

type ColumnSchema

// ColumnSchema describes a single column of a table; SqlSchema stores
// pointers to these values.
// NOTE(review): the field names look like the columns of an
// information_schema.columns result — confirm against the QuerySchema
// implementations.
type ColumnSchema struct {
	// TableDatabase, TableSchema and TableName presumably identify the table
	// that owns the column — confirm.
	TableDatabase          string
	TableSchema            string
	TableName              string
	ColumnName             string
	// ColumnOrdPos is presumably the ordinal position of the column within
	// its table — confirm.
	ColumnOrdPos           int
	// IsNullable is a string, not a bool (presumably "YES"/"NO" — confirm).
	IsNullable             string
	DataType               string
	// The three sql.NullInt64 fields below can hold SQL NULL: check Valid
	// before reading Int64.
	CharacterMaximumLength sql.NullInt64
	NumericPrecision       sql.NullInt64
	NumericScale           sql.NullInt64
	ColumnType             string
	ColumnKey              string
}

ColumnSchema representation of schema

type Dummy

// Dummy is a source that embeds *Source, so Source's exported fields are
// promoted onto it. *Dummy declares every method of SourceI.
//
// Dummy declares an IsEnable() method, which shadows the promoted
// Source.IsEnable field: the field is only reachable as d.Source.IsEnable.
type Dummy struct {
	*Source
}

Dummy representation of Dummy

func (*Dummy) GetAvailableActions

func (d *Dummy) GetAvailableActions() map[string]*control.ActionDescription

GetAvailableActions returns available actions

func (*Dummy) GetMeta

func (d *Dummy) GetMeta() map[string]interface{}

GetMeta returns source meta

func (*Dummy) GetName

func (d *Dummy) GetName() string

GetName get source name

func (*Dummy) GetOutputChan

func (d *Dummy) GetOutputChan() chan *events.LookatchEvent

GetOutputChan get output channel

func (*Dummy) GetSchema

func (d *Dummy) GetSchema() interface{}

GetSchema returns schema

func (*Dummy) GetStatus

func (d *Dummy) GetStatus() interface{}

GetStatus get source status

func (*Dummy) HealthCheck

func (d *Dummy) HealthCheck() bool

HealthCheck return true if ok

func (*Dummy) Init

func (d *Dummy) Init()

Init dummy source

func (*Dummy) IsEnable

func (d *Dummy) IsEnable() bool

IsEnable reports whether the source is enabled

func (*Dummy) Process

func (d *Dummy) Process(action string, params ...interface{}) interface{}

Process action

func (*Dummy) Start

func (d *Dummy) Start(i ...interface{}) error

Start dummy source

func (*Dummy) Stop

func (d *Dummy) Stop() error

Stop dummy source

type JDBCQuery

// JDBCQuery is the shared base embedded by MSSQLQuery, MySQLQuery and
// PostgreSQLQuery. It embeds *Source and carries the common connection
// settings in Config.
//
// *JDBCQuery does not declare Init or Process, so on its own it does not
// satisfy SourceI; the embedding types add those two methods.
type JDBCQuery struct {
	*Source
	// Config holds the connection and batching settings by value.
	Config JDBCQueryConfig
	// contains filtered or unexported fields
}

JDBCQuery representation of JDBC query

func NewJDBCQuery

func NewJDBCQuery(s *Source) JDBCQuery

NewJDBCQuery create new JDBC query client

func (*JDBCQuery) ExtractDatabaseTable

func (j *JDBCQuery) ExtractDatabaseTable(query string) (string, string)

ExtractDatabaseTable Extract Database and Table from query

func (*JDBCQuery) GetAvailableActions

func (j *JDBCQuery) GetAvailableActions() map[string]*control.ActionDescription

GetAvailableActions returns available actions

func (*JDBCQuery) GetMeta

func (j *JDBCQuery) GetMeta() map[string]interface{}

GetMeta returns source meta

func (*JDBCQuery) GetName

func (j *JDBCQuery) GetName() string

GetName get name of source

func (*JDBCQuery) GetOutputChan

func (j *JDBCQuery) GetOutputChan() chan *events.LookatchEvent

GetOutputChan get output channel

func (*JDBCQuery) GetSchema

func (j *JDBCQuery) GetSchema() interface{}

GetSchema returns schema

func (*JDBCQuery) GetStatus

func (j *JDBCQuery) GetStatus() interface{}

GetStatus get source status

func (*JDBCQuery) HealthCheck

func (j *JDBCQuery) HealthCheck() bool

HealthCheck return true if ok

func (*JDBCQuery) IsEnable

func (j *JDBCQuery) IsEnable() bool

IsEnable reports whether the source is enabled

func (*JDBCQuery) MarshallWorker

func (j *JDBCQuery) MarshallWorker(mapchan chan map[string]interface{}, database string, schema string, table string)

MarshallWorker marshals statements into JSON and sends them to the output sink

func (*JDBCQuery) ProcessLines

func (j *JDBCQuery) ProcessLines(columns []string, lines [][]interface{}, mapchan chan map[string]interface{}, wg *sync.WaitGroup)

ProcessLines converts a batch of lines from the result set into maps and sends them to the marshalling goroutine

func (*JDBCQuery) Query

func (j *JDBCQuery) Query(database string, query string)

Query execute query

func (*JDBCQuery) QueryMeta

func (j *JDBCQuery) QueryMeta(query string, table string, db string, mapAdd map[string]interface{}) map[string]interface{}

QueryMeta execute query metadata

func (*JDBCQuery) QuerySchema

func (j *JDBCQuery) QuerySchema(q string) (err error)

QuerySchema query schema

func (*JDBCQuery) Start

func (j *JDBCQuery) Start(i ...interface{}) (err error)

Start source

func (*JDBCQuery) Stop

func (j *JDBCQuery) Stop() error

Stop source

type JDBCQueryConfig

// JDBCQueryConfig holds the settings common to the query sources. Each field
// maps to the lower-case JSON key given in its tag.
type JDBCQueryConfig struct {
	Host      string `json:"host"`
	Port      int    `json:"port"`
	User      string `json:"user"`
	// Password carries a json tag, so marshalling this struct with
	// encoding/json emits the password in clear text under "password".
	Password  string `json:"password"`
	// BatchSize is presumably the number of result rows handled per batch
	// (see ProcessLines) — confirm.
	BatchSize int    `json:"batchsize"`
	// NbWorker is presumably the number of worker goroutines — confirm.
	NbWorker  int    `json:"nbworker"`
}

JDBCQueryConfig representation of JDBC query configuration

type MSSQLQuery

// MSSQLQuery is the MSSQL query source. It embeds *JDBCQuery and adds Init
// and Process, so *MSSQLQuery provides every method of SourceI (the rest are
// promoted from JDBCQuery).
//
// Its own GetStatus, HealthCheck, Query, QueryMeta and QuerySchema shadow the
// JDBCQuery methods of the same name. Query and QuerySchema also have
// different signatures: Query(query) versus JDBCQuery.Query(database, query),
// and QuerySchema() versus JDBCQuery.QuerySchema(q).
type MSSQLQuery struct {
	*JDBCQuery
	// contains filtered or unexported fields
}

MSSQLQuery representation of MSSQL Query source

func (*MSSQLQuery) Connect

func (m *MSSQLQuery) Connect()

Connect connection to database

func (*MSSQLQuery) GetStatus

func (m *MSSQLQuery) GetStatus() interface{}

GetStatus returns the current status of the connection

func (*MSSQLQuery) HealthCheck

func (m *MSSQLQuery) HealthCheck() bool

HealthCheck returns true if source is ok

func (*MSSQLQuery) Init

func (m *MSSQLQuery) Init()

Init initialisation of MSSQL Query source

func (*MSSQLQuery) Process

func (m *MSSQLQuery) Process(action string, params ...interface{}) interface{}

Process process an action

func (*MSSQLQuery) Query

func (m *MSSQLQuery) Query(query string)

Query execute query string

func (*MSSQLQuery) QueryMeta

func (m *MSSQLQuery) QueryMeta(query string, table string, db string, mapAdd map[string]interface{}) map[string]interface{}

QueryMeta execute query meta string

func (*MSSQLQuery) QuerySchema

func (m *MSSQLQuery) QuerySchema() (err error)

QuerySchema extract schema from database

type MSSQLQueryConfig

// MSSQLQueryConfig is the MSSQL query source configuration: the common
// JDBCQueryConfig settings plus SslMode and Database.
//
// JDBCQueryConfig is embedded by pointer and untagged, so encoding/json reads
// and writes its fields (host, port, ...) at the same level as sslmode and
// database. Accessing a promoted field such as Host while the embedded
// pointer is nil panics.
type MSSQLQueryConfig struct {
	*JDBCQueryConfig
	SslMode  string `json:"sslmode"`
	Database string `json:"database"`
}

MSSQLQueryConfig representation of MSSQL Query configuration

type Message

// Message is one change entry; Messages holds a slice of them under the
// "change" JSON key.
// NOTE(review): the layout looks like a logical-decoding change record as
// consumed by PostgreSQLCDC — confirm where it is decoded.
type Message struct {
	// Columnnames, Columntypes and Columnvalues are presumably parallel
	// slices indexed by column — confirm.
	Columnnames  []string      `json:"columnnames"`
	Columntypes  []string      `json:"columntypes"`
	Columnvalues []interface{} `json:"columnvalues"`
	// Kind is presumably the change kind (e.g. insert/update/delete) — confirm.
	Kind         string        `json:"kind"`
	Schema       string        `json:"schema"`
	Table        string        `json:"table"`
	// Oldkeys is stored by value, so it is always present (zero-valued when
	// the JSON input has no "oldkeys" member).
	Oldkeys      Oldkeys       `json:"oldkeys"`
}

Message representation of message

type Messages

// Messages is the envelope for a list of Message values, stored under the
// "change" JSON key.
type Messages struct {
	Change []Message `json:"change"`
}

Messages representation of messages

type Meta

// Meta holds source metadata. The JSON keys do not match the Go field names.
type Meta struct {
	// LastState is serialized under the JSON key "message".
	LastState string `json:"message"`
	// Lsn is serialized under the JSON key "offset" (presumably a log
	// sequence number — confirm).
	Lsn       uint64 `json:"offset"`
}

Meta representation of metadata

type MySQLQuery

// MySQLQuery is the MySQL query source. It embeds *JDBCQuery and adds Init
// and Process, so *MySQLQuery provides every method of SourceI (the rest are
// promoted from JDBCQuery).
//
// Its own GetStatus, HealthCheck, Query, QueryMeta and QuerySchema shadow the
// JDBCQuery methods of the same name. Unlike MSSQLQuery.Connect and
// PostgreSQLQuery.Connect, MySQLQuery.Connect takes a schema argument.
type MySQLQuery struct {
	*JDBCQuery
	// contains filtered or unexported fields
}

MySQLQuery representation of MySQL Query source

func (*MySQLQuery) Connect

func (m *MySQLQuery) Connect(schema string)

Connect connection to database

func (*MySQLQuery) GetStatus

func (m *MySQLQuery) GetStatus() interface{}

GetStatus returns the current status of the connection

func (*MySQLQuery) HealthCheck

func (m *MySQLQuery) HealthCheck() bool

HealthCheck returns true if source is ok

func (*MySQLQuery) Init

func (m *MySQLQuery) Init()

Init initialisation of Mysql Query source

func (*MySQLQuery) Process

func (m *MySQLQuery) Process(action string, params ...interface{}) interface{}

Process process an action

func (*MySQLQuery) Query

func (m *MySQLQuery) Query(query string)

Query execute query string

func (*MySQLQuery) QueryMeta

func (m *MySQLQuery) QueryMeta(query string, table string, db string, mapAdd map[string]interface{}) map[string]interface{}

QueryMeta execute query meta string

func (*MySQLQuery) QuerySchema

func (m *MySQLQuery) QuerySchema() (err error)

QuerySchema extract schema from database

type MysqlCDC

// MysqlCDC is the MySQL change-data-capture source. It embeds *Source and
// *MysqlCDC declares every method of SourceI, plus GetFirstBinlog and
// GetlastBinlog.
//
// Its IsEnable() method shadows the promoted Source.IsEnable field, which is
// only reachable as m.Source.IsEnable.
type MysqlCDC struct {
	*Source
	// contains filtered or unexported fields
}

MysqlCDC representation of Mysql change data capture

func (*MysqlCDC) GetAvailableActions

func (m *MysqlCDC) GetAvailableActions() map[string]*control.ActionDescription

GetAvailableActions returns available actions

func (*MysqlCDC) GetFirstBinlog

func (m *MysqlCDC) GetFirstBinlog() (string, uint32)

GetFirstBinlog Get First Binlog offset

func (*MysqlCDC) GetMeta

func (m *MysqlCDC) GetMeta() map[string]interface{}

GetMeta get metadata

func (*MysqlCDC) GetName

func (m *MysqlCDC) GetName() string

GetName get source name

func (*MysqlCDC) GetOutputChan

func (m *MysqlCDC) GetOutputChan() chan *events.LookatchEvent

GetOutputChan get output channel

func (*MysqlCDC) GetSchema

func (m *MysqlCDC) GetSchema() interface{}

GetSchema get schema

func (*MysqlCDC) GetStatus

func (m *MysqlCDC) GetStatus() interface{}

GetStatus get status

func (*MysqlCDC) GetlastBinlog

func (m *MysqlCDC) GetlastBinlog() (string, uint32)

GetlastBinlog Get last Binlog offset

func (*MysqlCDC) HealthCheck

func (m *MysqlCDC) HealthCheck() bool

HealthCheck returns true if ok

func (*MysqlCDC) Init

func (m *MysqlCDC) Init()

Init source

func (*MysqlCDC) IsEnable

func (m *MysqlCDC) IsEnable() bool

IsEnable reports whether the source is enabled

func (*MysqlCDC) Process

func (m *MysqlCDC) Process(action string, params ...interface{}) interface{}

Process action

func (*MysqlCDC) Start

func (m *MysqlCDC) Start(i ...interface{}) (err error)

Start source

func (*MysqlCDC) Stop

func (m *MysqlCDC) Stop() error

Stop source

type MysqlCDCConfig

// MysqlCDCConfig is the configuration of the MysqlCDC source. Each field maps
// to the JSON key given in its tag. Several field names use underscores
// (Slave_id, Old_value, Filter_policy); they are part of the exported API.
type MysqlCDCConfig struct {
	Host          string                 `json:"host"`
	Port          int                    `json:"port"`
	User          string                 `json:"user"`
	// Password carries a json tag, so marshalling this struct with
	// encoding/json emits the password in clear text under "password".
	Password      string                 `json:"password"`
	// Slave_id is presumably the replication server id used by this client — confirm.
	Slave_id      int                    `json:"slave_id"`
	// Offset is a string here, whereas Source.Offset is an int64.
	Offset        string                 `json:"offset"`
	// LogFile is presumably the binlog file name to start from — confirm.
	LogFile       string                 `json:"logfile"`
	// Old_value presumably toggles emitting pre-change values — confirm.
	Old_value     bool                   `json:"old_value"`
	// Filter_policy and Filter presumably select which changes are kept or
	// dropped; the accepted values are not visible here — confirm.
	Filter_policy string                 `json:"filter_policy"`
	Filter        map[string]interface{} `json:"filter"`
	Enabled       bool                   `json:"enabled"`
}

MysqlCDCConfig representation of Mysql change data capture configuration

type MysqlQueryConfig

// MysqlQueryConfig is the MySQL query source configuration: the common
// JDBCQueryConfig settings plus Schema and Exclude.
//
// JDBCQueryConfig is embedded by pointer and untagged, so encoding/json reads
// and writes its fields at the same level as schema and exclude. Accessing a
// promoted field such as Host while the embedded pointer is nil panics.
type MysqlQueryConfig struct {
	*JDBCQueryConfig
	Schema  string   `json:"schema"`
	// Exclude is presumably a list of names to skip; whether these are
	// schemas or tables is not visible here — confirm.
	Exclude []string `json:"exclude"`
}

MysqlQueryConfig representation of MySQL Query configuration

type Oldkeys

// Oldkeys is the value of Message.Oldkeys ("oldkeys" in JSON).
// Keynames, Keytypes and Keyvalues are presumably parallel slices describing
// the key columns of the previous row version — confirm.
type Oldkeys struct {
	Keynames  []string      `json:"keynames"`
	Keytypes  []string      `json:"keytypes"`
	Keyvalues []interface{} `json:"keyvalues"`
}

Oldkeys representation of old event

type PostgreSQLCDC

// PostgreSQLCDC is the PostgreSQL change-data-capture source. It embeds
// *Source and *PostgreSQLCDC declares every method of SourceI, plus
// NewReplicator, which returns a *pgx.ReplicationConn.
//
// Its IsEnable() method shadows the promoted Source.IsEnable field, which is
// only reachable as p.Source.IsEnable.
type PostgreSQLCDC struct {
	*Source
	// contains filtered or unexported fields
}

PostgreSQLCDC representation of PostgreSQL change data capture

func (*PostgreSQLCDC) GetAvailableActions

func (p *PostgreSQLCDC) GetAvailableActions() map[string]*control.ActionDescription

GetAvailableActions returns available actions

func (*PostgreSQLCDC) GetMeta

func (p *PostgreSQLCDC) GetMeta() map[string]interface{}

GetMeta get metadata

func (*PostgreSQLCDC) GetName

func (p *PostgreSQLCDC) GetName() string

GetName get source name

func (*PostgreSQLCDC) GetOutputChan

func (p *PostgreSQLCDC) GetOutputChan() chan *events.LookatchEvent

GetOutputChan get output channel

func (*PostgreSQLCDC) GetSchema

func (p *PostgreSQLCDC) GetSchema() interface{}

GetSchema get schema

func (*PostgreSQLCDC) GetStatus

func (p *PostgreSQLCDC) GetStatus() interface{}

GetStatus get status

func (*PostgreSQLCDC) HealthCheck

func (p *PostgreSQLCDC) HealthCheck() bool

HealthCheck returns true if ok

func (*PostgreSQLCDC) Init

func (p *PostgreSQLCDC) Init()

Init source

func (*PostgreSQLCDC) IsEnable

func (p *PostgreSQLCDC) IsEnable() bool

IsEnable reports whether the source is enabled

func (*PostgreSQLCDC) NewReplicator

func (p *PostgreSQLCDC) NewReplicator() (*pgx.ReplicationConn, error)

NewReplicator create new pg logical decoding connection

func (*PostgreSQLCDC) Process

func (p *PostgreSQLCDC) Process(action string, params ...interface{}) interface{}

Process action

func (*PostgreSQLCDC) Start

func (p *PostgreSQLCDC) Start(i ...interface{}) (err error)

Start source

func (*PostgreSQLCDC) Stop

func (p *PostgreSQLCDC) Stop() error

Stop source

type PostgreSQLCDCConf

// PostgreSQLCDCConf is the configuration of the PostgreSQLCDC source. Note
// the "Conf" suffix, where the sibling types use "Config". Each field maps to
// the JSON key given in its tag; the underscore field names (Slot_name,
// Old_value, Filter_policy) are part of the exported API.
type PostgreSQLCDCConf struct {
	Host          string                 `json:"host"`
	Port          int                    `json:"port"`
	User          string                 `json:"user"`
	// Password carries a json tag, so marshalling this struct with
	// encoding/json emits the password in clear text under "password".
	Password      string                 `json:"password"`
	SslMode       string                 `json:"sslmode"`
	Database      string                 `json:"database"`
	// Offset is a string here, whereas Source.Offset is an int64 and
	// Meta.Lsn is a uint64.
	Offset        string                 `json:"offset"`
	// Slot_name is presumably the replication slot name — confirm.
	Slot_name     string                 `json:"slot_name"`
	// Old_value presumably toggles emitting pre-change values — confirm.
	Old_value     bool                   `json:"old_value"`
	// Filter_policy and Filter presumably select which changes are kept or
	// dropped; the accepted values are not visible here — confirm.
	Filter_policy string                 `json:"filter_policy"`
	Filter        map[string]interface{} `json:"filter"`
	Enabled       bool                   `json:"enabled"`
}

PostgreSQLCDCConf representation of PostgreSQL change data capture configuration

type PostgreSQLQuery

// PostgreSQLQuery is the PostgreSQL query source. It embeds *JDBCQuery and
// adds Init and Process, so *PostgreSQLQuery provides every method of SourceI
// (the rest are promoted from JDBCQuery).
//
// Its own GetStatus, HealthCheck, Query, QueryMeta and QuerySchema shadow the
// JDBCQuery methods of the same name. Query and QuerySchema also have
// different signatures: Query(query) versus JDBCQuery.Query(database, query),
// and QuerySchema() versus JDBCQuery.QuerySchema(q).
type PostgreSQLQuery struct {
	*JDBCQuery
	// contains filtered or unexported fields
}

PostgreSQLQuery representation of PostgreSQL Query source

func (*PostgreSQLQuery) Connect

func (p *PostgreSQLQuery) Connect()

Connect connection to database

func (*PostgreSQLQuery) GetStatus

func (p *PostgreSQLQuery) GetStatus() interface{}

GetStatus returns the current status of the connection

func (*PostgreSQLQuery) HealthCheck

func (p *PostgreSQLQuery) HealthCheck() bool

HealthCheck returns true if source is ok

func (*PostgreSQLQuery) Init

func (p *PostgreSQLQuery) Init()

Init initialisation of PostgreSQL Query source

func (*PostgreSQLQuery) Process

func (p *PostgreSQLQuery) Process(action string, params ...interface{}) interface{}

Process process an action

func (*PostgreSQLQuery) Query

func (p *PostgreSQLQuery) Query(query string)

Query execute query string

func (*PostgreSQLQuery) QueryMeta

func (p *PostgreSQLQuery) QueryMeta(query string, table string, db string, mapAdd map[string]interface{}) map[string]interface{}

QueryMeta execute query meta string

func (*PostgreSQLQuery) QuerySchema

func (p *PostgreSQLQuery) QuerySchema() (err error)

QuerySchema extract schema from database

type PostgreSQLQueryConfig

// PostgreSQLQueryConfig is the PostgreSQL query source configuration: the
// common JDBCQueryConfig settings plus SslMode and Database. It has the same
// fields as MSSQLQueryConfig.
//
// JDBCQueryConfig is embedded by pointer and untagged, so encoding/json reads
// and writes its fields at the same level as sslmode and database. Accessing
// a promoted field such as Host while the embedded pointer is nil panics.
type PostgreSQLQueryConfig struct {
	*JDBCQueryConfig
	SslMode  string `json:"sslmode"`
	Database string `json:"database"`
}

PostgreSQLQueryConfig representation of PostgreSQL Query configuration

type Query

// Query is the parameter payload of the query action. Its single field has no
// json tag; it carries `description` and `required` struct tags instead
// (presumably read via reflection to build a control.ActionDescription —
// confirm).
type Query struct {
	Query string `description:"SQL query to execute on agent" required:"true"`
}

Query representation of query action

type Random

// Random is a source that embeds *Source; *Random declares every method of
// SourceI.
//
// Its IsEnable() method shadows the promoted Source.IsEnable field, which is
// only reachable as r.Source.IsEnable.
type Random struct {
	*Source

	// NbMessages is an exported message counter (presumably the number of
	// events produced so far — confirm).
	NbMessages int
	// contains filtered or unexported fields
}

Random representation of Random

func (*Random) GetAvailableActions

func (r *Random) GetAvailableActions() map[string]*control.ActionDescription

GetAvailableActions returns available actions

func (*Random) GetMeta

func (r *Random) GetMeta() map[string]interface{}

GetMeta get source meta

func (*Random) GetName

func (r *Random) GetName() string

GetName get source name

func (*Random) GetOutputChan

func (r *Random) GetOutputChan() chan *events.LookatchEvent

GetOutputChan get output channel

func (*Random) GetSchema

func (r *Random) GetSchema() interface{}

GetSchema Get source Schema

func (*Random) GetStatus

func (r *Random) GetStatus() interface{}

GetStatus Get source status

func (*Random) HealthCheck

func (r *Random) HealthCheck() bool

HealthCheck return true if ok

func (*Random) Init

func (r *Random) Init()

Init source

func (*Random) IsEnable

func (r *Random) IsEnable() bool

IsEnable reports whether the source is enabled

func (*Random) Process

func (r *Random) Process(action string, params ...interface{}) interface{}

Process action

func (*Random) Start

func (r *Random) Start(i ...interface{}) error

Start source

func (*Random) Stop

func (r *Random) Stop() error

Stop source

type RandomConfig

// RandomConfig is the configuration of the Random source.
type RandomConfig struct {
	Enabled bool   `json:"enabled"`
	// Wait is a string, not a time.Duration (presumably a duration string
	// such as "1s" giving the pause between events — confirm).
	Wait    string `json:"wait"`
}

RandomConfig representation of Random Config

type Source

// Source holds the state shared by the concrete sources: Dummy, JDBCQuery,
// MysqlCDC, PostgreSQLCDC and Random all embed *Source.
type Source struct {
	Name          string
	// OutputChannel carries *events.LookatchEvent values (presumably the
	// channel returned by the GetOutputChan methods — confirm).
	OutputChannel chan *events.LookatchEvent
	AgentInfo     *AgentHeader
	// IsEnable is shadowed on the embedding types by their IsEnable()
	// method; reach the field through the embedded Source (x.Source.IsEnable).
	IsEnable      bool
	// Conf is the viper configuration for this source (presumably the
	// config argument passed to New — confirm).
	Conf          *viper.Viper
	Offset        int64
}

Source representation of source

type SourceI

// SourceI is the interface returned by New. In this package *Dummy,
// *MysqlCDC, *PostgreSQLCDC and *Random declare all twelve methods directly.
// *MSSQLQuery, *MySQLQuery and *PostgreSQLQuery declare Init and Process and
// obtain the remaining methods from the embedded *JDBCQuery.
type SourceI interface {
	// Init initialises the source; it returns nothing, so failures cannot be
	// reported through this method.
	Init()
	Stop() error
	// Start takes variadic, untyped arguments; their meaning is
	// implementation-specific and not visible here.
	Start(...interface{}) error
	GetName() string
	GetOutputChan() chan *events.LookatchEvent
	GetMeta() map[string]interface{}
	GetSchema() interface{}
	GetStatus() interface{}
	IsEnable() bool
	HealthCheck() bool
	// GetAvailableActions returns action descriptions keyed by string
	// (presumably the action names accepted by Process — confirm).
	GetAvailableActions() map[string]*control.ActionDescription
	// Process runs the action named by the first argument with the given
	// parameters and returns an untyped result.
	Process(string, ...interface{}) interface{}
}

SourceI interface

func New

func New(name string, sourceType string, config *viper.Viper, eventChan chan *events.LookatchEvent) (s SourceI, err error)

New create new source

type SqlSchema

// SqlSchema is a three-level map of string keys ending in *ColumnSchema.
// NOTE(review): the levels are presumably schema -> table -> column position
// — confirm against the QuerySchema implementations. As with any Go map, the
// inner maps must be created before they are written to.
type SqlSchema map[string]map[string]map[string]*ColumnSchema

SqlSchema schema table Position

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL