sources

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2018 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const FileReadingFollowerType = "fileReadingFollower"

FileReadingFollowerType type of source

View Source
const MSSQLQueryType = "MSSQL"

MSSQLQueryType type of source

View Source
const MysqlCDCType = "MysqlCDC"

MysqlCDCType type of source

View Source
const MysqlQueryType = "MysqlQuery"

MysqlQueryType type of source

View Source
const PostgreSQLCDCType = "postgresqlCDC"

PostgreSQLCDCType type of source

View Source
const PostgreSQLQueryType = "postgresqlQuery"

PostgreSQLQueryType type of source

View Source
const RandomType = "random"

RandomType type of source

View Source
const SyslogType = "syslog"

SyslogType type of source

View Source
const TickerValue = 10

TickerValue is the number of seconds to wait between two ticks

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentHeader

type AgentHeader struct {
	// contains filtered or unexported fields
}

AgentHeader representation of Agent Header

type ColumnSchema

type ColumnSchema struct {
	TableDatabase          string
	TableSchema            string
	TableName              string
	ColumnName             string
	ColumnOrdPos           int
	IsNullable             string
	DataType               string
	CharacterMaximumLength sql.NullInt64
	NumericPrecision       sql.NullInt64
	NumericScale           sql.NullInt64
	ColumnType             string
	ColumnKey              string
}

ColumnSchema representation of schema

type FileReadingFollower added in v0.1.0

type FileReadingFollower struct {
	*Source
	// contains filtered or unexported fields
}

FileReadingFollower representation of FileReadingFollower

func (*FileReadingFollower) GetAvailableActions added in v0.1.0

func (f *FileReadingFollower) GetAvailableActions() map[string]*control.ActionDescription

GetAvailableActions returns available actions

func (*FileReadingFollower) GetMeta added in v0.1.0

func (f *FileReadingFollower) GetMeta() map[string]interface{}

GetMeta get source meta

func (*FileReadingFollower) GetName added in v0.1.0

func (f *FileReadingFollower) GetName() string

GetName get source name

func (*FileReadingFollower) GetOutputChan added in v0.1.0

func (f *FileReadingFollower) GetOutputChan() chan *events.LookatchEvent

GetOutputChan get output channel

func (*FileReadingFollower) GetSchema added in v0.1.0

func (f *FileReadingFollower) GetSchema() interface{}

GetSchema Get source Schema

func (*FileReadingFollower) GetStatus added in v0.1.0

func (f *FileReadingFollower) GetStatus() interface{}

GetStatus Get source status

func (*FileReadingFollower) HealthCheck added in v0.1.0

func (f *FileReadingFollower) HealthCheck() bool

HealthCheck return true if ok

func (*FileReadingFollower) Init added in v0.1.0

func (f *FileReadingFollower) Init()

Init source

func (*FileReadingFollower) IsEnable added in v0.1.0

func (f *FileReadingFollower) IsEnable() bool

IsEnable check if source is enabled

func (*FileReadingFollower) Process added in v0.1.0

func (f *FileReadingFollower) Process(action string, params ...interface{}) interface{}

Process action

func (*FileReadingFollower) Start added in v0.1.0

func (f *FileReadingFollower) Start(i ...interface{}) error

Start source

func (*FileReadingFollower) Stop added in v0.1.0

func (f *FileReadingFollower) Stop() error

Stop source

type FileReadingFollowerConfig added in v0.1.0

type FileReadingFollowerConfig struct {
	Path   string `json:"path"`
	Offset int64  `json:"offset"`
}

FileReadingFollowerConfig representation of FileReadingFollower Config

type JDBCQuery

type JDBCQuery struct {
	*Source
	Config JDBCQueryConfig
	// contains filtered or unexported fields
}

JDBCQuery representation of JDBC query

func NewJDBCQuery

func NewJDBCQuery(s *Source) JDBCQuery

NewJDBCQuery create new JDBC query client

func (*JDBCQuery) ExtractDatabaseTable

func (j *JDBCQuery) ExtractDatabaseTable(query string) (string, string)

ExtractDatabaseTable Extract Database and Table from query

func (*JDBCQuery) GetAvailableActions

func (j *JDBCQuery) GetAvailableActions() map[string]*control.ActionDescription

GetAvailableActions returns available actions

func (*JDBCQuery) GetMeta

func (j *JDBCQuery) GetMeta() map[string]interface{}

GetMeta returns source meta

func (*JDBCQuery) GetName

func (j *JDBCQuery) GetName() string

GetName get name of source

func (*JDBCQuery) GetOutputChan

func (j *JDBCQuery) GetOutputChan() chan *events.LookatchEvent

GetOutputChan get output channel

func (*JDBCQuery) GetSchema

func (j *JDBCQuery) GetSchema() interface{}

GetSchema returns schema

func (*JDBCQuery) GetStatus

func (j *JDBCQuery) GetStatus() interface{}

GetStatus get source status

func (*JDBCQuery) HealthCheck

func (j *JDBCQuery) HealthCheck() bool

HealthCheck return true if ok

func (*JDBCQuery) IsEnable

func (j *JDBCQuery) IsEnable() bool

IsEnable check if source is enabled

func (*JDBCQuery) MarshallWorker

func (j *JDBCQuery) MarshallWorker(mapchan chan map[string]interface{}, database string, schema string, table string)

MarshallWorker marshals statements into JSON and sends them to the output sink

func (*JDBCQuery) ProcessLines

func (j *JDBCQuery) ProcessLines(columns []string, lines [][]interface{}, mapchan chan map[string]interface{}, wg *sizedwaitgroup.SizedWaitGroup)

ProcessLines processes a bunch of lines from the result set into maps and sends them to the marshalling goroutine

func (*JDBCQuery) Query

func (j *JDBCQuery) Query(database string, query string)

Query execute query

func (*JDBCQuery) QueryMeta

func (j *JDBCQuery) QueryMeta(query string) []map[string]interface{}

QueryMeta execute query metadata

func (*JDBCQuery) QuerySchema

func (j *JDBCQuery) QuerySchema(q string) (err error)

QuerySchema query schema

func (*JDBCQuery) Start

func (j *JDBCQuery) Start(i ...interface{}) (err error)

Start source

func (*JDBCQuery) Stop

func (j *JDBCQuery) Stop() error

Stop source

type JDBCQueryConfig

type JDBCQueryConfig struct {
	Host      string `json:"host"`
	Port      int    `json:"port"`
	User      string `json:"user"`
	Password  string `json:"password"`
	BatchSize int    `json:"batchsize"`
	NbWorker  int    `json:"nbworker"`
}

JDBCQueryConfig representation of JDBC query configuration

type MSSQLQuery

type MSSQLQuery struct {
	*JDBCQuery
	// contains filtered or unexported fields
}

MSSQLQuery representation of MSSQL Query source

func (*MSSQLQuery) Connect

func (m *MSSQLQuery) Connect()

Connect connection to database

func (*MSSQLQuery) GetStatus

func (m *MSSQLQuery) GetStatus() interface{}

GetStatus returns current status of connection

func (*MSSQLQuery) HealthCheck

func (m *MSSQLQuery) HealthCheck() bool

HealthCheck returns true if source is ok

func (*MSSQLQuery) Init

func (m *MSSQLQuery) Init()

Init initialisation of MSSQL Query source

func (*MSSQLQuery) Process

func (m *MSSQLQuery) Process(action string, params ...interface{}) interface{}

Process process an action

func (*MSSQLQuery) Query

func (m *MSSQLQuery) Query(query string)

Query execute query string

func (*MSSQLQuery) QueryMeta

func (m *MSSQLQuery) QueryMeta(query string) []map[string]interface{}

QueryMeta execute query meta string

func (*MSSQLQuery) QuerySchema

func (m *MSSQLQuery) QuerySchema() (err error)

QuerySchema extract schema from database

type MSSQLQueryConfig

type MSSQLQueryConfig struct {
	*JDBCQueryConfig
	SslMode  string `json:"sslmode"`
	Database string `json:"database"`
}

MSSQLQueryConfig representation of MSSQL Query configuration

type Message

type Message struct {
	Columnnames  []string      `json:"columnnames"`
	Columntypes  []string      `json:"columntypes"`
	Columnvalues []interface{} `json:"columnvalues"`
	Kind         string        `json:"kind"`
	Schema       string        `json:"schema"`
	Table        string        `json:"table"`
	Oldkeys      Oldkeys       `json:"oldkeys"`
}

Message representation of message

type Messages

type Messages struct {
	Change []Message `json:"change"`
}

Messages representation of messages

type Meta

type Meta struct {
	LastState  string `json:"laststate"`
	Lsn        uint64 `json:"offset"`
	SlotStatus bool   `json:"slotstatus"`
}

Meta representation of metadata

type MySQLQuery

type MySQLQuery struct {
	*JDBCQuery
	// contains filtered or unexported fields
}

MySQLQuery representation of MySQL Query source

func (*MySQLQuery) Connect

func (m *MySQLQuery) Connect(schema string)

Connect connection to database

func (*MySQLQuery) GetStatus

func (m *MySQLQuery) GetStatus() interface{}

GetStatus returns current status of connection

func (*MySQLQuery) HealthCheck

func (m *MySQLQuery) HealthCheck() bool

HealthCheck returns true if source is ok

func (*MySQLQuery) Init

func (m *MySQLQuery) Init()

Init initialisation of Mysql Query source

func (*MySQLQuery) Process

func (m *MySQLQuery) Process(action string, params ...interface{}) interface{}

Process process an action

func (*MySQLQuery) Query

func (m *MySQLQuery) Query(query string)

Query execute query string

func (*MySQLQuery) QueryMeta

func (m *MySQLQuery) QueryMeta(query string) []map[string]interface{}

QueryMeta execute query meta string

func (*MySQLQuery) QuerySchema

func (m *MySQLQuery) QuerySchema() (err error)

QuerySchema extract schema from database

type MysqlCDC

type MysqlCDC struct {
	*Source
	// contains filtered or unexported fields
}

MysqlCDC representation of Mysql change data capture

func (*MysqlCDC) GetAvailableActions

func (m *MysqlCDC) GetAvailableActions() map[string]*control.ActionDescription

GetAvailableActions returns available actions

func (*MysqlCDC) GetFirstBinlog

func (m *MysqlCDC) GetFirstBinlog() (string, uint32)

GetFirstBinlog Get First Binlog offset

func (*MysqlCDC) GetMeta

func (m *MysqlCDC) GetMeta() map[string]interface{}

GetMeta get metadata

func (*MysqlCDC) GetName

func (m *MysqlCDC) GetName() string

GetName get source name

func (*MysqlCDC) GetOutputChan

func (m *MysqlCDC) GetOutputChan() chan *events.LookatchEvent

GetOutputChan get output channel

func (*MysqlCDC) GetSchema

func (m *MysqlCDC) GetSchema() interface{}

GetSchema get schema

func (*MysqlCDC) GetStatus

func (m *MysqlCDC) GetStatus() interface{}

GetStatus get status

func (*MysqlCDC) GetlastBinlog

func (m *MysqlCDC) GetlastBinlog() (string, uint32)

GetlastBinlog Get last Binlog offset

func (*MysqlCDC) HealthCheck

func (m *MysqlCDC) HealthCheck() bool

HealthCheck returns true if ok

func (*MysqlCDC) Init

func (m *MysqlCDC) Init()

Init source

func (*MysqlCDC) IsEnable

func (m *MysqlCDC) IsEnable() bool

IsEnable check if source is enabled

func (*MysqlCDC) Process

func (m *MysqlCDC) Process(action string, params ...interface{}) interface{}

Process action

func (*MysqlCDC) Start

func (m *MysqlCDC) Start(i ...interface{}) (err error)

Start source

func (*MysqlCDC) Stop

func (m *MysqlCDC) Stop() error

Stop source

type MysqlCDCConfig

type MysqlCDCConfig struct {
	Host         string                 `json:"host"`
	Port         int                    `json:"port"`
	User         string                 `json:"user"`
	Password     string                 `json:"password"`
	SlaveID      int                    `json:"slave_id" mapstructure:"slave_id"`
	Offset       string                 `json:"offset"`
	LogFile      string                 `json:"logfile"`
	OldValue     bool                   `json:"old_value" mapstructure:"old_value"`
	FilterPolicy string                 `json:"filter_policy" mapstructure:"filter_policy"`
	Filter       map[string]interface{} `json:"filter"`
	Enabled      bool                   `json:"enabled"`
}

MysqlCDCConfig representation of Mysql change data capture configuration

type MysqlQueryConfig

type MysqlQueryConfig struct {
	*JDBCQueryConfig
	Schema  string   `json:"schema"`
	Exclude []string `json:"exclude"`
}

MysqlQueryConfig representation of MySQL Query configuration

type Oldkeys

type Oldkeys struct {
	Keynames  []string      `json:"keynames"`
	Keytypes  []string      `json:"keytypes"`
	Keyvalues []interface{} `json:"keyvalues"`
}

Oldkeys representation of old event

type PostgreSQLCDC

type PostgreSQLCDC struct {
	*Source
	// contains filtered or unexported fields
}

PostgreSQLCDC representation of PostgreSQL change data capture

func (*PostgreSQLCDC) GetAvailableActions

func (p *PostgreSQLCDC) GetAvailableActions() map[string]*control.ActionDescription

GetAvailableActions returns available actions

func (*PostgreSQLCDC) GetMeta

func (p *PostgreSQLCDC) GetMeta() map[string]interface{}

GetMeta get metadata

func (*PostgreSQLCDC) GetName

func (p *PostgreSQLCDC) GetName() string

GetName get source name

func (*PostgreSQLCDC) GetOutputChan

func (p *PostgreSQLCDC) GetOutputChan() chan *events.LookatchEvent

GetOutputChan get output channel

func (*PostgreSQLCDC) GetSchema

func (p *PostgreSQLCDC) GetSchema() interface{}

GetSchema get schema

func (*PostgreSQLCDC) GetStatus

func (p *PostgreSQLCDC) GetStatus() interface{}

GetStatus get status

func (*PostgreSQLCDC) HealthCheck

func (p *PostgreSQLCDC) HealthCheck() bool

HealthCheck returns true if ok

func (*PostgreSQLCDC) Init

func (p *PostgreSQLCDC) Init()

Init source

func (*PostgreSQLCDC) IsEnable

func (p *PostgreSQLCDC) IsEnable() bool

IsEnable check if source is enabled

func (*PostgreSQLCDC) NewReplicator

func (p *PostgreSQLCDC) NewReplicator() (*pgx.ReplicationConn, error)

NewReplicator create new pg logical decoding connection

func (*PostgreSQLCDC) Process

func (p *PostgreSQLCDC) Process(action string, params ...interface{}) interface{}

Process action

func (*PostgreSQLCDC) Start

func (p *PostgreSQLCDC) Start(i ...interface{}) (err error)

Start source

func (*PostgreSQLCDC) StartReplication added in v0.1.0

func (p *PostgreSQLCDC) StartReplication()

StartReplication Start Replication

func (*PostgreSQLCDC) Stop

func (p *PostgreSQLCDC) Stop() error

Stop source

type PostgreSQLCDCConf

type PostgreSQLCDCConf struct {
	Host         string                 `json:"host"`
	Port         int                    `json:"port"`
	User         string                 `json:"user"`
	Password     string                 `json:"password"`
	SslMode      string                 `json:"sslmode"`
	Database     string                 `json:"database"`
	Offset       string                 `json:"offset"`
	SlotName     string                 `json:"slot_name" mapstructure:"slot_name"`
	OldValue     bool                   `json:"old_value" mapstructure:"old_value"`
	FilterPolicy string                 `json:"filter_policy" mapstructure:"filter_policy"`
	Filter       map[string]interface{} `json:"filter"`
	Enabled      bool                   `json:"enabled"`
}

PostgreSQLCDCConf representation of PostgreSQL change data capture configuration

type PostgreSQLQuery

type PostgreSQLQuery struct {
	*JDBCQuery
	// contains filtered or unexported fields
}

PostgreSQLQuery representation of PostgreSQL Query source

func (*PostgreSQLQuery) Connect

func (p *PostgreSQLQuery) Connect()

Connect connection to database

func (*PostgreSQLQuery) GetStatus

func (p *PostgreSQLQuery) GetStatus() interface{}

GetStatus returns current status of connection

func (*PostgreSQLQuery) HealthCheck

func (p *PostgreSQLQuery) HealthCheck() bool

HealthCheck returns true if source is ok

func (*PostgreSQLQuery) Init

func (p *PostgreSQLQuery) Init()

Init initialisation of PostgreSQL Query source

func (*PostgreSQLQuery) Process

func (p *PostgreSQLQuery) Process(action string, params ...interface{}) interface{}

Process process an action

func (*PostgreSQLQuery) Query

func (p *PostgreSQLQuery) Query(query string)

Query execute query string

func (*PostgreSQLQuery) QueryMeta

func (p *PostgreSQLQuery) QueryMeta(query string) []map[string]interface{}

QueryMeta execute query meta string

func (*PostgreSQLQuery) QuerySchema

func (p *PostgreSQLQuery) QuerySchema() (err error)

QuerySchema extract schema from database

type PostgreSQLQueryConfig

type PostgreSQLQueryConfig struct {
	*JDBCQueryConfig
	SslMode  string `json:"sslmode"`
	Database string `json:"database"`
}

PostgreSQLQueryConfig representation of PostgreSQL Query configuration

type Query

type Query struct {
	Query string `description:"SQL query to execute on agent" required:"true"`
}

Query representation of query action

type Random

type Random struct {
	*Source

	NbMessages int
	// contains filtered or unexported fields
}

Random representation of Random

func (*Random) GetAvailableActions

func (r *Random) GetAvailableActions() map[string]*control.ActionDescription

GetAvailableActions returns available actions

func (*Random) GetMeta

func (r *Random) GetMeta() map[string]interface{}

GetMeta get source meta

func (*Random) GetName

func (r *Random) GetName() string

GetName get source name

func (*Random) GetOutputChan

func (r *Random) GetOutputChan() chan *events.LookatchEvent

GetOutputChan get output channel

func (*Random) GetSchema

func (r *Random) GetSchema() interface{}

GetSchema Get source Schema

func (*Random) GetStatus

func (r *Random) GetStatus() interface{}

GetStatus Get source status

func (*Random) HealthCheck

func (r *Random) HealthCheck() bool

HealthCheck return true if ok

func (*Random) Init

func (r *Random) Init()

Init source

func (*Random) IsEnable

func (r *Random) IsEnable() bool

IsEnable check if source is enabled

func (*Random) Process

func (r *Random) Process(action string, params ...interface{}) interface{}

Process action

func (*Random) Start

func (r *Random) Start(i ...interface{}) error

Start source

func (*Random) Stop

func (r *Random) Stop() error

Stop source

type RandomConfig

type RandomConfig struct {
	Wait string `json:"wait"`
}

RandomConfig representation of Random Config

type SQLSchema added in v0.1.0

type SQLSchema map[string]map[string]map[string]*ColumnSchema

SQLSchema schema table Position

type Source

type Source struct {
	Name          string
	OutputChannel chan *events.LookatchEvent
	AgentInfo     *AgentHeader
	IsEnable      bool
	Conf          *viper.Viper
	Offset        int64
}

Source representation of source

type SourceI

type SourceI interface {
	Init()
	Stop() error
	Start(...interface{}) error
	GetName() string
	GetOutputChan() chan *events.LookatchEvent
	GetMeta() map[string]interface{}
	GetSchema() interface{}
	GetStatus() interface{}
	IsEnable() bool
	HealthCheck() bool
	GetAvailableActions() map[string]*control.ActionDescription
	Process(string, ...interface{}) interface{}
}

SourceI interface

func New

func New(name string, sourceType string, config *viper.Viper, eventChan chan *events.LookatchEvent) (s SourceI, err error)

New create new source

type Syslog added in v0.1.0

type Syslog struct {
	*Source

	NbMessages int
	// contains filtered or unexported fields
}

Syslog representation of Syslog

func (*Syslog) GetAvailableActions added in v0.1.0

func (s *Syslog) GetAvailableActions() map[string]*control.ActionDescription

GetAvailableActions returns available actions

func (*Syslog) GetMeta added in v0.1.0

func (s *Syslog) GetMeta() map[string]interface{}

GetMeta returns source meta

func (*Syslog) GetName added in v0.1.0

func (s *Syslog) GetName() string

GetName get source name

func (*Syslog) GetOutputChan added in v0.1.0

func (s *Syslog) GetOutputChan() chan *events.LookatchEvent

GetOutputChan get output channel

func (*Syslog) GetSchema added in v0.1.0

func (s *Syslog) GetSchema() interface{}

GetSchema returns schema

func (*Syslog) GetStatus added in v0.1.0

func (s *Syslog) GetStatus() interface{}

GetStatus get source status

func (*Syslog) HealthCheck added in v0.1.0

func (s *Syslog) HealthCheck() bool

HealthCheck return true if ok

func (*Syslog) Init added in v0.1.0

func (s *Syslog) Init()

Init syslog source

func (*Syslog) IsEnable added in v0.1.0

func (s *Syslog) IsEnable() bool

IsEnable check if source is enabled

func (*Syslog) Process added in v0.1.0

func (s *Syslog) Process(action string, params ...interface{}) interface{}

Process action

func (*Syslog) Start added in v0.1.0

func (s *Syslog) Start(i ...interface{}) error

Start syslog source

func (*Syslog) Stop added in v0.1.0

func (s *Syslog) Stop() error

Stop syslog source

type SyslogConfig added in v0.1.0

type SyslogConfig struct {
	Type string `json:"Type"`
	Port int    `json:"Port"`
}

SyslogConfig representation of Syslog Config

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL