storages

package
v0.0.0-...-8aeb8a1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2023 License: MIT Imports: 44 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SourceSuccessEventType    = "SOURCE_SUCCESSFUL_RUN"
	DestinationBatchEventType = "DESTINATION_BATCH_RUN"
)
View Source
const (

	//BatchMode is a mode when destinations store data with batches
	BatchMode = "batch"
	//StreamMode is a mode when destinations store data row by row
	StreamMode = "stream"
	//SynchronousMode is a mode when destinations process event immediately during HTTP request lifetime and can put result in HTTP response body
	SynchronousMode = "synchronous"
)
View Source
const (
	RedshiftType        = "redshift"
	BigQueryType        = "bigquery"
	PostgresType        = "postgres"
	MySQLType           = "mysql"
	ClickHouseType      = "clickhouse"
	S3Type              = "s3"
	SnowflakeType       = "snowflake"
	GoogleAnalyticsType = "google_analytics"
	GCSType             = "gcs"
	FacebookType        = "facebook"
	WebHookType         = "webhook"
	NpmType             = "npm"
	TagType             = "tag"
	AmplitudeType       = "amplitude"
	HubSpotType         = "hubspot"
	DbtCloudType        = "dbtcloud"
)

Variables

View Source
var (
	//ErrUnknownDestination error for checking unknown destination type
	ErrUnknownDestination = errors.New("Unknown destination type")
	//StorageTypes is used in all destinations init() methods
	StorageTypes = make(map[string]StorageType)
)
View Source
var DefaultHTTPConfiguration = &adapters.HTTPConfiguration{
	GlobalClientTimeout:       10 * time.Second,
	RetryDelay:                10 * time.Second,
	RetryCount:                9,
	ClientMaxIdleConns:        1000,
	ClientMaxIdleConnsPerHost: 1000,
	QueueFullnessThreshold:    100_000,
}

DefaultHTTPConfiguration contains the default HTTP timeouts, retry counts, delays, etc. for HTTP adapters.

View Source
var (
	UserRecognitionStorages = map[string]URSetup{
		MySQLType:      {true},
		PostgresType:   {true},
		RedshiftType:   {true},
		SnowflakeType:  {true},
		ClickHouseType: {false},
	}
)

Functions

func CreateMySQLAdapter

func CreateMySQLAdapter(ctx context.Context, config adapters.DataSourceConfig, queryLogger *logging.QueryLogger, sqlTypes typing.SQLTypes) (*adapters.MySQL, error)

CreateMySQLAdapter creates a MySQL adapter with a database. If the database doesn't exist, MySQL returns an error; in this case it connects without a database and creates it.

func CreateSnowflakeAdapter

func CreateSnowflakeAdapter(ctx context.Context, s3Config *adapters.S3Config, config adapters.SnowflakeConfig,
	queryLogger *logging.QueryLogger, sqlTypes typing.SQLTypes) (*adapters.Snowflake, error)

CreateSnowflakeAdapter creates a Snowflake adapter with a schema. If the schema doesn't exist, Snowflake returns an error; in this case it connects without a schema and creates it.

func IsConnectionError

func IsConnectionError(err error) bool

func RegisterFileStorage

func RegisterFileStorage(storageType string, createStorage CreateStorage, extractConfig ExtractConfig)

func RegisterStorage

func RegisterStorage(storageType StorageType)

RegisterStorage registers function to create new storage(destination) instance

Types

type Abstract

type Abstract struct {
	// contains filtered or unexported fields
}

Abstract is an abstract destination storage that contains common destination funcs (aka abstract class).

func (*Abstract) AccountResult

func (a *Abstract) AccountResult(eventContext *adapters.EventContext, err error)

AccountResult checks input error and calls ErrorEvent or SuccessEvent

func (*Abstract) Clean

func (a *Abstract) Clean(tableName string) error

Clean removes all records from storage

func (*Abstract) DropTable

func (a *Abstract) DropTable(tableName string) error

func (*Abstract) DryRun

func (a *Abstract) DryRun(payload events.Event) ([][]adapters.TableField, error)

func (*Abstract) ErrorEvent

func (a *Abstract) ErrorEvent(fallback bool, eventCtx *adapters.EventContext, err error)

ErrorEvent writes error to metrics/counters/telemetry/events cache

func (*Abstract) Fallback

func (a *Abstract) Fallback(failedEvents ...*events.FailedEvent)

Fallback logs event with error to fallback logger

func (*Abstract) GetSyncWorker

func (a *Abstract) GetSyncWorker() *SyncWorker

func (*Abstract) GetUniqueIDField

func (a *Abstract) GetUniqueIDField() *identifiers.UniqueID

GetUniqueIDField returns unique ID field configuration

func (*Abstract) ID

func (a *Abstract) ID() string

ID returns destination ID

func (*Abstract) Init

func (a *Abstract) Init(config *Config, impl Storage, preinstalledJavaScript string, defaultUserTransform string) error

func (*Abstract) Insert

func (a *Abstract) Insert(eventContext *adapters.EventContext) (insertErr error)

Insert ensures table and sends input event to Destination (with 1 retry if error)

func (*Abstract) IsCachingDisabled

func (a *Abstract) IsCachingDisabled() bool

IsCachingDisabled returns true if caching is disabled in destination configuration

func (*Abstract) IsStaging

func (a *Abstract) IsStaging() bool

func (*Abstract) Processor

func (a *Abstract) Processor() *schema.Processor

Processor returns processor

func (*Abstract) ReplaceTable

func (a *Abstract) ReplaceTable(originalTable, replacementTable string, dropOldTable bool) error

func (*Abstract) SkipEvent

func (a *Abstract) SkipEvent(eventCtx *adapters.EventContext, err error)

SkipEvent writes skip to metrics/counters/telemetry and error to events cache

func (*Abstract) Start

func (a *Abstract) Start(config *Config) error

func (*Abstract) Store

func (a *Abstract) Store(fileName string, objects []map[string]interface{}, alreadyUploadedTables map[string]bool, needCopyEvent bool) (map[string]*StoreResult, *events.FailedEvents, *events.SkippedEvents, error)

Store processes events and stores them with the StoreTable() func. It returns the store result per table, failed events (group of events which failed to process), skipped events and err.

func (*Abstract) SuccessEvent

func (a *Abstract) SuccessEvent(eventCtx *adapters.EventContext)

SuccessEvent writes success to metrics/counters/telemetry/events cache

func (*Abstract) Update

func (a *Abstract) Update(eventContext *adapters.EventContext) error

Update updates record

type Amplitude

type Amplitude struct {
	HTTPStorage
}

Amplitude is a destination that can send data into Amplitude

func (*Amplitude) Type

func (a *Amplitude) Type() string

Type returns Amplitude type

type AwsRedshift

type AwsRedshift struct {
	Abstract
	// contains filtered or unexported fields
}

AwsRedshift stores files to AWS Redshift in two modes. Batch: via AWS S3 in batch mode (1 file = 1 statement). Stream: via events queue in stream mode (1 object = 1 statement).

func (*AwsRedshift) Clean

func (ar *AwsRedshift) Clean(tableName string) error

func (*AwsRedshift) Close

func (ar *AwsRedshift) Close() (multiErr error)

Close closes AwsRedshift adapter, fallback logger and streaming worker

func (*AwsRedshift) GetUsersRecognition

func (ar *AwsRedshift) GetUsersRecognition() *UserRecognitionConfiguration

GetUsersRecognition returns users recognition configuration

func (*AwsRedshift) SyncStore

func (ar *AwsRedshift) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, deleteConditions *base.DeleteConditions, cacheTable bool, needCopyEvent bool) error

SyncStore is used in storing chunk of pulled data to AwsRedshift with processing

func (*AwsRedshift) Type

func (ar *AwsRedshift) Type() string

Type returns Redshift type

type BigQuery

type BigQuery struct {
	Abstract
	// contains filtered or unexported fields
}

BigQuery stores files to Google BigQuery in two modes. Batch: via Google Cloud Storage in batch mode (1 file = 1 operation). Stream: via events queue in stream mode (1 object = 1 operation).

func (*BigQuery) Clean

func (bq *BigQuery) Clean(tableName string) error

func (*BigQuery) Close

func (bq *BigQuery) Close() (multiErr error)

Close closes BigQuery adapter, fallback logger and streaming worker

func (*BigQuery) GetUsersRecognition

func (bq *BigQuery) GetUsersRecognition() *UserRecognitionConfiguration

GetUsersRecognition returns disabled users recognition configuration

func (*BigQuery) SyncStore

func (bq *BigQuery) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, deleteConditions *base.DeleteConditions, cacheTable bool, needCopyEvent bool) error

SyncStore is used in storing chunk of pulled data to BigQuery with processing

func (*BigQuery) Type

func (bq *BigQuery) Type() string

Type returns BigQuery type

func (*BigQuery) Update

func (bq *BigQuery) Update(eventContext *adapters.EventContext) error

Update isn't supported

type ClickHouse

type ClickHouse struct {
	Abstract
	// contains filtered or unexported fields
}

ClickHouse stores files to ClickHouse in two modes. Batch: 1 file = 1 statement. Stream: 1 object = 1 statement.

func (*ClickHouse) Clean

func (ch *ClickHouse) Clean(tableName string) error

func (*ClickHouse) Close

func (ch *ClickHouse) Close() (multiErr error)

Close closes ClickHouse adapters, fallback logger and streaming worker

func (*ClickHouse) GetUsersRecognition

func (ch *ClickHouse) GetUsersRecognition() *UserRecognitionConfiguration

GetUsersRecognition returns users recognition configuration

func (*ClickHouse) SyncStore

func (ch *ClickHouse) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, deleteConditions *base.DeleteConditions, cacheTable bool, needCopyEvent bool) error

SyncStore is used in storing chunk of pulled data to ClickHouse with processing

func (*ClickHouse) Type

func (ch *ClickHouse) Type() string

Type returns ClickHouse type

type Config

type Config struct {
	PostHandleDestinations []string
	// contains filtered or unexported fields
}

Config is a model for passing to destinations creator funcs

type CreateStorage

type CreateStorage = func(config *Config) (Storage, error)

type DbtCloud

type DbtCloud struct {
	HTTPStorage
	// contains filtered or unexported fields
}

DbtCloud is a destination that can send API requests to cloud.getdbt.com. It is not a general purpose destination. It is designed for a special kind of events, like a successful run of a Source.

func (*DbtCloud) Enabled

func (dbt *DbtCloud) Enabled() bool

Enabled returns whether we should use this storage

func (*DbtCloud) Type

func (dbt *DbtCloud) Type() string

Type returns DbtCloud type

type ExtractConfig

type ExtractConfig = func(config *config.DestinationConfig) map[string]interface{}

type Facebook

type Facebook struct {
	HTTPStorage
}

Facebook stores events to Facebook Conversion API in stream mode

func (*Facebook) Type

func (fb *Facebook) Type() string

Type returns Facebook type

type Factory

type Factory interface {
	Create(name string, destination config.DestinationConfig) (StorageProxy, events.Queue, error)
	Configure(destinationID string, destination config.DestinationConfig) (func(config *Config) (Storage, error), *Config, error)
}

Factory is a destinations factory for creation

func NewFactory

func NewFactory(ctx context.Context, logEventPath string, geoService *geo.Service, coordinationService *coordination.Service,
	eventsCache *caching.EventsCache, globalLoggerFactory *logevents.Factory, globalConfiguration *config.UsersRecognition,
	metaStorage meta.Storage, eventsQueueFactory *events.QueueFactory, maxColumns int, streamingThreadsCount int) Factory

NewFactory returns configured Factory

func NewMockFactory

func NewMockFactory() Factory

NewMockFactory returns new MockFactory

type FactoryImpl

type FactoryImpl struct {
	// contains filtered or unexported fields
}

FactoryImpl is a destination's factory implementation

func (*FactoryImpl) Configure

func (f *FactoryImpl) Configure(destinationID string, destination config.DestinationConfig) (func(config *Config) (Storage, error), *Config, error)

func (*FactoryImpl) Create

func (f *FactoryImpl) Create(destinationID string, destination config.DestinationConfig) (StorageProxy, events.Queue, error)

Create builds the event storage proxy and the event consumer (logger or event-queue). It enriches incoming configs with default values if needed.

type FileAdapter

type FileAdapter interface {
	io.Closer
	UploadBytes(fileName string, fileBytes []byte) error
	Compression() adapters.FileCompression
	Format() adapters.FileEncodingFormat
}

type FileStorage

type FileStorage struct {
	Abstract
	// contains filtered or unexported fields
}

func (*FileStorage) Close

func (fs *FileStorage) Close() (multiErr error)

Close closes fallback logger

func (*FileStorage) DryRun

func (fs *FileStorage) DryRun(events.Event) ([][]adapters.TableField, error)

func (*FileStorage) GetUsersRecognition

func (fs *FileStorage) GetUsersRecognition() *UserRecognitionConfiguration

GetUsersRecognition returns disabled users recognition configuration

func (*FileStorage) Store

func (fs *FileStorage) Store(fileName string, objects []map[string]interface{}, alreadyUploadedTables map[string]bool, needCopyEvent bool) (map[string]*StoreResult, *events.FailedEvents, *events.SkippedEvents, error)

Store processes events and stores them with the storeTable() func. It returns the store result per table, failed events (group of events which failed to process), skipped events and err.

func (*FileStorage) SyncStore

func (fs *FileStorage) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, deleteConditions *base.DeleteConditions, cacheTable bool, needCopyEvent bool) error

SyncStore isn't supported

func (*FileStorage) Type

func (fs *FileStorage) Type() string

Type returns file storage type

func (*FileStorage) Update

func (fs *FileStorage) Update(map[string]interface{}) error

Update isn't supported

type GoogleAnalytics

type GoogleAnalytics struct {
	HTTPStorage
}

GoogleAnalytics stores events to Google Analytics in stream mode

func (*GoogleAnalytics) Type

func (ga *GoogleAnalytics) Type() string

Type returns Google Analytics type

type HTTPStorage

type HTTPStorage struct {
	Abstract
	// contains filtered or unexported fields
}

HTTPStorage is an abstract destination storage for HTTP destinations. It contains common HTTP destination funcs (aka abstract class).

func (*HTTPStorage) Close

func (h *HTTPStorage) Close() (multiErr error)

Close closes adapter, fallback logger and streaming worker

func (*HTTPStorage) DryRun

func (h *HTTPStorage) DryRun(payload events.Event) ([][]adapters.TableField, error)

func (*HTTPStorage) GetUniqueIDField

func (h *HTTPStorage) GetUniqueIDField() *identifiers.UniqueID

GetUniqueIDField returns unique ID field configuration

func (*HTTPStorage) GetUsersRecognition

func (h *HTTPStorage) GetUsersRecognition() *UserRecognitionConfiguration

GetUsersRecognition returns disabled users recognition configuration

func (*HTTPStorage) Insert

func (h *HTTPStorage) Insert(eventContext *adapters.EventContext) error

Insert sends event into adapters.Adapter

func (*HTTPStorage) IsCachingDisabled

func (h *HTTPStorage) IsCachingDisabled() bool

IsCachingDisabled returns true if caching is disabled in destination configuration

func (*HTTPStorage) Store

func (h *HTTPStorage) Store(fileName string, objects []map[string]interface{}, alreadyUploadedTables map[string]bool, needCopyEvent bool) (map[string]*StoreResult, *events.FailedEvents, *events.SkippedEvents, error)

Store isn't supported

func (*HTTPStorage) SyncStore

func (h *HTTPStorage) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, deleteConditions *base.DeleteConditions, cacheTable bool, needCopyEvent bool) error

SyncStore isn't supported

func (*HTTPStorage) Type

func (h *HTTPStorage) Type() string

Type returns storage type. Should be overridden in every implementation

func (*HTTPStorage) Update

func (h *HTTPStorage) Update(eventContext *adapters.EventContext) error

Update isn't supported

type HubSpot

type HubSpot struct {
	HTTPStorage
}

HubSpot is a destination that can send data into HubSpot

func (*HubSpot) Type

func (h *HubSpot) Type() string

Type returns HubSpot type

type MockFactory

type MockFactory struct{}

MockFactory is a Mock destinations storages factory

func (*MockFactory) Configure

func (mf *MockFactory) Configure(_ string, _ config.DestinationConfig) (func(config *Config) (Storage, error), *Config, error)

func (*MockFactory) Create

func (mf *MockFactory) Create(id string, destination config.DestinationConfig) (StorageProxy, events.Queue, error)

Create returns proxy Mock and events queue

type MySQL

type MySQL struct {
	Abstract
	// contains filtered or unexported fields
}

MySQL stores files to MySQL in two modes. Batch: 1 file = 1 statement. Stream: 1 object = 1 statement.

func (*MySQL) Clean

func (m *MySQL) Clean(tableName string) error

func (*MySQL) Close

func (m *MySQL) Close() (multiErr error)

Close closes MySQL adapter, fallback logger and streaming worker

func (*MySQL) DryRun

func (m *MySQL) DryRun(payload events.Event) ([][]adapters.TableField, error)

func (*MySQL) GetUsersRecognition

func (m *MySQL) GetUsersRecognition() *UserRecognitionConfiguration

GetUsersRecognition returns users recognition configuration

func (*MySQL) SyncStore

func (m *MySQL) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, deleteConditions *base.DeleteConditions, cacheTable bool, needCopyEvent bool) error

SyncStore is used in storing chunk of pulled data to MySQL with processing

func (*MySQL) Type

func (m *MySQL) Type() string

Type returns MySQL type

type NpmDestination

type NpmDestination struct {
	WebHook
}

func (*NpmDestination) Type

func (wh *NpmDestination) Type() string

Type returns NpmType type

type NpmValidatorResult

type NpmValidatorResult struct {
	Ok      bool   `mapstructure:"ok"`
	Message string `mapstructure:"message"`
}

type Postgres

type Postgres struct {
	Abstract
	// contains filtered or unexported fields
}

Postgres stores files to Postgres in two modes. Batch: 1 file = 1 statement. Stream: 1 object = 1 statement.

func (*Postgres) Clean

func (p *Postgres) Clean(tableName string) error

func (*Postgres) Close

func (p *Postgres) Close() (multiErr error)

Close closes Postgres adapter, fallback logger and streaming worker

func (*Postgres) GetUsersRecognition

func (p *Postgres) GetUsersRecognition() *UserRecognitionConfiguration

GetUsersRecognition returns users recognition configuration

func (*Postgres) SyncStore

func (p *Postgres) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, deleteConditions *base.DeleteConditions, cacheTable bool, needCopyEvent bool) error

SyncStore is used in storing chunk of pulled data to Postgres with processing

func (*Postgres) Type

func (p *Postgres) Type() string

Type returns Postgres type

type RetryableProxy

type RetryableProxy struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

RetryableProxy creates Storage with retry (if create fails e.g. because of connection issue)

func (*RetryableProxy) Close

func (rsp *RetryableProxy) Close() error

Close stops the underlying goroutine and closes the storage

func (*RetryableProxy) Get

func (rsp *RetryableProxy) Get() (Storage, bool)

Get returns underlying destination storage and ready flag

func (*RetryableProxy) GetGeoResolverID

func (rsp *RetryableProxy) GetGeoResolverID() string

func (*RetryableProxy) GetPostHandleDestinations

func (rsp *RetryableProxy) GetPostHandleDestinations() []string

func (*RetryableProxy) GetUniqueIDField

func (rsp *RetryableProxy) GetUniqueIDField() *identifiers.UniqueID

GetUniqueIDField returns unique ID field configuration

func (*RetryableProxy) ID

func (rsp *RetryableProxy) ID() string

ID returns destination ID

func (*RetryableProxy) IsCachingDisabled

func (rsp *RetryableProxy) IsCachingDisabled() bool

IsCachingDisabled returns true if caching is disabled in destination configuration

func (*RetryableProxy) Mode

func (rsp *RetryableProxy) Mode() string

Mode returns destination mode

func (*RetryableProxy) Type

func (rsp *RetryableProxy) Type() string

Type returns destination type

type Snowflake

type Snowflake struct {
	Abstract
	// contains filtered or unexported fields
}

Snowflake stores files to Snowflake in two modes. Batch: via AWS S3 (or GCP) in batch mode (1 file = 1 transaction). Stream: via events queue in stream mode (1 object = 1 transaction).

func (*Snowflake) Clean

func (s *Snowflake) Clean(tableName string) error

func (*Snowflake) Close

func (s *Snowflake) Close() (multiErr error)

Close closes Snowflake adapter, stage adapter, fallback logger and streaming worker

func (*Snowflake) GetUsersRecognition

func (s *Snowflake) GetUsersRecognition() *UserRecognitionConfiguration

GetUsersRecognition returns users recognition configuration

func (*Snowflake) SyncStore

func (s *Snowflake) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, deleteConditions *base.DeleteConditions, cacheTable bool, needCopyEvent bool) error

SyncStore is used in storing chunk of pulled data to Snowflake with processing

func (*Snowflake) Type

func (s *Snowflake) Type() string

Type returns Snowflake type

type Storage

type Storage interface {
	io.Closer
	DryRun(payload events.Event) ([][]adapters.TableField, error)
	Store(fileName string, objects []map[string]interface{}, alreadyUploadedTables map[string]bool, needCopyEvent bool) (map[string]*StoreResult, *events.FailedEvents, *events.SkippedEvents, error)
	SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, deleteConditions *base.DeleteConditions, cacheTable bool, needCopyEvent bool) error

	ReplaceTable(originalTable, replacementTable string, dropOldTable bool) error
	DropTable(tableName string) error

	//Update(object map[string]interface{}) error
	Fallback(events ...*events.FailedEvent)
	GetUsersRecognition() *UserRecognitionConfiguration
	GetSyncWorker() *SyncWorker
	GetUniqueIDField() *identifiers.UniqueID

	Processor() *schema.Processor
	Init(config *Config, impl Storage, preinstalledJavaScript string, defaultUserTransform string) error
	Start(config *Config) error
	ID() string
	Type() string
	IsStaging() bool
	IsCachingDisabled() bool
	Clean(tableName string) error
	// contains filtered or unexported methods
}

Storage is a destination representation

func NewAmplitude

func NewAmplitude(config *Config) (storage Storage, err error)

NewAmplitude returns configured Amplitude destination

func NewAwsRedshift

func NewAwsRedshift(config *Config) (storage Storage, err error)

NewAwsRedshift returns AwsRedshift and starts a goroutine for AWS Redshift batch storage or for the stream consumer, depending on the destination mode

func NewBigQuery

func NewBigQuery(config *Config) (storage Storage, err error)

NewBigQuery returns BigQuery configured instance

func NewClickHouse

func NewClickHouse(config *Config) (storage Storage, err error)

NewClickHouse returns configured ClickHouse instance

func NewDbtCloud

func NewDbtCloud(config *Config) (storage Storage, err error)

NewDbtCloud returns configured DbtCloud destination

func NewFacebook

func NewFacebook(config *Config) (storage Storage, err error)

NewFacebook returns configured Facebook destination

func NewGoogleAnalytics

func NewGoogleAnalytics(config *Config) (storage Storage, err error)

NewGoogleAnalytics returns a GoogleAnalytics instance and starts the streaming worker goroutine

func NewGoogleCloudStorage

func NewGoogleCloudStorage(config *Config) (Storage, error)

func NewHubSpot

func NewHubSpot(config *Config) (storage Storage, err error)

NewHubSpot returns configured HubSpot destination

func NewMySQL

func NewMySQL(config *Config) (storage Storage, err error)

NewMySQL returns configured MySQL Destination

func NewNpmDestination

func NewNpmDestination(config *Config) (storage Storage, err error)

NewNpmDestination returns configured NpmDestination

func NewPostgres

func NewPostgres(config *Config) (storage Storage, err error)

NewPostgres returns configured Postgres Destination

func NewS3

func NewS3(config *Config) (Storage, error)

func NewSnowflake

func NewSnowflake(config *Config) (storage Storage, err error)

NewSnowflake returns Snowflake and starts a goroutine for Snowflake batch storage or for the stream consumer, depending on the destination mode

func NewTagDestination

func NewTagDestination(config *Config) (storage Storage, err error)

NewTagDestination returns configured TagDestination

func NewWebHook

func NewWebHook(config *Config) (storage Storage, err error)

NewWebHook returns configured WebHook destination

type StorageProxy

type StorageProxy interface {
	io.Closer
	Get() (Storage, bool)
	GetUniqueIDField() *identifiers.UniqueID
	GetPostHandleDestinations() []string
	GetGeoResolverID() string
	IsCachingDisabled() bool
	ID() string
	Type() string
	Mode() string
}

StorageProxy is a storage proxy

type StorageType

type StorageType struct {
	IsSynchronous bool
	// contains filtered or unexported fields
}

type StoreResult

type StoreResult struct {
	Err       error
	RowsCount int
	EventsSrc map[string]int
}

StoreResult is used as a Batch storing result

type StreamingStorage

type StreamingStorage interface {
	Storage
	//Insert uses errCallback in async adapters (e.g. adapters.HTTPAdapter)
	Insert(eventContext *adapters.EventContext) (err error)

	Update(eventContext *adapters.EventContext) (err error)
	//SuccessEvent writes metrics/counters/events cache, etc
	SuccessEvent(eventCtx *adapters.EventContext)
	//ErrorEvent writes metrics/counters/events cache, etc
	ErrorEvent(fallback bool, eventCtx *adapters.EventContext, err error)
	//SkipEvent writes metrics/counters/events cache, etc
	SkipEvent(eventCtx *adapters.EventContext, err error)
}

StreamingStorage supports Insert operation

type StreamingWorker

type StreamingWorker struct {
	// contains filtered or unexported fields
}

StreamingWorker reads events from the queue and writes them using StreamingStorage

func (*StreamingWorker) Close

func (sw *StreamingWorker) Close() error

type SyncStorage

type SyncStorage interface {
	Storage
	//ProcessEvent process event in sync fashion. Return resulting object immediately
	ProcessEvent(eventContext *adapters.EventContext) (map[string]interface{}, error)
	//SuccessEvent writes metrics/counters/events cache, etc
	SuccessEvent(eventCtx *adapters.EventContext)
	//ErrorEvent writes metrics/counters/events cache, etc
	ErrorEvent(fallback bool, eventCtx *adapters.EventContext, err error)
	//SkipEvent writes metrics/counters/events cache, etc
	SkipEvent(eventCtx *adapters.EventContext, err error)
}

SyncStorage supports ProcessEvent synchronous operation

type SyncWorker

type SyncWorker struct {
	// contains filtered or unexported fields
}

SyncWorker processes events synchronously. It allows returning the result of processing in the body of the HTTP response

func (*SyncWorker) Close

func (sw *SyncWorker) Close() error

func (*SyncWorker) ProcessEvent

func (sw *SyncWorker) ProcessEvent(fact events.Event, tokenID string) []map[string]interface{}

type TableHelper

type TableHelper struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

TableHelper keeps the tables' schema state in memory and updates it according to incoming new data. It considers that all tables are in one destination schema. Note: assume that after any outer changes in the DB we need to increment the table version in Service.

func NewTableHelper

func NewTableHelper(dbSchema string, sqlAdapter adapters.SQLAdapter, coordinationService *coordination.Service, pkFields map[string]bool,
	columnTypesMapping map[typing.DataType]string, maxColumns int, destinationType string) *TableHelper

NewTableHelper returns a configured TableHelper instance. Note: columnTypesMapping must not be empty (or fields will be ignored).

func (*TableHelper) EnsureTable

func (th *TableHelper) EnsureTable(destinationID string, dataSchema *adapters.Table, cacheTable bool) (*adapters.Table, error)

EnsureTable returns the DB table schema and an error if one occurred. If the table doesn't exist, it creates a new one and increments the version. If the table exists, it calculates the diff, patches the existing table with the diff and increments the version. It returns the actual DB table schema (with actual DB types).

func (*TableHelper) EnsureTableWithCaching

func (th *TableHelper) EnsureTableWithCaching(destinationID string, dataSchema *adapters.Table) (*adapters.Table, error)

EnsureTableWithCaching calls EnsureTable with cacheTable = true. It is used in stream destinations (because we don't have time to select the table schema, but there is a retry on error).

func (*TableHelper) EnsureTableWithoutCaching

func (th *TableHelper) EnsureTableWithoutCaching(destinationID string, dataSchema *adapters.Table) (*adapters.Table, error)

EnsureTableWithoutCaching calls EnsureTable with cacheTable = false. It is used in batch destinations and syncStore (because we have time to select the table schema).

func (*TableHelper) MapTableSchema

func (th *TableHelper) MapTableSchema(batchHeader *schema.BatchHeader) *adapters.Table

MapTableSchema maps schema.BatchHeader (JSON structure with JSON data types) into adapters.Table (structure with SQL types) and applies the column types mapping

func (*TableHelper) RefreshTableSchema

func (th *TableHelper) RefreshTableSchema(destinationName string, dataSchema *adapters.Table) (*adapters.Table, error)

RefreshTableSchema forcibly gets (or creates) the DB table schema and updates it in memory

type TagDestination

type TagDestination struct {
	Abstract
	// contains filtered or unexported fields
}

func (*TagDestination) Close

func (t *TagDestination) Close() error

func (*TagDestination) GetSyncWorker

func (t *TagDestination) GetSyncWorker() *SyncWorker

func (*TagDestination) GetUsersRecognition

func (t *TagDestination) GetUsersRecognition() *UserRecognitionConfiguration

func (*TagDestination) ProcessEvent

func (t *TagDestination) ProcessEvent(eventContext *adapters.EventContext) (map[string]interface{}, error)

func (*TagDestination) SyncStore

func (t *TagDestination) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, deleteConditions *base.DeleteConditions, cacheTable bool, needCopyEvent bool) error

func (*TagDestination) Type

func (t *TagDestination) Type() string

Type returns TagType type

type URSetup

type URSetup struct {
	PKRequired bool
}

type UserRecognitionConfiguration

type UserRecognitionConfiguration struct {
	AnonymousIDJSONPath      jsonutils.JSONPath
	IdentificationJSONPathes *jsonutils.JSONPaths

	Enabled bool
}

UserRecognitionConfiguration is the users recognition configuration

func (*UserRecognitionConfiguration) IsEnabled

func (urc *UserRecognitionConfiguration) IsEnabled() bool

IsEnabled returns true if not nil and enabled

type WebHook

type WebHook struct {
	HTTPStorage
}

WebHook is a destination that can send configurable HTTP requests

func (*WebHook) Type

func (wh *WebHook) Type() string

Type returns WebHook type

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL