ingest

package
v0.0.0-...-3a1fdf7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2022 License: Apache-2.0, Apache-2.0 Imports: 32 Imported by: 0

Documentation

Overview

Package ingest contains the new ingestion system for Horizon. It currently runs completely independently of the old one, which means that the new system can be some ledgers behind or ahead of the old system.

Index

Constants

View Source
const (
	// MaxSupportedProtocolVersion defines the maximum supported version of
	// the Stellar protocol.
	MaxSupportedProtocolVersion uint32 = 19

	// CurrentVersion reflects the latest version of the ingestion
	// algorithm. This value is stored in the KV store and is used to decide
	// whether the ledger state needs to be reprocessed or data reingested.
	//
	// Version history:
	// - 1: Initial version.
	// - 2: Added the orderbook, offers processors and distributed ingestion.
	// - 3: Fixed a bug that could potentially result in invalid state
	//      (#1722). Update the version to clear the state.
	// - 4: Fixed a bug in AccountSignersChanged method.
	// - 5: Added trust lines.
	// - 6: Added accounts and accounts data.
	// - 7: Fixed a bug in AccountSignersChanged method.
	// - 8: Fixed AccountSigners processor to remove preauth tx signer
	//      when the preauth tx has failed.
	// - 9: Fixed a bug in asset stats processor that counted unauthorized
	//      trustlines.
	// - 10: Fixed a bug in meta processing (fees are now processed before
	//       everything else).
	// - 11: Protocol 14: CAP-23 and CAP-33.
	// - 12: Trigger state rebuild due to `absTime` -> `abs_time` rename
	//       in ClaimableBalances predicates.
	// - 13: Trigger state rebuild to include more than just authorized assets.
	// - 14: Trigger state rebuild to include claimable balances in the asset stats processor.
	// - 15: Fixed bug in asset stat ingestion where clawback is enabled (#3846).
	// - 16: Extract claimants to a separate table for better performance of
	//       claimable balances for claimant queries.
	CurrentVersion = 16

	// MaxDBConnections is the size of the postgres connection pool dedicated to Horizon ingestion:
	//  * Ledger ingestion,
	//  * State verifications,
	//  * Metrics updates.
	MaxDBConnections = 3
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

// Config holds the settings passed to NewSystem and NewParallelSystems.
//
// NOTE(review): the group comments below are inferred from field names only;
// confirm against the code that consumes each field.
type Config struct {
	// Stellar Core / Captive Core connectivity and network settings.
	CoreSession            db.SessionInterface
	StellarCoreURL         string
	StellarCoreCursor      string
	EnableCaptiveCore      bool
	CaptiveCoreBinaryPath  string
	CaptiveCoreStoragePath string
	CaptiveCoreToml        *ledgerbackend.CaptiveCoreToml
	CaptiveCoreConfigUseDB bool
	RemoteCaptiveCoreURL   string
	NetworkPassphrase      string

	// History database session and history archive locations.
	HistorySession     db.SessionInterface
	HistoryArchiveURLs []string

	// Feature toggles for optional ingestion behaviour.
	DisableStateVerification     bool
	EnableReapLookupTables       bool
	EnableExtendedLogLedgerStats bool

	// Reingestion settings. The backoff is expressed in seconds, per the
	// field name.
	ReingestEnabled             bool
	MaxReingestRetries          int
	ReingestRetryBackoffSeconds int

	// The checkpoint frequency will be 64 unless you are using an exotic test setup.
	CheckpointFrequency uint32

	RoundingSlippageFilter int

	EnableIngestionFiltering bool
}

type ErrReingestRangeConflict

// ErrReingestRangeConflict indicates that the reingest range overlaps with
// Horizon's most recently ingested ledger. It implements the error interface
// through its Error method.
type ErrReingestRangeConflict struct {
	// contains filtered or unexported fields
}

ErrReingestRangeConflict indicates that the reingest range overlaps with Horizon's most recently ingested ledger.

func (ErrReingestRangeConflict) Error

func (e ErrReingestRangeConflict) Error() string

type Metrics

// Metrics groups the Prometheus collectors exposed by the ingestion system.
// It is the type returned by System.Metrics.
type Metrics struct {
	// MaxSupportedProtocolVersion exposes the maximum protocol version
	// supported by this version.
	MaxSupportedProtocolVersion prometheus.Gauge

	// LocalLatestLedger exposes the last ingested ledger by this ingesting instance.
	LocalLatestLedger prometheus.Gauge

	// LedgerIngestionDuration exposes timing metrics about the rate and
	// duration of ledger ingestion (including updating DB and graph).
	LedgerIngestionDuration prometheus.Summary

	// LedgerIngestionTradeAggregationDuration exposes timing metrics about the rate and
	// duration of rebuilding trade aggregation buckets.
	LedgerIngestionTradeAggregationDuration prometheus.Summary

	// LedgerIngestionReapLookupTablesDuration exposes timing metrics about the rate and
	// duration of reaping lookup tables.
	LedgerIngestionReapLookupTablesDuration prometheus.Summary

	// StateVerifyDuration exposes timing metrics about the rate and
	// duration of state verification.
	StateVerifyDuration prometheus.Summary

	// StateInvalidGauge exposes the state-invalid metric: 1 if state is
	// invalid, 0 otherwise.
	StateInvalidGauge prometheus.GaugeFunc

	// StateVerifyLedgerEntriesCount exposes the total number of ledger entries
	// checked by the state verifier, by type.
	StateVerifyLedgerEntriesCount *prometheus.GaugeVec

	// LedgerStatsCounter exposes ledger stats counters (like number of ops/changes).
	LedgerStatsCounter *prometheus.CounterVec

	// ProcessorsRunDuration exposes processors run durations.
	//
	// Deprecated: use ProcessorsRunDurationSummary instead.
	ProcessorsRunDuration *prometheus.CounterVec

	// ProcessorsRunDurationSummary exposes processors run durations.
	ProcessorsRunDurationSummary *prometheus.SummaryVec

	// LedgerFetchDurationSummary exposes a summary of durations required to
	// fetch data from ledger backend.
	LedgerFetchDurationSummary prometheus.Summary

	// CaptiveStellarCoreSynced exposes synced status of Captive Stellar-Core.
	// 1 if synced, 0 if not synced, -1 if unable to connect or HTTP server disabled.
	CaptiveStellarCoreSynced prometheus.GaugeFunc

	// CaptiveCoreSupportedProtocolVersion exposes the maximum protocol version
	// supported by the running Captive-Core.
	CaptiveCoreSupportedProtocolVersion prometheus.GaugeFunc
}

type MockFilters

// MockFilters is a mock type that embeds mock.Mock and exposes a GetFilters
// method.
// NOTE(review): presumably a test double for the ingestion filters
// provider — confirm against the interface it stands in for.
type MockFilters struct {
	mock.Mock
}

func (*MockFilters) GetFilters

type OrderBookStream

// OrderBookStream updates an in-memory order book graph to be consistent with
// offers in the Horizon DB. Instances are created with NewOrderBookStream and
// driven via Run or Update.
type OrderBookStream struct {

	// LatestLedgerGauge exposes the local (order book graph)
	// latest processed ledger
	LatestLedgerGauge prometheus.Gauge
	// contains filtered or unexported fields
}

OrderBookStream updates an in-memory graph to be consistent with offers in the Horizon DB. Any offers which are created, modified, or removed from the Horizon DB during ingestion will be applied to the in-memory order book graph. OrderBookStream assumes that no other component will update the in-memory graph. However, it is safe for other goroutines to use the in-memory graph for read operations.

func NewOrderBookStream

func NewOrderBookStream(historyQ history.IngestionQ, graph orderbook.OBGraph) *OrderBookStream

NewOrderBookStream constructs and initializes an OrderBookStream instance

func (*OrderBookStream) Run

func (o *OrderBookStream) Run(ctx context.Context)

Run will call Update() every 30 seconds until the given context is terminated.

func (*OrderBookStream) Update

func (o *OrderBookStream) Update(ctx context.Context) error

Update will query the Horizon DB for offers which have been created, removed, or updated since the last time Update() was called. Those changes will then be applied to the in-memory order book graph. After calling this function, the in-memory order book graph should be consistent with the Horizon DB (assuming no error is returned).

type ParallelSystems

// ParallelSystems is created by NewParallelSystems from a Config and a worker
// count, and exposes ReingestRange and Shutdown.
// NOTE(review): presumably it fans reingestion out across workerCount
// workers — confirm against the implementation.
type ParallelSystems struct {
	// contains filtered or unexported fields
}

func NewParallelSystems

func NewParallelSystems(config Config, workerCount uint) (*ParallelSystems, error)

func (*ParallelSystems) ReingestRange

func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange, batchSizeSuggestion uint32) error

func (*ParallelSystems) Shutdown

func (ps *ParallelSystems) Shutdown()

type ProcessorRunner

// ProcessorRunner is the concrete type whose pointer-receiver methods match
// the method set declared by ProcessorRunnerInterface.
type ProcessorRunner struct {
	// contains filtered or unexported fields
}

func (*ProcessorRunner) DisableMemoryStatsLogging

func (s *ProcessorRunner) DisableMemoryStatsLogging()

func (*ProcessorRunner) EnableMemoryStatsLogging

func (s *ProcessorRunner) EnableMemoryStatsLogging()

func (*ProcessorRunner) RunAllProcessorsOnLedger

func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
	stats ledgerStats,
	err error,
)

func (*ProcessorRunner) RunGenesisStateIngestion

func (s *ProcessorRunner) RunGenesisStateIngestion() (ingest.StatsChangeProcessorResults, error)

func (*ProcessorRunner) RunHistoryArchiveIngestion

func (s *ProcessorRunner) RunHistoryArchiveIngestion(
	checkpointLedger uint32,
	skipChecks bool,
	ledgerProtocolVersion uint32,
	bucketListHash xdr.Hash,
) (ingest.StatsChangeProcessorResults, error)

func (*ProcessorRunner) RunTransactionProcessorsOnLedger

func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
	transactionStats processors.StatsLedgerTransactionProcessorResults,
	transactionDurations processorsRunDurations,
	tradeStats processors.TradeStats,
	err error,
)

func (*ProcessorRunner) SetHistoryAdapter

func (s *ProcessorRunner) SetHistoryAdapter(historyAdapter historyArchiveAdapterInterface)

type ProcessorRunnerInterface

// ProcessorRunnerInterface declares the operations for running ingestion
// processors; *ProcessorRunner provides methods with matching signatures.
//
// NOTE(review): the per-method notes below are derived from names and
// signatures only; confirm against the implementation.
type ProcessorRunnerInterface interface {
	// SetHistoryAdapter sets the history archive adapter to use.
	SetHistoryAdapter(historyAdapter historyArchiveAdapterInterface)
	// EnableMemoryStatsLogging and DisableMemoryStatsLogging toggle memory
	// stats logging.
	EnableMemoryStatsLogging()
	DisableMemoryStatsLogging()
	// RunGenesisStateIngestion returns change-processor stats or an error.
	RunGenesisStateIngestion() (ingest.StatsChangeProcessorResults, error)
	// RunHistoryArchiveIngestion takes a checkpoint ledger, a skipChecks
	// flag, the ledger protocol version and a bucket list hash, and returns
	// change-processor stats or an error.
	RunHistoryArchiveIngestion(
		checkpointLedger uint32,
		skipChecks bool,
		ledgerProtocolVersion uint32,
		bucketListHash xdr.Hash,
	) (ingest.StatsChangeProcessorResults, error)
	// RunTransactionProcessorsOnLedger takes a single ledger's close meta and
	// returns transaction stats, per-processor run durations, trade stats
	// and an error.
	RunTransactionProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
		transactionStats processors.StatsLedgerTransactionProcessorResults,
		transactionDurations processorsRunDurations,
		tradeStats processors.TradeStats,
		err error,
	)
	// RunAllProcessorsOnLedger takes a single ledger's close meta and returns
	// aggregated ledger stats and an error.
	RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
		stats ledgerStats,
		err error,
	)
}

type System

// System is the interface returned by NewSystem.
//
// NOTE(review): the method notes below are inferred from names and
// signatures; confirm against the implementation.
type System interface {
	// Run starts the system; it takes no arguments and returns nothing.
	Run()
	// RegisterMetrics registers the system's collectors on the given registry.
	RegisterMetrics(*prometheus.Registry)
	// Metrics returns the Metrics value for this system.
	Metrics() Metrics
	// StressTest is parameterised by a number of transactions and a number
	// of changes per transaction.
	StressTest(numTransactions, changesPerTransaction int) error
	// VerifyRange operates on the ledger range fromLedger..toLedger;
	// verifyState presumably enables state verification as well.
	VerifyRange(fromLedger, toLedger uint32, verifyState bool) error
	// BuildState operates on the given ledger sequence; skipChecks presumably
	// bypasses validation.
	BuildState(sequence uint32, skipChecks bool) error
	// ReingestRange reingests the given ledger ranges; force presumably
	// overrides safety checks.
	ReingestRange(ledgerRanges []history.LedgerRange, force bool) error
	// BuildGenesisState takes no arguments and returns an error.
	BuildGenesisState() error
	// Shutdown stops the system.
	Shutdown()
}

func NewSystem

func NewSystem(config Config) (System, error)

Directories

Path Synopsis
Package verify provides helpers used for verifying if the ingested data is correct.
Package verify provides helpers used for verifying if the ingested data is correct.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL