Documentation ¶
Index ¶
- Constants
- func NewKgoConfig(cfg *Config, logger *zap.Logger) ([]kgo.Opt, error)
- func SerializeJson(data interface{}) ([]byte, error)
- type Config
- type KgoZapLogger
- type PartitionMarks
- type SASLConfig
- type SASLGSSAPIConfig
- type Service
- func (s *Service) GetAverageMessageSize(ctx context.Context, topicName string) (int64, error)
- func (s *Service) GetMetadata(ctx context.Context) (*kmsg.MetadataResponse, error)
- func (s *Service) GetPartitionMarks(ctx context.Context, topic string, partitionIDs []int32) (map[int32]PartitionMarks, error)
- func (s *Service) GetTopicMessageCount(ctx context.Context, topicName string) (int64, error)
- func (s *Service) GetTopicMetadata(ctx context.Context, topicName string) (*kmsg.MetadataResponseTopic, error)
- func (s *Service) GetTopicSize(ctx context.Context, topicName string, partitionIDs []int32) (TopicLogDirSize, error)
- func (s *Service) ListOffsets(ctx context.Context, topicPartitions map[string][]int32, timestamp int64) (TopicPartitionOffsets, error)
- func (s *Service) NewKafkaClient() (*kgo.Client, error)
- type TLSConfig
- type TopicLogDirSize
- type TopicPartitionOffsets
Constants ¶
const (
	SASLMechanismPlain       = "PLAIN"
	SASLMechanismScramSHA256 = "SCRAM-SHA-256"
	SASLMechanismScramSHA512 = "SCRAM-SHA-512"
	SASLMechanismGSSAPI      = "GSSAPI"
	SASLMechanismOAuthBearer = "OAUTHBEARER"
)
const (
	TimestampLatest   = -1
	TimestampEarliest = -2
)
Variables ¶
This section is empty.
Functions ¶
func NewKgoConfig ¶
NewKgoConfig creates a new set of client options (kgo.Opt) for the Kafka client as exposed by the franz-go library. If the TLS certificates can't be read, an error is returned.
func SerializeJson ¶
Types ¶
type Config ¶
type Config struct {
	// General
	Brokers  []string   `yaml:"brokers"`
	ClientID string     `yaml:"clientId"`
	TLS      TLSConfig  `yaml:"tls"`
	SASL     SASLConfig `yaml:"sasl"`

	TopicReplicationFactor int16 `yaml:"topicReplicationFactor"`
}
func (*Config) RegisterFlags ¶
RegisterFlags for all sensitive Kafka SASL configs.
type KgoZapLogger ¶
type KgoZapLogger struct {
// contains filtered or unexported fields
}
func (KgoZapLogger) Level ¶
func (k KgoZapLogger) Level() kgo.LogLevel
Level implements the kgo.Logger interface. It returns the log level to log at. We pin this to debug because the zap logger decides what is actually sent to the output stream.
type PartitionMarks ¶
PartitionMarks is a partitionID along with its highest and lowest message index.
type SASLConfig ¶
type SASLConfig struct {
	Enabled      bool             `yaml:"enabled"`
	Username     string           `yaml:"username"`
	Password     string           `yaml:"password"`
	Mechanism    string           `yaml:"mechanism"`
	GSSAPIConfig SASLGSSAPIConfig `yaml:"gssapi"`
}
SASLConfig for Kafka client
func (*SASLConfig) RegisterFlags ¶
func (c *SASLConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags for all sensitive Kafka SASL configs.
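The RegisterFlags methods take a *flag.FlagSet so that sensitive values can be supplied on the command line instead of in the YAML file. The following is a minimal sketch of that pattern using only the standard library. The saslConfig type is a local stand-in for SASLConfig, and the flag name kafka.sasl.password and the helper parseSASLFlags are hypothetical; the flag names this package actually registers are not shown in this reference.

```go
package main

import (
	"flag"
	"fmt"
)

// saslConfig is a local stand-in that mirrors two fields of SASLConfig.
type saslConfig struct {
	Username string
	Password string
}

// registerFlags binds the sensitive Password field to a command-line flag.
// The flag name "kafka.sasl.password" is an assumption for illustration.
func (c *saslConfig) registerFlags(f *flag.FlagSet) {
	f.StringVar(&c.Password, "kafka.sasl.password", "", "SASL password")
}

// parseSASLFlags (hypothetical helper) registers the flags on a fresh
// FlagSet and parses the given arguments into a config value.
func parseSASLFlags(args []string) (saslConfig, error) {
	var cfg saslConfig
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.registerFlags(fs)
	err := fs.Parse(args)
	return cfg, err
}

func main() {
	cfg, err := parseSASLFlags([]string{"-kafka.sasl.password=s3cret"})
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	// Avoid printing the secret itself; only report that it was set.
	fmt.Println("password set:", cfg.Password != "")
}
```

Passing the FlagSet in, rather than using the global flag.CommandLine, keeps registration testable and lets nested config structs register their own flags on the same set.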
type SASLGSSAPIConfig ¶
type SASLGSSAPIConfig struct {
	AuthType           string `yaml:"authType"`
	KeyTabPath         string `yaml:"keyTabPath"`
	KerberosConfigPath string `yaml:"kerberosConfigPath"`
	ServiceName        string `yaml:"serviceName"`
	Username           string `yaml:"username"`
	Password           string `yaml:"password"`
	Realm              string `yaml:"realm"`
}
SASLGSSAPIConfig represents the Kafka Kerberos config
func (*SASLGSSAPIConfig) RegisterFlags ¶
func (c *SASLGSSAPIConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags registers all sensitive Kerberos settings as flags.
type Service ¶
Service acts as an interface for interacting with the Kafka cluster.
func NewService ¶
NewService creates a new Kafka service and immediately checks connectivity to all components. If any of these external dependencies fails, an error will be returned.
func (*Service) GetAverageMessageSize ¶
GetAverageMessageSize returns the average message size in a given topic in bytes.
func (*Service) GetMetadata ¶
func (*Service) GetPartitionMarks ¶
func (s *Service) GetPartitionMarks(ctx context.Context, topic string, partitionIDs []int32) (map[int32]PartitionMarks, error)
GetPartitionMarks returns a map of: partitionID -> PartitionMarks
func (*Service) GetTopicMessageCount ¶
GetTopicMessageCount tries to return the number of Kafka messages in the given topic. Depending on the topic's configuration this might be as simple as returning the delta between the low and high watermarks (delete cleanup policy), but it is more complex for compacted topics. For compacted topics the number of messages is estimated by dividing the log dir size by the average message size.
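The two strategies described above can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: partitionMarks is a local stand-in whose field names (PartitionID, Low, High) are assumed, and countByWatermarks and estimateCompacted are hypothetical helpers. Which log dir size the package feeds into the estimate is not stated in this reference.

```go
package main

import (
	"errors"
	"fmt"
)

// partitionMarks is a local stand-in for PartitionMarks.
// The field names are assumptions made for this sketch.
type partitionMarks struct {
	PartitionID int32
	Low         int64 // low watermark
	High        int64 // high watermark
}

// countByWatermarks sums (High - Low) over all partitions. This matches
// the "delete" cleanup policy case, where offsets are contiguous.
func countByWatermarks(marks map[int32]partitionMarks) int64 {
	var total int64
	for _, m := range marks {
		total += m.High - m.Low
	}
	return total
}

// estimateCompacted estimates the message count of a compacted topic by
// dividing the log dir size by the average message size (both in bytes).
func estimateCompacted(logDirSizeBytes, avgMessageSizeBytes int64) (int64, error) {
	if avgMessageSizeBytes <= 0 {
		return 0, errors.New("average message size must be positive")
	}
	return logDirSizeBytes / avgMessageSizeBytes, nil
}

func main() {
	marks := map[int32]partitionMarks{
		0: {PartitionID: 0, Low: 100, High: 250},
		1: {PartitionID: 1, Low: 0, High: 50},
	}
	fmt.Println("watermark count:", countByWatermarks(marks)) // 150 + 50 = 200

	n, err := estimateCompacted(1_000_000, 512)
	if err != nil {
		fmt.Println("estimate failed:", err)
		return
	}
	fmt.Println("compacted estimate:", n) // 1000000 / 512 = 1953 (integer division)
}
```

The watermark delta overcounts on compacted topics because compaction removes records without renumbering offsets, which is why a size-based estimate is used there instead.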
func (*Service) GetTopicMetadata ¶
func (*Service) GetTopicSize ¶
func (s *Service) GetTopicSize(ctx context.Context, topicName string, partitionIDs []int32) (TopicLogDirSize, error)
GetTopicSize returns the topic's log dir size in bytes.
func (*Service) ListOffsets ¶
func (s *Service) ListOffsets(ctx context.Context, topicPartitions map[string][]int32, timestamp int64) (TopicPartitionOffsets, error)
ListOffsets returns a nested map of: topic -> partitionID -> high water mark offset of all available partitions
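A short sketch of how the timestamp argument and the nested result could be handled. It assumes TopicPartitionOffsets has the shape map[string]map[int32]int64, which this reference describes but does not show, and it mirrors the TimestampLatest and TimestampEarliest constants locally. The helpers lookupOffset and describeTimestamp are hypothetical. The interpretation of non-negative timestamps follows the Kafka ListOffsets convention and is an assumption about this Service.

```go
package main

import "fmt"

// Local mirrors of the package constants TimestampLatest and TimestampEarliest.
const (
	timestampLatest   = -1
	timestampEarliest = -2
)

// topicPartitionOffsets is an assumed shape for TopicPartitionOffsets:
// topic name -> partition ID -> offset.
type topicPartitionOffsets map[string]map[int32]int64

// lookupOffset returns the offset for a topic partition, and false if
// either the topic or the partition is missing from the result.
func lookupOffset(offsets topicPartitionOffsets, topic string, partition int32) (int64, bool) {
	partitions, ok := offsets[topic]
	if !ok {
		return 0, false
	}
	offset, ok := partitions[partition]
	return offset, ok
}

// describeTimestamp explains what a timestamp argument requests, following
// the sentinel values used by the Kafka ListOffsets API.
func describeTimestamp(ts int64) string {
	switch ts {
	case timestampLatest:
		return "latest offset"
	case timestampEarliest:
		return "earliest offset"
	default:
		return fmt.Sprintf("first offset at or after %d ms", ts)
	}
}

func main() {
	// Example result as it might come back for two partitions of one topic.
	offsets := topicPartitionOffsets{
		"orders": {0: 42, 1: 17},
	}

	fmt.Println("requested:", describeTimestamp(timestampLatest))
	if off, ok := lookupOffset(offsets, "orders", 1); ok {
		fmt.Println("orders/1 offset:", off)
	}
	if _, ok := lookupOffset(offsets, "payments", 0); !ok {
		fmt.Println("payments/0 not present in result")
	}
}
```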
type TLSConfig ¶
type TLSConfig struct {
	Enabled               bool   `yaml:"enabled"`
	CaFilepath            string `yaml:"caFilepath"`
	CertFilepath          string `yaml:"certFilepath"`
	KeyFilepath           string `yaml:"keyFilepath"`
	Passphrase            string `yaml:"passphrase"`
	InsecureSkipTLSVerify bool   `yaml:"insecureSkipTlsVerify"`
}
TLSConfig to connect to Kafka via TLS
func (*TLSConfig) RegisterFlags ¶
RegisterFlags for all sensitive Kafka TLS configs
type TopicLogDirSize ¶
type TopicLogDirSize struct {
	// TotalSize is the sum of primary log dir size and replica log dir size.
	TotalSize int64
	// LeaderLogDirSize describes the total size of all leaders' replica log dirs.
	LeaderLogDirSize int64
	// ReplicaLogDirSize describes the total size of all replica log dirs.
	ReplicaLogDirSize int64
}
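One way to read the three fields is sketched below. It assumes that ReplicaLogDirSize covers only non-leader (follower) replicas, so that TotalSize equals LeaderLogDirSize plus ReplicaLogDirSize; the field comments are consistent with that reading but do not state it outright. The replicaLogDir type and the aggregate function are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// topicLogDirSize is a local mirror of TopicLogDirSize.
type topicLogDirSize struct {
	TotalSize         int64
	LeaderLogDirSize  int64
	ReplicaLogDirSize int64
}

// replicaLogDir is a hypothetical input record: the size of one partition
// replica's log dir and whether that replica is the partition leader.
type replicaLogDir struct {
	SizeBytes int64
	IsLeader  bool
}

// aggregate sums leader and follower log dir sizes separately.
// Assumption: followers count toward ReplicaLogDirSize, leaders do not.
func aggregate(dirs []replicaLogDir) topicLogDirSize {
	var out topicLogDirSize
	for _, d := range dirs {
		if d.IsLeader {
			out.LeaderLogDirSize += d.SizeBytes
		} else {
			out.ReplicaLogDirSize += d.SizeBytes
		}
	}
	out.TotalSize = out.LeaderLogDirSize + out.ReplicaLogDirSize
	return out
}

func main() {
	// One partition with a leader and two followers.
	size := aggregate([]replicaLogDir{
		{SizeBytes: 100, IsLeader: true},
		{SizeBytes: 100},
		{SizeBytes: 90},
	})
	fmt.Printf("leader=%d replica=%d total=%d\n",
		size.LeaderLogDirSize, size.ReplicaLogDirSize, size.TotalSize)
}
```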
type TopicPartitionOffsets ¶
TopicPartitionOffsets is a map of topic names -> partition IDs -> offset.