Documentation ¶
Index ¶
- Constants
- type AgentHeader
- type ColumnSchema
- type FileReadingFollower
- func (f *FileReadingFollower) GetAvailableActions() map[string]*control.ActionDescription
- func (f *FileReadingFollower) GetMeta() map[string]interface{}
- func (f *FileReadingFollower) GetName() string
- func (f *FileReadingFollower) GetOutputChan() chan *events.LookatchEvent
- func (f *FileReadingFollower) GetSchema() interface{}
- func (f *FileReadingFollower) GetStatus() interface{}
- func (f *FileReadingFollower) HealthCheck() bool
- func (f *FileReadingFollower) Init()
- func (f *FileReadingFollower) IsEnable() bool
- func (f *FileReadingFollower) Process(action string, params ...interface{}) interface{}
- func (f *FileReadingFollower) Start(i ...interface{}) error
- func (f *FileReadingFollower) Stop() error
- type FileReadingFollowerConfig
- type JDBCQuery
- func (j *JDBCQuery) ExtractDatabaseTable(query string) (string, string)
- func (j *JDBCQuery) GetAvailableActions() map[string]*control.ActionDescription
- func (j *JDBCQuery) GetMeta() map[string]interface{}
- func (j *JDBCQuery) GetName() string
- func (j *JDBCQuery) GetOutputChan() chan *events.LookatchEvent
- func (j *JDBCQuery) GetSchema() interface{}
- func (j *JDBCQuery) GetStatus() interface{}
- func (j *JDBCQuery) HealthCheck() bool
- func (j *JDBCQuery) IsEnable() bool
- func (j *JDBCQuery) MarshallWorker(mapchan chan map[string]interface{}, database string, schema string, ...)
- func (j *JDBCQuery) ProcessLines(columns []string, lines [][]interface{}, mapchan chan map[string]interface{}, ...)
- func (j *JDBCQuery) Query(database string, query string)
- func (j *JDBCQuery) QueryMeta(query string) []map[string]interface{}
- func (j *JDBCQuery) QuerySchema(q string) (err error)
- func (j *JDBCQuery) Start(i ...interface{}) (err error)
- func (j *JDBCQuery) Stop() error
- type JDBCQueryConfig
- type MSSQLQuery
- func (m *MSSQLQuery) Connect()
- func (m *MSSQLQuery) GetStatus() interface{}
- func (m *MSSQLQuery) HealthCheck() bool
- func (m *MSSQLQuery) Init()
- func (m *MSSQLQuery) Process(action string, params ...interface{}) interface{}
- func (m *MSSQLQuery) Query(query string)
- func (m *MSSQLQuery) QueryMeta(query string) []map[string]interface{}
- func (m *MSSQLQuery) QuerySchema() (err error)
- type MSSQLQueryConfig
- type Message
- type Messages
- type Meta
- type MySQLQuery
- func (m *MySQLQuery) Connect(schema string)
- func (m *MySQLQuery) GetStatus() interface{}
- func (m *MySQLQuery) HealthCheck() bool
- func (m *MySQLQuery) Init()
- func (m *MySQLQuery) Process(action string, params ...interface{}) interface{}
- func (m *MySQLQuery) Query(query string)
- func (m *MySQLQuery) QueryMeta(query string) []map[string]interface{}
- func (m *MySQLQuery) QuerySchema() (err error)
- type MysqlCDC
- func (m *MysqlCDC) GetAvailableActions() map[string]*control.ActionDescription
- func (m *MysqlCDC) GetFirstBinlog() (string, uint32)
- func (m *MysqlCDC) GetMeta() map[string]interface{}
- func (m *MysqlCDC) GetName() string
- func (m *MysqlCDC) GetOutputChan() chan *events.LookatchEvent
- func (m *MysqlCDC) GetSchema() interface{}
- func (m *MysqlCDC) GetStatus() interface{}
- func (m *MysqlCDC) GetlastBinlog() (string, uint32)
- func (m *MysqlCDC) HealthCheck() bool
- func (m *MysqlCDC) Init()
- func (m *MysqlCDC) IsEnable() bool
- func (m *MysqlCDC) Process(action string, params ...interface{}) interface{}
- func (m *MysqlCDC) Start(i ...interface{}) (err error)
- func (m *MysqlCDC) Stop() error
- type MysqlCDCConfig
- type MysqlQueryConfig
- type Oldkeys
- type PostgreSQLCDC
- func (p *PostgreSQLCDC) GetAvailableActions() map[string]*control.ActionDescription
- func (p *PostgreSQLCDC) GetMeta() map[string]interface{}
- func (p *PostgreSQLCDC) GetName() string
- func (p *PostgreSQLCDC) GetOutputChan() chan *events.LookatchEvent
- func (p *PostgreSQLCDC) GetSchema() interface{}
- func (p *PostgreSQLCDC) GetStatus() interface{}
- func (p *PostgreSQLCDC) HealthCheck() bool
- func (p *PostgreSQLCDC) Init()
- func (p *PostgreSQLCDC) IsEnable() bool
- func (p *PostgreSQLCDC) NewReplicator() (*pgx.ReplicationConn, error)
- func (p *PostgreSQLCDC) Process(action string, params ...interface{}) interface{}
- func (p *PostgreSQLCDC) Start(i ...interface{}) (err error)
- func (p *PostgreSQLCDC) StartReplication()
- func (p *PostgreSQLCDC) Stop() error
- type PostgreSQLCDCConf
- type PostgreSQLQuery
- func (p *PostgreSQLQuery) Connect()
- func (p *PostgreSQLQuery) GetStatus() interface{}
- func (p *PostgreSQLQuery) HealthCheck() bool
- func (p *PostgreSQLQuery) Init()
- func (p *PostgreSQLQuery) Process(action string, params ...interface{}) interface{}
- func (p *PostgreSQLQuery) Query(query string)
- func (p *PostgreSQLQuery) QueryMeta(query string) []map[string]interface{}
- func (p *PostgreSQLQuery) QuerySchema() (err error)
- type PostgreSQLQueryConfig
- type Query
- type Random
- func (r *Random) GetAvailableActions() map[string]*control.ActionDescription
- func (r *Random) GetMeta() map[string]interface{}
- func (r *Random) GetName() string
- func (r *Random) GetOutputChan() chan *events.LookatchEvent
- func (r *Random) GetSchema() interface{}
- func (r *Random) GetStatus() interface{}
- func (r *Random) HealthCheck() bool
- func (r *Random) Init()
- func (r *Random) IsEnable() bool
- func (r *Random) Process(action string, params ...interface{}) interface{}
- func (r *Random) Start(i ...interface{}) error
- func (r *Random) Stop() error
- type RandomConfig
- type SQLSchema
- type Source
- type SourceI
- type Syslog
- func (s *Syslog) GetAvailableActions() map[string]*control.ActionDescription
- func (s *Syslog) GetMeta() map[string]interface{}
- func (s *Syslog) GetName() string
- func (s *Syslog) GetOutputChan() chan *events.LookatchEvent
- func (s *Syslog) GetSchema() interface{}
- func (s *Syslog) GetStatus() interface{}
- func (s *Syslog) HealthCheck() bool
- func (s *Syslog) Init()
- func (s *Syslog) IsEnable() bool
- func (s *Syslog) Process(action string, params ...interface{}) interface{}
- func (s *Syslog) Start(i ...interface{}) error
- func (s *Syslog) Stop() error
- type SyslogConfig
Constants ¶
const FileReadingFollowerType = "fileReadingFollower"
FileReadingFollowerType type of source
const MSSQLQueryType = "MSSQL"
MSSQLQueryType type of source
const MysqlCDCType = "MysqlCDC"
MysqlCDCType type of source
const MysqlQueryType = "MysqlQuery"
MysqlQueryType type of source
const PostgreSQLCDCType = "postgresqlCDC"
PostgreSQLCDCType type of source
const PostgreSQLQueryType = "postgresqlQuery"
PostgreSQLQueryType type of source
const RandomType = "random"
RandomType type of source
const SyslogType = "syslog"
SyslogType type of source
const TickerValue = 10
TickerValue number second to wait between to tick
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AgentHeader ¶
type AgentHeader struct {
// contains filtered or unexported fields
}
AgentHeader representation of Agent Header
type ColumnSchema ¶
type ColumnSchema struct { TableDatabase string TableSchema string TableName string ColumnName string ColumnOrdPos int IsNullable string DataType string CharacterMaximumLength sql.NullInt64 NumericPrecision sql.NullInt64 NumericScale sql.NullInt64 ColumnType string ColumnKey string }
ColumnSchema representation of schema
type FileReadingFollower ¶ added in v0.1.0
type FileReadingFollower struct { *Source // contains filtered or unexported fields }
FileReadingFollower representation of FileReadingFollower
func (*FileReadingFollower) GetAvailableActions ¶ added in v0.1.0
func (f *FileReadingFollower) GetAvailableActions() map[string]*control.ActionDescription
GetAvailableActions returns available actions
func (*FileReadingFollower) GetMeta ¶ added in v0.1.0
func (f *FileReadingFollower) GetMeta() map[string]interface{}
GetMeta get source meta
func (*FileReadingFollower) GetName ¶ added in v0.1.0
func (f *FileReadingFollower) GetName() string
GetName get source name
func (*FileReadingFollower) GetOutputChan ¶ added in v0.1.0
func (f *FileReadingFollower) GetOutputChan() chan *events.LookatchEvent
GetOutputChan get output channel
func (*FileReadingFollower) GetSchema ¶ added in v0.1.0
func (f *FileReadingFollower) GetSchema() interface{}
GetSchema Get source Schema
func (*FileReadingFollower) GetStatus ¶ added in v0.1.0
func (f *FileReadingFollower) GetStatus() interface{}
GetStatus Get source status
func (*FileReadingFollower) HealthCheck ¶ added in v0.1.0
func (f *FileReadingFollower) HealthCheck() bool
HealthCheck return true if ok
func (*FileReadingFollower) IsEnable ¶ added in v0.1.0
func (f *FileReadingFollower) IsEnable() bool
IsEnable check if source is enable
func (*FileReadingFollower) Process ¶ added in v0.1.0
func (f *FileReadingFollower) Process(action string, params ...interface{}) interface{}
Process action
func (*FileReadingFollower) Start ¶ added in v0.1.0
func (f *FileReadingFollower) Start(i ...interface{}) error
Start source
func (*FileReadingFollower) Stop ¶ added in v0.1.0
func (f *FileReadingFollower) Stop() error
Stop source
type FileReadingFollowerConfig ¶ added in v0.1.0
FileReadingFollowerConfig representation of FileReadingFollower Config
type JDBCQuery ¶
type JDBCQuery struct { *Source Config JDBCQueryConfig // contains filtered or unexported fields }
JDBCQuery representation of JDBC query
func NewJDBCQuery ¶
NewJDBCQuery create new JDBC query client
func (*JDBCQuery) ExtractDatabaseTable ¶
ExtractDatabaseTable Extract Database and Table from query
func (*JDBCQuery) GetAvailableActions ¶
func (j *JDBCQuery) GetAvailableActions() map[string]*control.ActionDescription
GetAvailableActions returns available actions
func (*JDBCQuery) GetOutputChan ¶
func (j *JDBCQuery) GetOutputChan() chan *events.LookatchEvent
GetOutputChan get output channel
func (*JDBCQuery) GetStatus ¶
func (j *JDBCQuery) GetStatus() interface{}
GetStatus get source status
func (*JDBCQuery) HealthCheck ¶
HealthCheck return true if ok
func (*JDBCQuery) MarshallWorker ¶
func (j *JDBCQuery) MarshallWorker(mapchan chan map[string]interface{}, database string, schema string, table string)
MarshallWorker marshalling statement into json and sent to output sink
func (*JDBCQuery) ProcessLines ¶
func (j *JDBCQuery) ProcessLines(columns []string, lines [][]interface{}, mapchan chan map[string]interface{}, wg *sizedwaitgroup.SizedWaitGroup)
ProcessLines process bunch of lines from resultset to map and sent to marshall goroutine
func (*JDBCQuery) QuerySchema ¶
QuerySchema query schema
type JDBCQueryConfig ¶
type JDBCQueryConfig struct { Host string `json:"host"` Port int `json:"port"` User string `json:"user"` Password string `json:"password"` BatchSize int `json:"batchsize"` NbWorker int `json:"nbworker"` }
JDBCQueryConfig representation of JDBC query configuration
type MSSQLQuery ¶
type MSSQLQuery struct { *JDBCQuery // contains filtered or unexported fields }
MSSQLQuery representation of MSSQL Query source
func (*MSSQLQuery) GetStatus ¶
func (m *MSSQLQuery) GetStatus() interface{}
GetStatus returns current status of connexion
func (*MSSQLQuery) HealthCheck ¶
func (m *MSSQLQuery) HealthCheck() bool
HealthCheck returns true if source is ok
func (*MSSQLQuery) Process ¶
func (m *MSSQLQuery) Process(action string, params ...interface{}) interface{}
Process process an action
func (*MSSQLQuery) QueryMeta ¶
func (m *MSSQLQuery) QueryMeta(query string) []map[string]interface{}
QueryMeta execute query meta string
func (*MSSQLQuery) QuerySchema ¶
func (m *MSSQLQuery) QuerySchema() (err error)
QuerySchema extract schema from database
type MSSQLQueryConfig ¶
type MSSQLQueryConfig struct { *JDBCQueryConfig SslMode string `json:"sslmode"` Database string `json:"database"` }
MSSQLQueryConfig representation MSSQL Query configuration
type Message ¶
type Message struct { Columnnames []string `json:"columnnames"` Columntypes []string `json:"columntypes"` Columnvalues []interface{} `json:"columnvalues"` Kind string `json:"kind"` Schema string `json:"schema"` Table string `json:"table"` Oldkeys Oldkeys `json:"oldkeys"` }
Message representation of message
type Messages ¶
type Messages struct {
Change []Message `json:"change"`
}
Messages representation of messages
type Meta ¶
type Meta struct { LastState string `json:"laststate"` Lsn uint64 `json:"offset"` SlotStatus bool `json:"slotstatus"` }
Meta representation of metadata
type MySQLQuery ¶
type MySQLQuery struct { *JDBCQuery // contains filtered or unexported fields }
MySQLQuery representation of MySQL Query source
func (*MySQLQuery) Connect ¶
func (m *MySQLQuery) Connect(schema string)
Connect connection to database
func (*MySQLQuery) GetStatus ¶
func (m *MySQLQuery) GetStatus() interface{}
GetStatus returns current status of connexion
func (*MySQLQuery) HealthCheck ¶
func (m *MySQLQuery) HealthCheck() bool
HealthCheck returns true if source is ok
func (*MySQLQuery) Process ¶
func (m *MySQLQuery) Process(action string, params ...interface{}) interface{}
Process process an action
func (*MySQLQuery) QueryMeta ¶
func (m *MySQLQuery) QueryMeta(query string) []map[string]interface{}
QueryMeta execute query meta string
func (*MySQLQuery) QuerySchema ¶
func (m *MySQLQuery) QuerySchema() (err error)
QuerySchema extract schema from database
type MysqlCDC ¶
type MysqlCDC struct { *Source // contains filtered or unexported fields }
MysqlCDC representation of Mysql change data capture
func (*MysqlCDC) GetAvailableActions ¶
func (m *MysqlCDC) GetAvailableActions() map[string]*control.ActionDescription
GetAvailableActions returns available actions
func (*MysqlCDC) GetFirstBinlog ¶
GetFirstBinlog Get First Binlog offset
func (*MysqlCDC) GetOutputChan ¶
func (m *MysqlCDC) GetOutputChan() chan *events.LookatchEvent
GetOutputChan get output channel
func (*MysqlCDC) GetlastBinlog ¶
GetlastBinlog Get last Binlog offset
type MysqlCDCConfig ¶
type MysqlCDCConfig struct { Host string `json:"host"` Port int `json:"port"` User string `json:"user"` Password string `json:"password"` SlaveID int `json:"slave_id" mapstructure:"slave_id"` Offset string `json:"offset"` LogFile string `json:"logfile"` OldValue bool `json:"old_value" mapstructure:"old_value"` FilterPolicy string `json:"filter_policy" mapstructure:"filter_policy"` Filter map[string]interface{} `json:"filter"` Enabled bool `json:"enabled"` }
MysqlCDCConfig representation of Mysql change data capture configuration
type MysqlQueryConfig ¶
type MysqlQueryConfig struct { *JDBCQueryConfig Schema string `json:"schema"` Exclude []string `json:"exclude"` }
MysqlQueryConfig representation MySQL Query configuration
type Oldkeys ¶
type Oldkeys struct { Keynames []string `json:"keynames"` Keytypes []string `json:"keytypes"` Keyvalues []interface{} `json:"keyvalues"` }
Oldkeys representation of old event
type PostgreSQLCDC ¶
type PostgreSQLCDC struct { *Source // contains filtered or unexported fields }
PostgreSQLCDC representation of PostgreSQL change data capture
func (*PostgreSQLCDC) GetAvailableActions ¶
func (p *PostgreSQLCDC) GetAvailableActions() map[string]*control.ActionDescription
GetAvailableActions returns available actions
func (*PostgreSQLCDC) GetMeta ¶
func (p *PostgreSQLCDC) GetMeta() map[string]interface{}
GetMeta get metadata
func (*PostgreSQLCDC) GetOutputChan ¶
func (p *PostgreSQLCDC) GetOutputChan() chan *events.LookatchEvent
GetOutputChan get output channel
func (*PostgreSQLCDC) GetSchema ¶
func (p *PostgreSQLCDC) GetSchema() interface{}
GetSchema get schema
func (*PostgreSQLCDC) GetStatus ¶
func (p *PostgreSQLCDC) GetStatus() interface{}
GetStatus get status
func (*PostgreSQLCDC) HealthCheck ¶
func (p *PostgreSQLCDC) HealthCheck() bool
HealthCheck returns true if ok
func (*PostgreSQLCDC) IsEnable ¶
func (p *PostgreSQLCDC) IsEnable() bool
IsEnable check if source is enable
func (*PostgreSQLCDC) NewReplicator ¶
func (p *PostgreSQLCDC) NewReplicator() (*pgx.ReplicationConn, error)
NewReplicator create new pg logical decoding connection
func (*PostgreSQLCDC) Process ¶
func (p *PostgreSQLCDC) Process(action string, params ...interface{}) interface{}
Process action
func (*PostgreSQLCDC) Start ¶
func (p *PostgreSQLCDC) Start(i ...interface{}) (err error)
Start source
func (*PostgreSQLCDC) StartReplication ¶ added in v0.1.0
func (p *PostgreSQLCDC) StartReplication()
StartReplication Start Replication
type PostgreSQLCDCConf ¶
type PostgreSQLCDCConf struct { Host string `json:"host"` Port int `json:"port"` User string `json:"user"` Password string `json:"password"` SslMode string `json:"sslmode"` Database string `json:"database"` Offset string `json:"offset"` SlotName string `json:"slot_name" mapstructure:"slot_name"` OldValue bool `json:"old_value" mapstructure:"old_value"` FilterPolicy string `json:"filter_policy" mapstructure:"filter_policy"` Filter map[string]interface{} `json:"filter"` Enabled bool `json:"enabled"` }
PostgreSQLCDCConf representation of PostgreSQL change data capture configuration
type PostgreSQLQuery ¶
type PostgreSQLQuery struct { *JDBCQuery // contains filtered or unexported fields }
PostgreSQLQuery representation of PostgreSQL Query source
func (*PostgreSQLQuery) Connect ¶
func (p *PostgreSQLQuery) Connect()
Connect connection to database
func (*PostgreSQLQuery) GetStatus ¶
func (p *PostgreSQLQuery) GetStatus() interface{}
GetStatus returns current status of connexion
func (*PostgreSQLQuery) HealthCheck ¶
func (p *PostgreSQLQuery) HealthCheck() bool
HealthCheck returns true if source is ok
func (*PostgreSQLQuery) Init ¶
func (p *PostgreSQLQuery) Init()
Init initialisation of PostgreSQL Query source
func (*PostgreSQLQuery) Process ¶
func (p *PostgreSQLQuery) Process(action string, params ...interface{}) interface{}
Process process an action
func (*PostgreSQLQuery) Query ¶
func (p *PostgreSQLQuery) Query(query string)
Query execute query string
func (*PostgreSQLQuery) QueryMeta ¶
func (p *PostgreSQLQuery) QueryMeta(query string) []map[string]interface{}
QueryMeta execute query meta string
func (*PostgreSQLQuery) QuerySchema ¶
func (p *PostgreSQLQuery) QuerySchema() (err error)
QuerySchema extract schema from database
type PostgreSQLQueryConfig ¶
type PostgreSQLQueryConfig struct { *JDBCQueryConfig SslMode string `json:"sslmode"` Database string `json:"database"` }
PostgreSQLQueryConfig representation PostgreSQL Query configuration
type Query ¶
type Query struct {
Query string `description:"SQL query to execute on agent" required:"true"`
}
Query representation of query action
type Random ¶
Random representation of Random
func (*Random) GetAvailableActions ¶
func (r *Random) GetAvailableActions() map[string]*control.ActionDescription
GetAvailableActions returns available actions
func (*Random) GetOutputChan ¶
func (r *Random) GetOutputChan() chan *events.LookatchEvent
GetOutputChan get output channel
type RandomConfig ¶
type RandomConfig struct {
Wait string `json:"wait"`
}
RandomConfig representation of Random Config
type SQLSchema ¶ added in v0.1.0
type SQLSchema map[string]map[string]map[string]*ColumnSchema
SQLSchema schema table Position
type Source ¶
type Source struct { Name string OutputChannel chan *events.LookatchEvent AgentInfo *AgentHeader IsEnable bool Conf *viper.Viper Offset int64 }
Source representation of source
type SourceI ¶
type SourceI interface { Init() Stop() error Start(...interface{}) error GetName() string GetOutputChan() chan *events.LookatchEvent GetMeta() map[string]interface{} GetSchema() interface{} GetStatus() interface{} IsEnable() bool HealthCheck() bool GetAvailableActions() map[string]*control.ActionDescription Process(string, ...interface{}) interface{} }
SourceI interface
type Syslog ¶ added in v0.1.0
Syslog representation of Random
func (*Syslog) GetAvailableActions ¶ added in v0.1.0
func (s *Syslog) GetAvailableActions() map[string]*control.ActionDescription
GetAvailableActions returns available actions
func (*Syslog) GetOutputChan ¶ added in v0.1.0
func (s *Syslog) GetOutputChan() chan *events.LookatchEvent
GetOutputChan get output channel
func (*Syslog) GetSchema ¶ added in v0.1.0
func (s *Syslog) GetSchema() interface{}
GetSchema returns schema
func (*Syslog) GetStatus ¶ added in v0.1.0
func (s *Syslog) GetStatus() interface{}
GetStatus get source status
func (*Syslog) HealthCheck ¶ added in v0.1.0
HealthCheck return true if ok
type SyslogConfig ¶ added in v0.1.0
SyslogConfig representation of Random Config