kafka

package v1.0.0
Published: May 31, 2024 License: Apache-2.0 Imports: 47 Imported by: 0

Documentation

Overview

Package kafka receives CockroachDB CDC changefeed events that are routed via a Kafka cluster.

Index

Constants

This section is empty.

Variables

Set is used by Wire.
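
The declaration is not shown on this page. A plausible shape, assuming the package follows the usual Wire convention of aggregating its Provide* constructors (the exact member list is an assumption):

var Set = wire.NewSet(
	ProvideConn,           // constructs *Conn
	ProvideConveyorConfig, // derives *conveyor.Config from *Config
	ProvideEagerConfig,    // forces early userscript evaluation
)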

Functions

func ProvideConveyorConfig

func ProvideConveyorConfig(cfg *Config) *conveyor.Config

ProvideConveyorConfig is called by Wire.

Types

type Config

type Config struct {
	ConveyorConfig conveyor.Config
	DLQ            dlq.Config
	Script         script.Config
	Sequencer      sequencer.Config
	Staging        sinkprod.StagingConfig
	Target         sinkprod.TargetConfig
	TargetSchema   ident.Schema
	TLS            secure.Config

	BatchSize        int           // Number of messages to accumulate before committing to the target.
	Brokers          []string      // The addresses of the Kafka brokers.
	Group            string        // The Kafka consumer group ID.
	MaxTimestamp     string        // Only accept messages at or before this timestamp.
	MinTimestamp     string        // Only accept messages at or after this timestamp.
	ResolvedInterval time.Duration // Minimum duration between resolved timestamps.
	SASL             SASLConfig    // SASL parameters.
	Strategy         string        // Kafka consumer group rebalance strategy.
	Topics           []string      // The list of topics that the consumer should use.
	// contains filtered or unexported fields
}

Config contains the configuration necessary for creating a replication connection to a Kafka cluster.

func (*Config) Bind

func (c *Config) Bind(f *pflag.FlagSet)

Bind adds flags to the flag set. It delegates to the Bind methods of the member configurations.

func (*Config) Preflight

func (c *Config) Preflight(ctx context.Context) error

Preflight updates the configuration with sane defaults or returns an error if there are missing options for which a default cannot be provided.
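
Taken together, Bind and Preflight support the usual flag-driven setup: bind the configuration to a flag set, parse, then validate. A minimal sketch, assuming a standalone command; the import path points at an internal package and is shown for illustration only, and the flag names are defined by the package:

package main

import (
	"context"
	"log"
	"os"

	"github.com/cockroachdb/cdc-sink/internal/source/kafka" // illustrative path
	"github.com/spf13/pflag"
)

func main() {
	cfg := &kafka.Config{}
	fs := pflag.NewFlagSet("kafka", pflag.ExitOnError)
	cfg.Bind(fs) // registers the package's flags on the set
	if err := fs.Parse(os.Args[1:]); err != nil {
		log.Fatal(err)
	}
	// Preflight fills in sane defaults and rejects configurations
	// with missing mandatory options.
	if err := cfg.Preflight(context.Background()); err != nil {
		log.Fatal(err)
	}
}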

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn encapsulates all wire-connection behavior. It is responsible for receiving replication messages and replying with status updates. TODO (silvano): support Avro format, schema registry. https://github.com/cockroachdb/cdc-sink/issues/776

func ProvideConn

func ProvideConn(ctx *stopper.Context, config *Config, conv *conveyor.Conveyors) (*Conn, error)

ProvideConn is called by Wire to construct this package's logical.Dialect implementation. There's a fake dependency on the script loader so that flags can be evaluated first.

func (*Conn) Start

func (c *Conn) Start(ctx *stopper.Context) (err error)

Start the replication loop. Connect to the Kafka cluster and process events from the given topics. If more than one process is started, the partitions within the topics are allocated across the processes based on the chosen rebalance strategy.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a Kafka consumer.

func (*Consumer) Cleanup

func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.

func (*Consumer) ConsumeClaim

func (c *Consumer) ConsumeClaim(
	session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim,
) (err error)

ConsumeClaim processes new messages for the topic/partition specified in the claim.

func (*Consumer) Setup

func (c *Consumer) Setup(session sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim.
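
Setup, ConsumeClaim, and Cleanup together implement sarama's ConsumerGroupHandler interface, and sarama invokes them once per consumer-group session. A sketch of the driving loop using only sarama's public API (import path github.com/IBM/sarama, formerly github.com/Shopify/sarama); the handler value is hypothetical, since this package constructs its Consumer internally:

// consumeLoop drives any sarama.ConsumerGroupHandler, such as Consumer.
func consumeLoop(ctx context.Context, brokers []string, groupID string,
	topics []string, handler sarama.ConsumerGroupHandler) error {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_1_0_0 // consumer groups require a protocol version
	group, err := sarama.NewConsumerGroup(brokers, groupID, cfg)
	if err != nil {
		return err
	}
	defer group.Close()
	for {
		// Each Consume call is one session: Setup runs first, then one
		// ConsumeClaim goroutine per assigned partition, then Cleanup.
		// Consume returns when a rebalance ends the session, so it is
		// called in a loop until the context is cancelled.
		if err := group.Consume(ctx, topics, handler); err != nil {
			return err
		}
		if err := ctx.Err(); err != nil {
			return err
		}
	}
}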

type Conveyor

type Conveyor interface {
	// AcceptMultiBatch processes a batch. The batch is committed to the target
	// database or to a staging area, depending on the mode in which
	// the connector is running.
	AcceptMultiBatch(context.Context, *types.MultiBatch, *types.AcceptOptions) error
	// Advance extends the proposed checkpoint timestamp associated with a partition.
	// It is called when a resolved timestamp is received by the consumer.
	Advance(context.Context, ident.Ident, hlc.Time) error
	// Ensure that a checkpoint exists for all the given partitions. It should be
	// called every time a new partition or topic is discovered by the consumer group.
	Ensure(context.Context, []ident.Ident) error
	// Watcher provides access to the underlying schema.
	Watcher() types.Watcher
}

Conveyor exposes the methods used by the kafka connector to deliver mutations, in batches, to the destination. It controls the checkpoint timestamp associated with each partition. We also implement a mock conveyor for testing purposes.
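
A minimal test double in the spirit of the mock mentioned above, recording what it receives instead of writing to a target. It assumes imports of context and sync plus the module's internal types, ident, and hlc packages; those import paths are inferred from the identifiers in the interface:

// mockConveyor records calls so tests can assert on delivered batches
// and proposed checkpoint timestamps.
type mockConveyor struct {
	mu      sync.Mutex
	batches []*types.MultiBatch
	marks   []hlc.Time
}

func (m *mockConveyor) AcceptMultiBatch(
	_ context.Context, batch *types.MultiBatch, _ *types.AcceptOptions,
) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.batches = append(m.batches, batch)
	return nil
}

func (m *mockConveyor) Advance(_ context.Context, _ ident.Ident, ts hlc.Time) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.marks = append(m.marks, ts)
	return nil
}

func (m *mockConveyor) Ensure(_ context.Context, _ []ident.Ident) error { return nil }

// Watcher returns nil here; tests that inspect the schema would supply
// a real or mocked types.Watcher instead.
func (m *mockConveyor) Watcher() types.Watcher { return nil }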

type EagerConfig

type EagerConfig Config

EagerConfig is a hack to get Wire to move userscript evaluation to the beginning of the injector. This allows CLI flags to be set by the script.

func ProvideEagerConfig

func ProvideEagerConfig(cfg *Config, _ *script.Loader) *EagerConfig

ProvideEagerConfig is a hack to move up the evaluation of the user script so that the options callbacks can set any non-script-related CLI flags.

type Kafka

type Kafka struct {
	Conn        *Conn
	Diagnostics *diag.Diagnostics
}

Kafka is a Kafka-based logical replication loop.

func Start

func Start(ctx *stopper.Context, config *Config) (*Kafka, error)

Start creates a Kafka logical replication loop using the provided configuration.
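
A minimal sketch of starting the loop programmatically. The stopper.WithContext and Wait calls are assumptions about the module's internal stopper package, and the target/staging fields are elided:

// run starts the replication loop and blocks until shutdown.
func run() error {
	ctx := stopper.WithContext(context.Background()) // assumed constructor
	cfg := &kafka.Config{
		Brokers: []string{"localhost:9092"},
		Group:   "replicator",
		Topics:  []string{"orders"},
		// Target, staging, and schema configuration elided.
	}
	if err := cfg.Preflight(ctx); err != nil {
		return err
	}
	k, err := kafka.Start(ctx, cfg)
	if err != nil {
		return err
	}
	_ = k.GetDiagnostics() // exposed for stdlogical diagnostics reporting
	return ctx.Wait()      // assumed: block until the stopper is stopped
}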

func (*Kafka) GetDiagnostics

func (k *Kafka) GetDiagnostics() *diag.Diagnostics

GetDiagnostics implements stdlogical.HasDiagnostics.

type OffsetSeeker

type OffsetSeeker interface {
	// GetOffsets finds, within the given topics, the most recent offsets
	// of resolved-timestamp messages before the given time.
	GetOffsets([]string, hlc.Time) ([]*partitionState, error)
	// Close shuts down the connection with the Kafka broker.
	Close() error
}

OffsetSeeker finds offsets within Kafka topics.

func NewOffsetSeeker

func NewOffsetSeeker(config *Config) (OffsetSeeker, error)

NewOffsetSeeker instantiates an offsetManager.
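
A sketch of how the package might use the seeker when MinTimestamp is set, locating the resolved-timestamp offsets to resume from; how the hlc.Time value is constructed is application-specific and elided here:

seeker, err := NewOffsetSeeker(config)
if err != nil {
	return err
}
defer seeker.Close()

// Find, in each topic, the most recent resolved-timestamp offsets
// before the requested time; the returned partition states seed the
// consumer group's starting positions.
states, err := seeker.GetOffsets(config.Topics, minTime)
if err != nil {
	return err
}
_ = states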

type SASLConfig

type SASLConfig struct {
	ClientID     string
	ClientSecret string
	GrantType    string
	Mechanism    string
	Password     string
	Scopes       []string
	TokenURL     string
	User         string
}

SASLConfig defines the SASL parameters.
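
The field names track the standard Kafka SASL mechanisms: User and Password serve PLAIN and SCRAM, while the client-credential fields serve OAUTHBEARER. An illustrative SCRAM configuration; which mechanism strings Preflight accepts is not shown on this page:

cfg.SASL = SASLConfig{
	Mechanism: "SCRAM-SHA-512", // standard Kafka SASL mechanism name
	User:      "replicator",
	Password:  os.Getenv("KAFKA_PASSWORD"),
}

An OAUTHBEARER-style setup would instead populate ClientID, ClientSecret, GrantType, Scopes, and TokenURL.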

Directories

Path   Synopsis
mocks  Package mocks implements a simple Kafka client and consumer for testing purposes.
