ingest

package
v0.0.0-...-4b7be03 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 15, 2020 License: Apache-2.0 Imports: 0 Imported by: 0

Documentation

Overview

Package ingest provides primitives for building custom ingestion engines.

Very often developers need features that are outside of Millennium's scope. While it provides APIs for building the most common apps, it's not possible to add all possible features. This is why this package was created.

Ledger Backend

Ledger backends are sources of information about AiBlocks network ledgers. This can be either AiBlocks-Core DB, Captive AiBlocks-Core or History Archives. Please consult the ledgerbackend package docs for more information about each backend.

Warning: Please note that ledger backends provide low-level xdr.LedgerCloseMeta that should not be used directly unless the developer really understands this data structure. Read on to understand how to use ledger backend in higher level objects.

Readers

Readers are objects that wrap ledger backend and provide higher level, developer friendly APIs for reading ledger data.

Currently there are three types of readers (all in ingest/io package):

  • SingleLedgerStateReader reads ledger entries from history buckets for a given checkpoint ledger. Allow building state (all accounts, trust lines etc.) at any checkpoint ledger.
  • LedgerTransactionReader reads transactions for a given ledger sequence.
  • LedgerChangeReader reads all changes to ledger entries created as a result of transactions (fees and meta) and protocol upgrades in a given ledger.

Warning: Please note that readers stream both successful and failed transactions. Please check transactions status in your application (if required).

Processors

Processors allow building pipelines for ledger processing. This allows better separation of concerns in ingestion engines: some processors can be responsible for trading operations, other for payments, etc. This also allows easier configurations of pipelines. Some features can be turned off by simply disabling a single processor.

There are two types of processors (ingest/io package):

  • ChangeProcessor responsible for processing changes to ledger entries (io.Change).
  • TransactionProcessor reponsible for processing transactions (io.LedgerTransaction).

For an object to be a processor, it needs to implement a single method: ProcessChange or ProcessTransaction. This is a very simple yet powerful interface that allows building many kinds of features.

Example (Changes)

Example_changes demonstrates how to stream ledger entry changes for a specific ledger using captive aiblocks-core. Please note that transaction meta IS available when using this backend.

archiveURL := "http://history.aiblocks.io/prd/core-live/core_live_001"
networkPassphrase := network.PublicNetworkPassphrase

// Requires AiBlocks-Core 13.2.0+
backend, err := ledgerbackend.NewCaptive(
	"/bin/aiblocks-core",
	"/opt/aiblocks-core.cfg",
	networkPassphrase,
	[]string{archiveURL},
)
if err != nil {
	panic(err)
}

sequence := uint32(3)

err = backend.PrepareRange(ledgerbackend.SingleLedgerRange(sequence))
if err != nil {
	panic(err)
}

changeReader, err := io.NewLedgerChangeReader(backend, networkPassphrase, sequence)
if err != nil {
	panic(err)
}

for {
	change, err := changeReader.Read()
	if err == io.EOF {
		break
	}
	if err != nil {
		panic(err)
	}

	var action string
	switch {
	case change.Pre == nil && change.Post != nil:
		action = "created"
	case change.Pre != nil && change.Post != nil:
		action = "updated"
	case change.Pre != nil && change.Post == nil:
		action = "removed"
	}

	switch change.Type {
	case xdr.LedgerEntryTypeAccount:
		var accountEntry xdr.AccountEntry
		if change.Pre != nil {
			accountEntry = change.Pre.Data.MustAccount()
		} else {
			accountEntry = change.Post.Data.MustAccount()
		}
		fmt.Println("account", accountEntry.AccountId.Address(), action)
	case xdr.LedgerEntryTypeData:
		fmt.Println("data", action)
	case xdr.LedgerEntryTypeTrustline:
		fmt.Println("trustline", action)
	case xdr.LedgerEntryTypeOffer:
		fmt.Println("offer", action)
	default:
		panic("Unknown type")
	}
}
Output:

Example (Ledgerentrieshistoryarchive)

Example_ledgerentrieshistoryarchive demonstrates how to stream all ledger entries live at specific checkpoint ledger from history archives.

archiveURL := "http://history.aiblocks.io/prd/core-live/core_live_001"

archive, err := historyarchive.Connect(
	archiveURL,
	historyarchive.ConnectOptions{Context: context.TODO()},
)
if err != nil {
	panic(err)
}

// Ledger must be a checkpoint ledger: (100031+1) mod 64 == 0.
reader, err := io.MakeSingleLedgerStateReader(context.TODO(), archive, 100031)
if err != nil {
	panic(err)
}

var accounts, data, trustlines, offers int
for {
	entry, err := reader.Read()
	if err == io.EOF {
		break
	}
	if err != nil {
		panic(err)
	}

	switch entry.Type {
	case xdr.LedgerEntryTypeAccount:
		accounts++
	case xdr.LedgerEntryTypeData:
		data++
	case xdr.LedgerEntryTypeTrustline:
		trustlines++
	case xdr.LedgerEntryTypeOffer:
		offers++
	default:
		panic("Unknown type")
	}
}

fmt.Println("accounts", accounts)
fmt.Println("data", data)
fmt.Println("trustlines", trustlines)
fmt.Println("offers", offers)
Output:

Example (Transactionshistoryarchive)

Example_transactionshistoryarchive demonstrates how to stream transactions for a specific ledger from history archives. Please note that transaction meta IS NOT available in history archives.

archiveURL := "http://history.aiblocks.io/prd/core-live/core_live_001"
networkPassphrase := network.PublicNetworkPassphrase

archive, err := historyarchive.Connect(
	archiveURL,
	historyarchive.ConnectOptions{Context: context.TODO()},
)
if err != nil {
	panic(err)
}

backend := ledgerbackend.NewHistoryArchiveBackendFromArchive(archive)
txReader, err := io.NewLedgerTransactionReader(backend, networkPassphrase, 30000000)
if err != nil {
	panic(err)
}

for {
	tx, err := txReader.Read()
	if err == io.EOF {
		break
	}
	if err != nil {
		panic(err)
	}

	fmt.Printf("%d: %x (%d ops)\n", tx.Index, tx.Result.TransactionHash, len(tx.Envelope.Operations()))
}
Output:

Directories

Path Synopsis
Package verify provides helpers used for verifying if the ingested data is correct.
Package verify provides helpers used for verifying if the ingested data is correct.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL