Versions in this module Expand all Collapse all v1 v1.4.3 Apr 6, 2022 Changes in this version + var ErrAborting = errors.New("client is aborting buffered records") + var ErrClientClosed = errors.New("client closed") + var ErrMaxBuffered = errors.New("the maximum amount of records are buffered, cannot buffer more") + var ErrRecordRetries = errors.New("record failed after being retried too many times") + var ErrRecordTimeout = errors.New("records have timed out before they were able to be produced") + func ParseConsumerSyncAssignment(assignment []byte) (map[string][]int32, error) + type Acks struct + func AllISRAcks() Acks + func LeaderAck() Acks + func NoAck() Acks + type BalancePlan struct + func (p *BalancePlan) AddPartition(member *kmsg.JoinGroupResponseMember, topic string, partition int32) + func (p *BalancePlan) AddPartitions(member *kmsg.JoinGroupResponseMember, topic string, partitions []int32) + func (p *BalancePlan) AdjustCooperative(b *ConsumerBalancer) + func (p *BalancePlan) IntoSyncAssignment() []kmsg.SyncGroupRequestGroupAssignment + func (p *BalancePlan) String() string + type Broker struct + func (b *Broker) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) + func (b *Broker) RetriableRequest(ctx context.Context, req kmsg.Request) (kmsg.Response, error) + type BrokerE2E struct + BytesRead int + BytesWritten int + ReadErr error + ReadWait time.Duration + TimeToRead time.Duration + TimeToWrite time.Duration + WriteErr error + WriteWait time.Duration + func (e *BrokerE2E) DurationE2E() time.Duration + func (e *BrokerE2E) Err() error + type BrokerMetadata struct + Host string + NodeID int32 + Port int32 + Rack *string + type Client struct + func NewClient(opts ...Opt) (*Client, error) + func (cl *Client) AbortBufferedRecords(ctx context.Context) error + func (cl *Client) AddConsumeTopics(topics ...string) + func (cl *Client) AllowRebalance() + func (cl *Client) BeginTransaction() error + func (cl *Client) Broker(id int) *Broker + func (cl *Client) BufferedFetchRecords() int64 + func (cl *Client) BufferedProduceRecords() int64 + func (cl *Client) Close() + func (cl *Client) CommitOffsets(ctx context.Context, uncommitted map[string]map[int32]EpochOffset, ...) + func (cl *Client) CommitOffsetsSync(ctx context.Context, uncommitted map[string]map[int32]EpochOffset, ...) + func (cl *Client) CommitRecords(ctx context.Context, rs ...*Record) error + func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error + func (cl *Client) CommittedOffsets() map[string]map[int32]EpochOffset + func (cl *Client) DiscoveredBrokers() []*Broker + func (cl *Client) EndAndBeginTransaction(ctx context.Context, how EndBeginTxnHow, commit TransactionEndTry, ...) (rerr error) + func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry) error + func (cl *Client) Flush(ctx context.Context) error + func (cl *Client) ForceMetadataRefresh() + func (cl *Client) ForceRebalance() + func (cl *Client) LeaveGroup() + func (cl *Client) MarkCommitRecords(rs ...*Record) + func (cl *Client) PauseFetchPartitions(topicPartitions map[string][]int32) map[string][]int32 + func (cl *Client) PauseFetchTopics(topics ...string) []string + func (cl *Client) Ping(ctx context.Context) error + func (cl *Client) PollFetches(ctx context.Context) Fetches + func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches + func (cl *Client) Produce(ctx context.Context, r *Record, promise func(*Record, error)) + func (cl *Client) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults + func (cl *Client) PurgeTopicsFromClient(topics ...string) + func (cl *Client) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) + func (cl *Client) RequestSharded(ctx context.Context, req kmsg.Request) []ResponseShard + func (cl *Client) ResumeFetchPartitions(topicPartitions map[string][]int32) + func (cl *Client) ResumeFetchTopics(topics ...string) + func (cl *Client) SeedBrokers() []*Broker + func (cl *Client) SetOffsets(setOffsets map[string]map[int32]EpochOffset) + func (cl *Client) TryProduce(ctx context.Context, r *Record, promise func(*Record, error)) + func (cl *Client) UncommittedOffsets() map[string]map[int32]EpochOffset + type CompressionCodec struct + func GzipCompression() CompressionCodec + func Lz4Compression() CompressionCodec + func NoCompression() CompressionCodec + func SnappyCompression() CompressionCodec + func ZstdCompression() CompressionCodec + func (c CompressionCodec) WithLevel(level int) CompressionCodec + type ConsumerBalancer struct + func NewConsumerBalancer(balance ConsumerBalancerBalance, members []kmsg.JoinGroupResponseMember) (*ConsumerBalancer, error) + func (b *ConsumerBalancer) Balance(topics map[string]int32) IntoSyncAssignment + func (b *ConsumerBalancer) EachMember(...) + func (b *ConsumerBalancer) MemberAt(n int) (*kmsg.JoinGroupResponseMember, *kmsg.ConsumerMemberMetadata) + func (b *ConsumerBalancer) MemberTopics() map[string]struct{} + func (b *ConsumerBalancer) Members() []kmsg.JoinGroupResponseMember + func (b *ConsumerBalancer) NewPlan() *BalancePlan + type ConsumerBalancerBalance interface + Balance func(*ConsumerBalancer, map[string]int32) IntoSyncAssignment + type ConsumerOpt interface + func ConsumePartitions(partitions map[string]map[int32]Offset) ConsumerOpt + func ConsumeRegex() ConsumerOpt + func ConsumeResetOffset(offset Offset) ConsumerOpt + func ConsumeTopics(topics ...string) ConsumerOpt + func DisableFetchSessions() ConsumerOpt + func FetchIsolationLevel(level IsolationLevel) ConsumerOpt + func FetchMaxBytes(b int32) ConsumerOpt + func FetchMaxPartitionBytes(b int32) ConsumerOpt + func FetchMaxWait(wait time.Duration) ConsumerOpt + func FetchMinBytes(b int32) ConsumerOpt + func KeepControlRecords() ConsumerOpt + func MaxConcurrentFetches(n int) ConsumerOpt + func Rack(rack string) ConsumerOpt + type EndBeginTxnHow uint8 + const EndBeginTxnSafe + const EndBeginTxnUnsafe + type EpochOffset struct + Epoch int32 + Offset int64 + type ErrDataLoss struct + ConsumedTo int64 + Partition int32 + ResetTo int64 + Topic string + func (e *ErrDataLoss) Error() string + type Fetch struct + Topics []FetchTopic + type FetchBatchMetrics struct + CompressedBytes int + CompressionType uint8 + NumRecords int + UncompressedBytes int + type FetchError struct + Err error + Partition int32 + Topic string + type FetchPartition struct + Err error + HighWatermark int64 + LastStableOffset int64 + LogStartOffset int64 + Partition int32 + Records []*Record + func (p *FetchPartition) EachRecord(fn func(*Record)) + type FetchTopic struct + Partitions []FetchPartition + Topic string + func (t *FetchTopic) EachPartition(fn func(FetchPartition)) + func (t *FetchTopic) EachRecord(fn func(*Record)) + func (t *FetchTopic) Records() []*Record + type FetchTopicPartition struct + Topic string + func (r *FetchTopicPartition) EachRecord(fn func(*Record)) + type Fetches []Fetch + func (fs Fetches) EachError(fn func(string, int32, error)) + func (fs Fetches) EachPartition(fn func(FetchTopicPartition)) + func (fs Fetches) EachRecord(fn func(*Record)) + func (fs Fetches) EachTopic(fn func(FetchTopic)) + func (fs Fetches) Errors() []FetchError + func (fs Fetches) IsClientClosed() bool + func (fs Fetches) RecordIter() *FetchesRecordIter + func (fs Fetches) Records() []*Record + type FetchesRecordIter struct + func (i *FetchesRecordIter) Done() bool + func (i *FetchesRecordIter) Next() *Record + type FirstErrPromise struct + func AbortingFirstErrPromise(cl *Client) *FirstErrPromise + func (f *FirstErrPromise) Err() error + func (f *FirstErrPromise) Promise() func(*Record, error) + type GroupBalancer interface + IsCooperative func() bool + JoinGroupMetadata func(topicInterests []string, currentAssignment map[string][]int32, ...) []byte + MemberBalancer func(members []kmsg.JoinGroupResponseMember) (b GroupMemberBalancer, topics map[string]struct{}, err error) + ParseSyncAssignment func(assignment []byte) (map[string][]int32, error) + ProtocolName func() string + func CooperativeStickyBalancer() GroupBalancer + func RangeBalancer() GroupBalancer + func RoundRobinBalancer() GroupBalancer + func StickyBalancer() GroupBalancer + type GroupMemberBalancer interface + Balance func(topics map[string]int32) IntoSyncAssignment + type GroupOpt interface + func AdjustFetchOffsetsFn(...) GroupOpt + func AutoCommitCallback(fn func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error)) GroupOpt + func AutoCommitInterval(interval time.Duration) GroupOpt + func AutoCommitMarks() GroupOpt + func Balancers(balancers ...GroupBalancer) GroupOpt + func BlockRebalanceOnPoll() GroupOpt + func ConsumerGroup(group string) GroupOpt + func DisableAutoCommit() GroupOpt + func GreedyAutoCommit() GroupOpt + func GroupProtocol(protocol string) GroupOpt + func HeartbeatInterval(interval time.Duration) GroupOpt + func InstanceID(id string) GroupOpt + func OnPartitionsAssigned(onAssigned func(context.Context, *Client, map[string][]int32)) GroupOpt + func OnPartitionsLost(onLost func(context.Context, *Client, map[string][]int32)) GroupOpt + func OnPartitionsRevoked(onRevoked func(context.Context, *Client, map[string][]int32)) GroupOpt + func RebalanceTimeout(timeout time.Duration) GroupOpt + func RequireStableFetchOffsets() GroupOpt + func SessionTimeout(timeout time.Duration) GroupOpt + type GroupTransactSession struct + func NewGroupTransactSession(opts ...Opt) (*GroupTransactSession, error) + func (s *GroupTransactSession) Begin() error + func (s *GroupTransactSession) Client() *Client + func (s *GroupTransactSession) Close() + func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry) (committed bool, err error) + func (s *GroupTransactSession) PollFetches(ctx context.Context) Fetches + func (s *GroupTransactSession) PollRecords(ctx context.Context, maxPollRecords int) Fetches + func (s *GroupTransactSession) Produce(ctx context.Context, r *Record, promise func(*Record, error)) + func (s *GroupTransactSession) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults + func (s *GroupTransactSession) TryProduce(ctx context.Context, r *Record, promise func(*Record, error)) + type Hook interface + type HookBrokerConnect interface + OnBrokerConnect func(meta BrokerMetadata, dialDur time.Duration, conn net.Conn, err error) + type HookBrokerDisconnect interface + OnBrokerDisconnect func(meta BrokerMetadata, conn net.Conn) + type HookBrokerE2E interface + OnBrokerE2E func(meta BrokerMetadata, key int16, e2e BrokerE2E) + type HookBrokerRead interface + OnBrokerRead func(meta BrokerMetadata, key int16, bytesRead int, ...) + type HookBrokerThrottle interface + OnBrokerThrottle func(meta BrokerMetadata, throttleInterval time.Duration, ...) + type HookBrokerWrite interface + OnBrokerWrite func(meta BrokerMetadata, key int16, bytesWritten int, ...) + type HookFetchBatchRead interface + OnFetchBatchRead func(meta BrokerMetadata, topic string, partition int32, metrics FetchBatchMetrics) + type HookFetchRecordBuffered interface + OnFetchRecordBuffered func(*Record) + type HookFetchRecordUnbuffered interface + OnFetchRecordUnbuffered func(r *Record, polled bool) + type HookGroupManageError interface + OnGroupManageError func(error) + type HookNewClient interface + OnNewClient func(*Client) + type HookProduceBatchWritten interface + OnProduceBatchWritten func(meta BrokerMetadata, topic string, partition int32, ...) + type HookProduceRecordBuffered interface + OnProduceRecordBuffered func(*Record) + type HookProduceRecordUnbuffered interface + OnProduceRecordUnbuffered func(*Record, error) + type IntoSyncAssignment interface + IntoSyncAssignment func() []kmsg.SyncGroupRequestGroupAssignment + type IsolationLevel struct + func ReadCommitted() IsolationLevel + func ReadUncommitted() IsolationLevel + type LogLevel int8 + const LogLevelDebug + const LogLevelError + const LogLevelInfo + const LogLevelNone + const LogLevelWarn + func (l LogLevel) String() string + type Logger interface + Level func() LogLevel + Log func(level LogLevel, msg string, keyvals ...interface{}) + func BasicLogger(dst io.Writer, level LogLevel, prefixFn func() string) Logger + type Offset struct + func NewOffset() Offset + func NoResetOffset() Offset + func (o Offset) At(at int64) Offset + func (o Offset) AtEnd() Offset + func (o Offset) AtStart() Offset + func (o Offset) MarshalJSON() ([]byte, error) + func (o Offset) Relative(n int64) Offset + func (o Offset) String() string + func (o Offset) WithEpoch(e int32) Offset + type Opt interface + func AllowAutoTopicCreation() Opt + func BrokerMaxReadBytes(v int32) Opt + func BrokerMaxWriteBytes(v int32) Opt + func ClientID(id string) Opt + func ConcurrentTransactionsBackoff(backoff time.Duration) Opt + func ConnIdleTimeout(timeout time.Duration) Opt + func DialTLSConfig(c *tls.Config) Opt + func Dialer(fn func(ctx context.Context, network, host string) (net.Conn, error)) Opt + func MaxVersions(versions *kversion.Versions) Opt + func MetadataMaxAge(age time.Duration) Opt + func MetadataMinAge(age time.Duration) Opt + func MinVersions(versions *kversion.Versions) Opt + func RequestRetries(n int) Opt + func RequestTimeoutOverhead(overhead time.Duration) Opt + func RetryBackoffFn(backoff func(int) time.Duration) Opt + func RetryTimeout(t time.Duration) Opt + func RetryTimeoutFn(t func(int16) time.Duration) Opt + func SASL(sasls ...sasl.Mechanism) Opt + func SeedBrokers(seeds ...string) Opt + func SoftwareNameAndVersion(name, version string) Opt + func WithHooks(hooks ...Hook) Opt + func WithLogger(l Logger) Opt + type Partitioner interface + ForTopic func(string) TopicPartitioner + func BasicConsistentPartitioner(partition func(string) func(r *Record, n int) int) Partitioner + func LeastBackupPartitioner() Partitioner + func ManualPartitioner() Partitioner + func RoundRobinPartitioner() Partitioner + func StickyKeyPartitioner(overrideHasher PartitionerHasher) Partitioner + func StickyPartitioner() Partitioner + type PartitionerHasher func([]byte, int) int + func KafkaHasher(hashFn func([]byte) uint32) PartitionerHasher + func SaramaHasher(hashFn func([]byte) uint32) PartitionerHasher + type ProduceBatchMetrics struct + CompressedBytes int + CompressionType uint8 + NumRecords int + UncompressedBytes int + type ProduceResult struct + Err error + Record *Record + type ProduceResults []ProduceResult + func (rs ProduceResults) First() (*Record, error) + func (rs ProduceResults) FirstErr() error + type ProducerOpt interface + func DefaultProduceTopic(t string) ProducerOpt + func DisableIdempotentWrite() ProducerOpt + func ManualFlushing() ProducerOpt + func MaxBufferedRecords(n int) ProducerOpt + func MaxProduceRequestsInflightPerBroker(n int) ProducerOpt + func ProduceRequestTimeout(limit time.Duration) ProducerOpt + func ProducerBatchCompression(preference ...CompressionCodec) ProducerOpt + func ProducerBatchMaxBytes(v int32) ProducerOpt + func ProducerLinger(linger time.Duration) ProducerOpt + func ProducerOnDataLossDetected(fn func(string, int32)) ProducerOpt + func RecordDeliveryTimeout(timeout time.Duration) ProducerOpt + func RecordPartitioner(partitioner Partitioner) ProducerOpt + func RecordRetries(n int) ProducerOpt + func RequiredAcks(acks Acks) ProducerOpt + func StopProducerOnDataLossDetected() ProducerOpt + func TransactionTimeout(timeout time.Duration) ProducerOpt + func TransactionalID(id string) ProducerOpt + func UnknownTopicRetries(n int) ProducerOpt + type Record struct + Attrs RecordAttrs + Headers []RecordHeader + Key []byte + LeaderEpoch int32 + Offset int64 + Partition int32 + ProducerEpoch int16 + ProducerID int64 + Timestamp time.Time + Topic string + Value []byte + func KeySliceRecord(key, value []byte) *Record + func KeyStringRecord(key, value string) *Record + func SliceRecord(value []byte) *Record + func StringRecord(value string) *Record + func (r *Record) AppendFormat(b []byte, layout string) ([]byte, error) + type RecordAttrs struct + func (a RecordAttrs) CompressionType() uint8 + func (a RecordAttrs) IsControl() bool + func (a RecordAttrs) IsTransactional() bool + func (a RecordAttrs) TimestampType() int8 + type RecordFormatter struct + func NewRecordFormatter(layout string) (*RecordFormatter, error) + func (f *RecordFormatter) AppendPartitionRecord(b []byte, p *FetchPartition, r *Record) []byte + func (f *RecordFormatter) AppendRecord(b []byte, r *Record) []byte + type RecordHeader struct + Key string + Value []byte + type RecordReader struct + func NewRecordReader(reader io.Reader, layout string) (*RecordReader, error) + func (r *RecordReader) ReadRecord() (*Record, error) + func (r *RecordReader) ReadRecordInto(rec *Record) error + func (r *RecordReader) SetReader(reader io.Reader) + type ResponseShard struct + Err error + Meta BrokerMetadata + Req kmsg.Request + Resp kmsg.Response + type TopicBackupIter interface + Next func() (int, int64) + Rem func() int + type TopicBackupPartitioner interface + PartitionByBackup func(r *Record, n int, backupIter TopicBackupIter) int + type TopicPartitioner interface + Partition func(r *Record, n int) int + RequiresConsistency func(*Record) bool + type TopicPartitionerOnNewBatch interface + OnNewBatch func() + type TransactionEndTry bool + const TryAbort + const TryCommit