Versions in this module Expand all Collapse all v1 v1.0.0 Sep 26, 2021 Changes in this version + type BigQueryConfig struct + DatasetID string + JsonPemPath string + ProjectID string + type BigQueryReader struct + AggregateResults bool + ConcurrencyLevel int + PageSize int + TmpTableName string + UnflattenResults bool + func NewBigQueryReader(config *BigQueryConfig, query string) *BigQueryReader + func NewDynamicBigQueryReader(config *BigQueryConfig, sqlGenerator func(data.JSON) (string, error)) *BigQueryReader + func (r *BigQueryReader) Concurrency() int + func (r *BigQueryReader) Finish(outputChan chan data.JSON, killChan chan error) + func (r *BigQueryReader) ForEachQueryData(d data.JSON, killChan chan error, forEach func(d data.JSON)) + func (r *BigQueryReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (r *BigQueryReader) String() string + type BigQueryWriter struct + ConcurrencyLevel int + func NewBigQueryWriter(config *BigQueryConfig, tableName string) *BigQueryWriter + func NewBigQueryWriterForNewTable(config *BigQueryConfig, tableName string, fields map[string]string) *BigQueryWriter + func (w *BigQueryWriter) Concurrency() int + func (w *BigQueryWriter) Finish(outputChan chan data.JSON, killChan chan error) + func (w *BigQueryWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (w *BigQueryWriter) String() string + func (w *BigQueryWriter) WriteBatch(queuedRows []map[string]interface{}) (err error) + type CSVTransformer struct + Parameters util.CSVParameters + func NewCSVTransformer() *CSVTransformer + func (w *CSVTransformer) Finish(outputChan chan data.JSON, killChan chan error) + func (w *CSVTransformer) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (w *CSVTransformer) String() string + type CSVWriter struct + Parameters util.CSVParameters + func NewCSVWriter(w io.Writer) *CSVWriter + func (w *CSVWriter) Finish(outputChan chan data.JSON, killChan chan error) + func (w *CSVWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (w *CSVWriter) String() string + type FileReader struct + func NewFileReader(filename string) *FileReader + func (r *FileReader) Finish(outputChan chan data.JSON, killChan chan error) + func (r *FileReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (r *FileReader) String() string + type FtpWriter struct + func NewFtpWriter(host, username, password, path string) *FtpWriter + func (f *FtpWriter) Finish(outputChan chan data.JSON, killChan chan error) + func (f *FtpWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (f *FtpWriter) String() string + type FuncTransformer struct + ConcurrencyLevel int + Name string + func NewFuncTransformer(transform func(d data.JSON) data.JSON) *FuncTransformer + func (t *FuncTransformer) Concurrency() int + func (t *FuncTransformer) Finish(outputChan chan data.JSON, killChan chan error) + func (t *FuncTransformer) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (t *FuncTransformer) String() string + type HTTPRequest struct + Client *http.Client + Request *http.Request + func NewHTTPRequest(method, url string, body io.Reader) (*HTTPRequest, error) + func (r *HTTPRequest) Finish(outputChan chan data.JSON, killChan chan error) + func (r *HTTPRequest) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (r *HTTPRequest) String() string + type IoReader struct + BufferSize int + Gzipped bool + LineByLine bool + Reader io.Reader + func NewIoReader(reader io.Reader) *IoReader + func (r *IoReader) Finish(outputChan chan data.JSON, killChan chan error) + func (r *IoReader) ForEachData(killChan chan error, foo func(d data.JSON)) + func (r *IoReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (r *IoReader) String() string + type IoReaderWriter struct + func NewIoReaderWriter(reader io.Reader, writer io.Writer) *IoReaderWriter + func (r *IoReaderWriter) Finish(outputChan chan data.JSON, killChan chan error) + func (r *IoReaderWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (r *IoReaderWriter) String() string + type IoWriter struct + AddNewline bool + Writer io.Writer + func NewIoWriter(writer io.Writer) *IoWriter + func (w *IoWriter) Finish(outputChan chan data.JSON, killChan chan error) + func (w *IoWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (w *IoWriter) String() string + type Passthrough struct + func NewPassthrough() *Passthrough + func (r *Passthrough) Finish(outputChan chan data.JSON, killChan chan error) + func (r *Passthrough) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (r *Passthrough) String() string + type RegexpMatcher struct + DebugLog bool + func NewRegexpMatcher(pattern string) *RegexpMatcher + func (r *RegexpMatcher) Finish(outputChan chan data.JSON, killChan chan error) + func (r *RegexpMatcher) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (r *RegexpMatcher) String() string + type S3Reader struct + DeleteObjects bool + func NewS3ObjectReader(awsID, awsSecret, awsRegion, bucket, object string) *S3Reader + func NewS3PrefixReader(awsID, awsSecret, awsRegion, bucket, prefix string) *S3Reader + func (r *S3Reader) Finish(outputChan chan data.JSON, killChan chan error) + func (r *S3Reader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (r *S3Reader) String() string + type S3Writer struct + Compress bool + LineSeparator string + func NewS3Writer(awsID, awsSecret, awsRegion, bucket, key string) *S3Writer + func (w *S3Writer) Finish(outputChan chan data.JSON, killChan chan error) + func (w *S3Writer) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (w *S3Writer) String() string + type SCP struct + Destination string + Object string + Port string + func NewSCP(obj string, destination string) *SCP + func (s *SCP) Finish(outputChan chan data.JSON, killChan chan error) + func (s *SCP) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (s *SCP) Run(killChan chan error) + type SQLExecutor struct + func NewDynamicSQLExecutor(dbConn *sql.DB, sqlGenerator func(data.JSON) (string, error)) *SQLExecutor + func NewSQLExecutor(dbConn *sql.DB, sql string) *SQLExecutor + func (s *SQLExecutor) Finish(outputChan chan data.JSON, killChan chan error) + func (s *SQLExecutor) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (s *SQLExecutor) String() string + type SQLReader struct + BatchSize int + ConcurrencyLevel int + StructDestination interface{} + func NewDynamicSQLReader(dbConn *sql.DB, sqlGenerator func(data.JSON) (string, error)) *SQLReader + func NewSQLReader(dbConn *sql.DB, sql string) *SQLReader + func (s *SQLReader) Concurrency() int + func (s *SQLReader) Finish(outputChan chan data.JSON, killChan chan error) + func (s *SQLReader) ForEachQueryData(d data.JSON, killChan chan error, forEach func(d data.JSON)) + func (s *SQLReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (s *SQLReader) String() string + type SQLReaderWriter struct + ConcurrencyLevel int + func NewDynamicSQLReaderWriter(readConn *sql.DB, writeConn *sql.DB, ...) *SQLReaderWriter + func NewSQLReaderWriter(readConn *sql.DB, writeConn *sql.DB, readQuery, writeTable string) *SQLReaderWriter + func (s *SQLReaderWriter) Concurrency() int + func (s *SQLReaderWriter) Finish(outputChan chan data.JSON, killChan chan error) + func (s *SQLReaderWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (s *SQLReaderWriter) String() string + type SQLWriter struct + BatchSize int + ConcurrencyLevel int + OnDupKeyFields []string + OnDupKeyUpdate bool + TableName string + func NewSQLWriter(db *sql.DB, tableName string) *SQLWriter + func (s *SQLWriter) Concurrency() int + func (s *SQLWriter) Finish(outputChan chan data.JSON, killChan chan error) + func (s *SQLWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (s *SQLWriter) String() string + type SQLWriterData struct + InsertData interface{} + TableName string + type SftpReader struct + CloseOnFinish bool + DeleteObjects bool + FileNamesOnly bool + Walk bool + func NewSftpReader(server string, username string, path string, authMethods ...ssh.AuthMethod) *SftpReader + func NewSftpReaderByClient(client *sftp.Client, path string) *SftpReader + func (r *SftpReader) CloseClient() + func (r *SftpReader) Finish(outputChan chan data.JSON, killChan chan error) + func (r *SftpReader) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (r *SftpReader) String() string + type SftpWriter struct + CloseOnFinish bool + func NewSftpWriter(server string, username string, path string, authMethods ...ssh.AuthMethod) *SftpWriter + func NewSftpWriterByFile(file *sftp.File) *SftpWriter + func (w *SftpWriter) Finish(outputChan chan data.JSON, killChan chan error) + func (w *SftpWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) + func (w *SftpWriter) String() string