protocol

package
v0.0.0-...-9613083 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 27, 2020 License: MIT Imports: 7 Imported by: 52

Documentation

Index

Constants

View Source
const (
	ProduceKey                 = 0
	FetchKey                   = 1
	OffsetsKey                 = 2
	MetadataKey                = 3
	LeaderAndISRKey            = 4
	StopReplicaKey             = 5
	UpdateMetadataKey          = 6
	ControlledShutdownKey      = 7
	OffsetCommitKey            = 8
	OffsetFetchKey             = 9
	FindCoordinatorKey         = 10
	JoinGroupKey               = 11
	HeartbeatKey               = 12
	LeaveGroupKey              = 13
	SyncGroupKey               = 14
	DescribeGroupsKey          = 15
	ListGroupsKey              = 16
	SaslHandshakeKey           = 17
	APIVersionsKey             = 18
	CreateTopicsKey            = 19
	DeleteTopicsKey            = 20
	DeleteRecordsKey           = 21
	InitProducerIDKey          = 22
	OffsetForLeaderEpochKey    = 23
	AddPartitionsToTxnKey      = 24
	AddOffsetsToTxnKey         = 25
	EndTxnKey                  = 26
	WriteTxnMarkersKey         = 27
	TxnOffsetCommitKey         = 28
	DescribeAclsKey            = 29
	CreateAclsKey              = 30
	DeleteAclsKey              = 31
	DescribeConfigsKey         = 32
	AlterConfigsKey            = 33
	AlterReplicaLogDirsKey     = 34
	DescribeLogDirsKey         = 35
	SaslAuthenticateKey        = 36
	CreatePartitionsKey        = 37
	CreateDelegationTokenKey   = 38
	RenewDelegationTokenKey    = 39
	ExpireDelegationTokenKey   = 40
	DescribeDelegationTokenKey = 41
	DeleteGroupsKey            = 42
)

Protocol API keys. See: https://kafka.apache.org/protocol#protocol_api_keys

Variables

View Source
var (
	ErrUnknown                            = Error{/* contains filtered or unexported fields */}
	ErrNone                               = Error{/* contains filtered or unexported fields */}
	ErrOffsetOutOfRange                   = Error{/* contains filtered or unexported fields */}
	ErrCorruptMessage                     = Error{/* contains filtered or unexported fields */}
	ErrUnknownTopicOrPartition            = Error{/* contains filtered or unexported fields */}
	ErrInvalidFetchSize                   = Error{/* contains filtered or unexported fields */}
	ErrLeaderNotAvailable                 = Error{/* contains filtered or unexported fields */}
	ErrNotLeaderForPartition              = Error{/* contains filtered or unexported fields */}
	ErrRequestTimedOut                    = Error{/* contains filtered or unexported fields */}
	ErrBrokerNotAvailable                 = Error{/* contains filtered or unexported fields */}
	ErrReplicaNotAvailable                = Error{/* contains filtered or unexported fields */}
	ErrMessageTooLarge                    = Error{/* contains filtered or unexported fields */}
	ErrStaleControllerEpoch               = Error{/* contains filtered or unexported fields */}
	ErrOffsetMetadataTooLarge             = Error{/* contains filtered or unexported fields */}
	ErrNetworkException                   = Error{/* contains filtered or unexported fields */}
	ErrCoordinatorLoadInProgress          = Error{/* contains filtered or unexported fields */}
	ErrCoordinatorNotAvailable            = Error{/* contains filtered or unexported fields */}
	ErrNotCoordinator                     = Error{/* contains filtered or unexported fields */}
	ErrInvalidTopicException              = Error{/* contains filtered or unexported fields */}
	ErrRecordListTooLarge                 = Error{/* contains filtered or unexported fields */}
	ErrNotEnoughReplicas                  = Error{/* contains filtered or unexported fields */}
	ErrNotEnoughReplicasAfterAppend       = Error{/* contains filtered or unexported fields */}
	ErrInvalidRequiredAcks                = Error{/* contains filtered or unexported fields */}
	ErrIllegalGeneration                  = Error{/* contains filtered or unexported fields */}
	ErrInconsistentGroupProtocol          = Error{/* contains filtered or unexported fields */}
	ErrInvalidGroupId                     = Error{/* contains filtered or unexported fields */}
	ErrUnknownMemberId                    = Error{/* contains filtered or unexported fields */}
	ErrInvalidSessionTimeout              = Error{/* contains filtered or unexported fields */}
	ErrRebalanceInProgress                = Error{/* contains filtered or unexported fields */}
	ErrInvalidCommitOffsetSize            = Error{/* contains filtered or unexported fields */}
	ErrTopicAuthorizationFailed           = Error{/* contains filtered or unexported fields */}
	ErrGroupAuthorizationFailed           = Error{/* contains filtered or unexported fields */}
	ErrClusterAuthorizationFailed         = Error{/* contains filtered or unexported fields */}
	ErrInvalidTimestamp                   = Error{/* contains filtered or unexported fields */}
	ErrUnsupportedSaslMechanism           = Error{/* contains filtered or unexported fields */}
	ErrIllegalSaslState                   = Error{/* contains filtered or unexported fields */}
	ErrUnsupportedVersion                 = Error{/* contains filtered or unexported fields */}
	ErrTopicAlreadyExists                 = Error{/* contains filtered or unexported fields */}
	ErrInvalidPartitions                  = Error{/* contains filtered or unexported fields */}
	ErrInvalidReplicationFactor           = Error{/* contains filtered or unexported fields */}
	ErrInvalidReplicaAssignment           = Error{/* contains filtered or unexported fields */}
	ErrInvalidConfig                      = Error{/* contains filtered or unexported fields */}
	ErrNotController                      = Error{/* contains filtered or unexported fields */}
	ErrInvalidRequest                     = Error{/* contains filtered or unexported fields */}
	ErrUnsupportedForMessageFormat        = Error{/* contains filtered or unexported fields */}
	ErrPolicyViolation                    = Error{/* contains filtered or unexported fields */}
	ErrOutOfOrderSequenceNumber           = Error{/* contains filtered or unexported fields */}
	ErrDuplicateSequenceNumber            = Error{/* contains filtered or unexported fields */}
	ErrInvalidProducerEpoch               = Error{/* contains filtered or unexported fields */}
	ErrInvalidTxnState                    = Error{/* contains filtered or unexported fields */}
	ErrInvalidProducerIdMapping           = Error{/* contains filtered or unexported fields */}
	ErrInvalidTransactionTimeout          = Error{/* contains filtered or unexported fields */}
	ErrConcurrentTransactions             = Error{/* contains filtered or unexported fields */}
	ErrTransactionCoordinatorFenced       = Error{/* contains filtered or unexported fields */}
	ErrTransactionalIdAuthorizationFailed = Error{/* contains filtered or unexported fields */}
	ErrSecurityDisabled                   = Error{/* contains filtered or unexported fields */}
	ErrOperationNotAttempted              = Error{/* contains filtered or unexported fields */}

	// Errs maps protocol error codes to their corresponding Error values.
	Errs = map[int16]Error{
		-1: ErrUnknown,
		0:  ErrNone,
		1:  ErrOffsetOutOfRange,
		2:  ErrCorruptMessage,
		3:  ErrUnknownTopicOrPartition,
		4:  ErrInvalidFetchSize,
		5:  ErrLeaderNotAvailable,
		6:  ErrNotLeaderForPartition,
		7:  ErrRequestTimedOut,
		8:  ErrBrokerNotAvailable,
		9:  ErrReplicaNotAvailable,
		10: ErrMessageTooLarge,
		11: ErrStaleControllerEpoch,
		12: ErrOffsetMetadataTooLarge,
		13: ErrNetworkException,
		14: ErrCoordinatorLoadInProgress,
		15: ErrCoordinatorNotAvailable,
		16: ErrNotCoordinator,
		17: ErrInvalidTopicException,
		18: ErrRecordListTooLarge,
		19: ErrNotEnoughReplicas,
		20: ErrNotEnoughReplicasAfterAppend,
		21: ErrInvalidRequiredAcks,
		22: ErrIllegalGeneration,
		23: ErrInconsistentGroupProtocol,
		24: ErrInvalidGroupId,
		25: ErrUnknownMemberId,
		26: ErrInvalidSessionTimeout,
		27: ErrRebalanceInProgress,
		28: ErrInvalidCommitOffsetSize,
		29: ErrTopicAuthorizationFailed,
		30: ErrGroupAuthorizationFailed,
		31: ErrClusterAuthorizationFailed,
		32: ErrInvalidTimestamp,
		33: ErrUnsupportedSaslMechanism,
		34: ErrIllegalSaslState,
		35: ErrUnsupportedVersion,
		36: ErrTopicAlreadyExists,
		37: ErrInvalidPartitions,
		38: ErrInvalidReplicationFactor,
		39: ErrInvalidReplicaAssignment,
		40: ErrInvalidConfig,
		41: ErrNotController,
		42: ErrInvalidRequest,
		43: ErrUnsupportedForMessageFormat,
		44: ErrPolicyViolation,
		45: ErrOutOfOrderSequenceNumber,
		46: ErrDuplicateSequenceNumber,
		47: ErrInvalidProducerEpoch,
		48: ErrInvalidTxnState,
		49: ErrInvalidProducerIdMapping,
		50: ErrInvalidTransactionTimeout,
		51: ErrConcurrentTransactions,
		52: ErrTransactionCoordinatorFenced,
		53: ErrTransactionalIdAuthorizationFailed,
		54: ErrSecurityDisabled,
		55: ErrOperationNotAttempted,
	}
)
View Source
// APIVersions lists, for each API key it contains, the minimum and maximum
// version declared for that API.
//
// Not every key from the API key constants appears here (for example
// UpdateMetadataKey, OffsetCommitKey and SaslHandshakeKey are absent).
// NOTE(review): presumably this is the set advertised in APIVersionsResponse
// and the omissions are intentional — confirm against the callers.
var APIVersions = []APIVersion{
	{APIKey: ProduceKey, MinVersion: 0, MaxVersion: 5},
	{APIKey: FetchKey, MinVersion: 0, MaxVersion: 3},
	{APIKey: OffsetsKey, MinVersion: 0, MaxVersion: 2},
	{APIKey: MetadataKey, MinVersion: 0, MaxVersion: 1},
	{APIKey: LeaderAndISRKey, MinVersion: 0, MaxVersion: 1},
	{APIKey: StopReplicaKey, MinVersion: 0, MaxVersion: 0},
	{APIKey: FindCoordinatorKey, MinVersion: 0, MaxVersion: 1},
	{APIKey: JoinGroupKey, MinVersion: 0, MaxVersion: 1},
	{APIKey: HeartbeatKey, MinVersion: 0, MaxVersion: 1},
	{APIKey: LeaveGroupKey, MinVersion: 0, MaxVersion: 1},
	{APIKey: SyncGroupKey, MinVersion: 0, MaxVersion: 1},
	{APIKey: DescribeGroupsKey, MinVersion: 0, MaxVersion: 1},
	{APIKey: ListGroupsKey, MinVersion: 0, MaxVersion: 1},
	{APIKey: APIVersionsKey, MinVersion: 0, MaxVersion: 1},
	{APIKey: CreateTopicsKey, MinVersion: 0, MaxVersion: 1},
	{APIKey: DeleteTopicsKey, MinVersion: 0, MaxVersion: 1},
}
View Source
// Encoding is the package-level byte order, set to big-endian.
var Encoding = binary.BigEndian
View Source
// ErrInsufficientData is the sentinel error for input that ends before a
// complete packet could be decoded.
var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")
View Source
// ErrInvalidArrayLength is the sentinel error for an invalid array length.
var ErrInvalidArrayLength = errors.New("kafka: invalid array length")
View Source
// ErrInvalidByteSliceLength is the sentinel error for an invalid byte slice
// length. Its message carries the same "kafka: " prefix as the other decoding
// sentinels in this package so that log output is uniform.
var ErrInvalidByteSliceLength = errors.New("kafka: invalid byteslice length")
View Source
// ErrInvalidStringLength is the sentinel error for an invalid string length.
var ErrInvalidStringLength = errors.New("kafka: invalid string length")

Functions

func Decode

func Decode(b []byte, in VersionedDecoder, version int16) error

func Encode

func Encode(e Encoder) ([]byte, error)

func ExpectZeroSize

func ExpectZeroSize(sz int, err error) error

func MakeInt16

func MakeInt16(b []byte) int16

func MakeInt32

func MakeInt32(b []byte) int32

func MakeInt64

func MakeInt64(b []byte) int64

func Read

func Read(r io.Reader, data interface{}) error

func Size

func Size(v interface{}) int

func Write

func Write(w io.Writer, data interface{}) error

Types

type APIVersion

// APIVersion describes the version range declared for a single API key.
type APIVersion struct {
	APIKey     int16 // API key, e.g. one of the *Key constants such as ProduceKey
	MinVersion int16 // lowest version in the range
	MaxVersion int16 // highest version in the range
}

type APIVersionsRequest

type APIVersionsRequest struct {
	APIVersion int16
}

func (*APIVersionsRequest) Decode

func (c *APIVersionsRequest) Decode(_ PacketDecoder, version int16) error

func (*APIVersionsRequest) Encode

func (c *APIVersionsRequest) Encode(_ PacketEncoder) error

func (*APIVersionsRequest) Key

func (c *APIVersionsRequest) Key() int16

func (*APIVersionsRequest) Version

func (r *APIVersionsRequest) Version() int16

type APIVersionsResponse

type APIVersionsResponse struct {
	APIVersion int16

	ErrorCode    int16
	APIVersions  []APIVersion
	ThrottleTime time.Duration
}

func (*APIVersionsResponse) Decode

func (c *APIVersionsResponse) Decode(d PacketDecoder, version int16) error

func (*APIVersionsResponse) Encode

func (*APIVersionsResponse) Version

func (r *APIVersionsResponse) Version() int16

type AbortedTransaction

type AbortedTransaction struct {
	ProducerID  int64
	FirstOffset int64
}

func (*AbortedTransaction) Decode

func (t *AbortedTransaction) Decode(d PacketDecoder, version int16) (err error)

func (*AbortedTransaction) Encode

func (t *AbortedTransaction) Encode(e PacketEncoder) (err error)

type AlterConfigResourceResponse

type AlterConfigResourceResponse struct {
	ErrorCode    int16
	ErrorMessage *string
	Type         int8
	Name         string
}

type AlterConfigsEntry

type AlterConfigsEntry struct {
	Name  string
	Value *string
}

type AlterConfigsRequest

type AlterConfigsRequest struct {
	APIVersion int16

	Resources    []AlterConfigsResource
	ValidateOnly bool
}

func (*AlterConfigsRequest) Decode

func (r *AlterConfigsRequest) Decode(d PacketDecoder, version int16) (err error)

func (*AlterConfigsRequest) Encode

func (r *AlterConfigsRequest) Encode(e PacketEncoder) (err error)

func (*AlterConfigsRequest) Key

func (r *AlterConfigsRequest) Key() int16

func (*AlterConfigsRequest) Version

func (r *AlterConfigsRequest) Version() int16

type AlterConfigsResource

type AlterConfigsResource struct {
	Type    int8
	Name    string
	Entries []AlterConfigsEntry
}

type AlterConfigsResponse

type AlterConfigsResponse struct {
	APIVersion int16

	ThrottleTime time.Duration
	Resources    []AlterConfigResourceResponse
}

func (*AlterConfigsResponse) Decode

func (r *AlterConfigsResponse) Decode(d PacketDecoder, version int16) (err error)

func (*AlterConfigsResponse) Encode

func (*AlterConfigsResponse) Version

func (r *AlterConfigsResponse) Version() int16

type Body

// Body is an Encoder that also reports the API key and the version it
// belongs to.
// NOTE(review): presumably implemented by the request types in this package,
// which all expose Encode, Key and Version — confirm.
type Body interface {
	Encoder
	Key() int16
	Version() int16
}

type Broker

type Broker struct {
	NodeID int32
	Host   string
	Port   int32
}

type Brokers

type Brokers []*Broker

type ByteDecoder

type ByteDecoder struct {
	// contains filtered or unexported fields
}

func NewDecoder

func NewDecoder(b []byte) *ByteDecoder

func (*ByteDecoder) ArrayLength

func (d *ByteDecoder) ArrayLength() (int, error)

func (*ByteDecoder) Bool

func (d *ByteDecoder) Bool() (bool, error)

func (*ByteDecoder) Bytes

func (d *ByteDecoder) Bytes() ([]byte, error)

func (*ByteDecoder) Int16

func (d *ByteDecoder) Int16() (int16, error)

func (*ByteDecoder) Int32

func (d *ByteDecoder) Int32() (int32, error)

func (*ByteDecoder) Int32Array

func (d *ByteDecoder) Int32Array() ([]int32, error)

func (*ByteDecoder) Int64

func (d *ByteDecoder) Int64() (int64, error)

func (*ByteDecoder) Int64Array

func (d *ByteDecoder) Int64Array() ([]int64, error)

func (*ByteDecoder) Int8

func (d *ByteDecoder) Int8() (int8, error)

func (*ByteDecoder) NullableString

func (d *ByteDecoder) NullableString() (*string, error)

func (*ByteDecoder) Offset

func (d *ByteDecoder) Offset() int

func (*ByteDecoder) Pop

func (d *ByteDecoder) Pop() error

func (*ByteDecoder) Push

func (d *ByteDecoder) Push(pd PushDecoder) error

func (*ByteDecoder) String

func (d *ByteDecoder) String() (string, error)

func (*ByteDecoder) StringArray

func (d *ByteDecoder) StringArray() ([]string, error)

type ByteEncoder

type ByteEncoder struct {
	// contains filtered or unexported fields
}

func NewByteEncoder

func NewByteEncoder(b []byte) *ByteEncoder

func (*ByteEncoder) Bytes

func (b *ByteEncoder) Bytes() []byte

func (*ByteEncoder) Pop

func (e *ByteEncoder) Pop()

func (*ByteEncoder) Push

func (e *ByteEncoder) Push(pe PushEncoder)

func (*ByteEncoder) PutArrayLength

func (e *ByteEncoder) PutArrayLength(in int) error

func (*ByteEncoder) PutBool

func (e *ByteEncoder) PutBool(in bool)

func (*ByteEncoder) PutBytes

func (e *ByteEncoder) PutBytes(in []byte) error

func (*ByteEncoder) PutInt16

func (e *ByteEncoder) PutInt16(in int16)

func (*ByteEncoder) PutInt32

func (e *ByteEncoder) PutInt32(in int32)

func (*ByteEncoder) PutInt32Array

func (e *ByteEncoder) PutInt32Array(in []int32) error

func (*ByteEncoder) PutInt64

func (e *ByteEncoder) PutInt64(in int64)

func (*ByteEncoder) PutInt64Array

func (e *ByteEncoder) PutInt64Array(in []int64) error

func (*ByteEncoder) PutInt8

func (e *ByteEncoder) PutInt8(in int8)

func (*ByteEncoder) PutNullableString

func (e *ByteEncoder) PutNullableString(in *string) error

func (*ByteEncoder) PutRawBytes

func (e *ByteEncoder) PutRawBytes(in []byte) error

func (*ByteEncoder) PutString

func (e *ByteEncoder) PutString(in string) error

func (*ByteEncoder) PutStringArray

func (e *ByteEncoder) PutStringArray(in []string) error

type CRCField

type CRCField struct {
	StartOffset int
}

func (*CRCField) Check

func (f *CRCField) Check(curOffset int, buf []byte) error

func (*CRCField) Fill

func (f *CRCField) Fill(curOffset int, buf []byte) error

func (*CRCField) ReserveSize

func (f *CRCField) ReserveSize() int

func (*CRCField) SaveOffset

func (f *CRCField) SaveOffset(in int)

type ControlledShutdownRequest

type ControlledShutdownRequest struct {
	APIVersion int16
}

func (*ControlledShutdownRequest) Decode

func (r *ControlledShutdownRequest) Decode(d PacketDecoder, version int16) (err error)

func (*ControlledShutdownRequest) Encode

func (r *ControlledShutdownRequest) Encode(e PacketEncoder) (err error)

func (*ControlledShutdownRequest) Key

func (ControlledShutdownRequest) Version

func (r ControlledShutdownRequest) Version() int16

type ControlledShutdownResponse

type ControlledShutdownResponse struct {
	APIVersion int16
}

func (*ControlledShutdownResponse) Decode

func (r *ControlledShutdownResponse) Decode(d PacketDecoder, version int16) (err error)

func (*ControlledShutdownResponse) Encode

func (r *ControlledShutdownResponse) Encode(e PacketEncoder) (err error)

func (*ControlledShutdownResponse) Version

func (r *ControlledShutdownResponse) Version() int16

type Coordinator

type Coordinator struct {
	NodeID int32
	Host   string
	Port   int32
}

type CoordinatorType

// CoordinatorType identifies which kind of coordinator a
// FindCoordinatorRequest is looking for. It is carried on the wire as an int8.
type CoordinatorType int8

// Coordinator types as defined by the Kafka FindCoordinator API key type:
// 0 selects the group coordinator and 1 the transaction coordinator. The two
// values must differ, otherwise the coordinator kinds cannot be told apart.
const (
	// CoordinatorGroup selects the consumer group coordinator.
	CoordinatorGroup CoordinatorType = 0
	// CoordinatorTransaction selects the transaction coordinator.
	CoordinatorTransaction CoordinatorType = 1
)

type CreateTopicRequest

type CreateTopicRequest struct {
	Topic             string
	NumPartitions     int32
	ReplicationFactor int16
	ReplicaAssignment map[int32][]int32
	Configs           map[string]*string
}

type CreateTopicRequests

type CreateTopicRequests struct {
	APIVersion int16

	Requests     []*CreateTopicRequest
	Timeout      time.Duration
	ValidateOnly bool
}

func (*CreateTopicRequests) Decode

func (r *CreateTopicRequests) Decode(d PacketDecoder, version int16) error

func (*CreateTopicRequests) Encode

func (r *CreateTopicRequests) Encode(e PacketEncoder) (err error)

func (*CreateTopicRequests) Key

func (r *CreateTopicRequests) Key() int16

func (*CreateTopicRequests) Version

func (r *CreateTopicRequests) Version() int16

type CreateTopicsResponse

type CreateTopicsResponse struct {
	APIVersion int16

	ThrottleTime    time.Duration
	TopicErrorCodes []*TopicErrorCode
}

func (*CreateTopicsResponse) Decode

func (c *CreateTopicsResponse) Decode(d PacketDecoder, version int16) error

func (*CreateTopicsResponse) Encode

func (c *CreateTopicsResponse) Encode(e PacketEncoder) (err error)

func (*CreateTopicsResponse) Version

func (r *CreateTopicsResponse) Version() int16

type Data

type Data struct {
	Partition int32
	RecordSet []byte
}

type Datas

type Datas []*Data

type Decoder

// Decoder is implemented by types that can populate themselves from a
// PacketDecoder.
//
// NOTE(review): the Decode methods on the request and response types in this
// package take an additional version argument, and the package-level Decode
// function accepts a VersionedDecoder, so those types do not satisfy this
// interface — confirm whether Decoder is still used.
type Decoder interface {
	Decode(d PacketDecoder) error
}

type DeleteTopicsRequest

type DeleteTopicsRequest struct {
	APIVersion int16

	Topics  []string
	Timeout time.Duration
}

func (*DeleteTopicsRequest) Decode

func (r *DeleteTopicsRequest) Decode(d PacketDecoder, version int16) (err error)

func (*DeleteTopicsRequest) Encode

func (r *DeleteTopicsRequest) Encode(e PacketEncoder) (err error)

func (*DeleteTopicsRequest) Key

func (r *DeleteTopicsRequest) Key() int16

func (*DeleteTopicsRequest) Version

func (r *DeleteTopicsRequest) Version() int16

type DeleteTopicsResponse

type DeleteTopicsResponse struct {
	APIVersion int16

	ThrottleTime    time.Duration
	TopicErrorCodes []*TopicErrorCode
}

func (*DeleteTopicsResponse) Decode

func (c *DeleteTopicsResponse) Decode(d PacketDecoder, version int16) error

func (*DeleteTopicsResponse) Encode

func (*DeleteTopicsResponse) Version

func (r *DeleteTopicsResponse) Version() int16

type DescribeConfigsEntry

type DescribeConfigsEntry struct {
	Name        string
	Value       *string
	ReadOnly    bool
	IsDefault   bool
	IsSensitive bool
	Synonyms    []DescribeConfigsSynonym
}

type DescribeConfigsRequest

type DescribeConfigsRequest struct {
	APIVersion int16

	Resources       []DescribeConfigsResource
	IncludeSynonyms bool
}

func (*DescribeConfigsRequest) Decode

func (r *DescribeConfigsRequest) Decode(d PacketDecoder, version int16) (err error)

func (*DescribeConfigsRequest) Encode

func (r *DescribeConfigsRequest) Encode(e PacketEncoder) (err error)

func (*DescribeConfigsRequest) Key

func (r *DescribeConfigsRequest) Key() int16

func (*DescribeConfigsRequest) Version

func (r *DescribeConfigsRequest) Version() int16

type DescribeConfigsResource

type DescribeConfigsResource struct {
	Type        int8
	Name        string
	ConfigNames []string
}

type DescribeConfigsResourceResponse

type DescribeConfigsResourceResponse struct {
	ErrorCode     int16
	ErrorMessage  *string
	Type          int8
	Name          string
	ConfigEntries []DescribeConfigsEntry
}

type DescribeConfigsResponse

type DescribeConfigsResponse struct {
	APIVersion int16

	ThrottleTime time.Duration
	Resources    []DescribeConfigsResourceResponse
}

func (*DescribeConfigsResponse) Decode

func (r *DescribeConfigsResponse) Decode(d PacketDecoder, version int16) (err error)

func (*DescribeConfigsResponse) Encode

func (*DescribeConfigsResponse) Version

func (r *DescribeConfigsResponse) Version() int16

type DescribeConfigsSynonym

type DescribeConfigsSynonym struct {
	Name   string
	Value  *string
	Source int8
}

type DescribeGroupsRequest

type DescribeGroupsRequest struct {
	APIVersion int16

	GroupIDs []string
}

func (*DescribeGroupsRequest) Decode

func (r *DescribeGroupsRequest) Decode(d PacketDecoder, version int16) (err error)

func (*DescribeGroupsRequest) Encode

func (*DescribeGroupsRequest) Key

func (r *DescribeGroupsRequest) Key() int16

func (*DescribeGroupsRequest) Version

func (r *DescribeGroupsRequest) Version() int16

type DescribeGroupsResponse

type DescribeGroupsResponse struct {
	APIVersion int16

	ThrottleTime time.Duration
	Groups       []Group
}

func (*DescribeGroupsResponse) Decode

func (r *DescribeGroupsResponse) Decode(d PacketDecoder, version int16) (err error)

func (*DescribeGroupsResponse) Encode

func (*DescribeGroupsResponse) Key

func (r *DescribeGroupsResponse) Key() int16

type Encoder

// Encoder is implemented by types that can write themselves to a
// PacketEncoder. Encode returns a non-nil error if encoding fails.
type Encoder interface {
	Encode(e PacketEncoder) error
}

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error represents a protocol error. It lets each error carry its error code and a description.

func (Error) Code

func (e Error) Code() int16

func (Error) Error

func (e Error) Error() string

func (Error) String

func (e Error) String() string

func (Error) WithErr

func (e Error) WithErr(err error) Error

type FetchPartition

type FetchPartition struct {
	Partition   int32
	FetchOffset int64
	MaxBytes    int32
}

type FetchPartitionResponse

type FetchPartitionResponse struct {
	Partition           int32
	ErrorCode           int16
	HighWatermark       int64
	LastStableOffset    int64
	AbortedTransactions []*AbortedTransaction
	RecordSet           []byte
}

func (*FetchPartitionResponse) Decode

func (r *FetchPartitionResponse) Decode(d PacketDecoder, version int16) (err error)

func (*FetchPartitionResponse) Encode

func (r *FetchPartitionResponse) Encode(e PacketEncoder, version int16) (err error)

type FetchPartitionResponses

type FetchPartitionResponses []*FetchPartitionResponse

type FetchPartitions

type FetchPartitions []*FetchPartition

type FetchRequest

type FetchRequest struct {
	APIVersion int16

	ReplicaID      int32
	MaxWaitTime    time.Duration
	MinBytes       int32
	MaxBytes       int32
	IsolationLevel IsolationLevel
	Topics         []*FetchTopic
}

func (*FetchRequest) Decode

func (r *FetchRequest) Decode(d PacketDecoder, version int16) (err error)

func (*FetchRequest) Encode

func (r *FetchRequest) Encode(e PacketEncoder) (err error)

func (*FetchRequest) Key

func (r *FetchRequest) Key() int16

func (*FetchRequest) Version

func (r *FetchRequest) Version() int16

type FetchResponse

type FetchResponse struct {
	APIVersion int16

	ThrottleTime time.Duration
	Responses    FetchTopicResponses
}

func (*FetchResponse) Decode

func (r *FetchResponse) Decode(d PacketDecoder, version int16) (err error)

func (*FetchResponse) Encode

func (r *FetchResponse) Encode(e PacketEncoder) (err error)

func (*FetchResponse) Version

func (r *FetchResponse) Version() int16

type FetchTopic

type FetchTopic struct {
	Topic      string
	Partitions []*FetchPartition
}

type FetchTopicResponse

type FetchTopicResponse struct {
	Topic              string
	PartitionResponses FetchPartitionResponses
}

type FetchTopicResponses

type FetchTopicResponses []*FetchTopicResponse

type FetchTopics

type FetchTopics []*FetchTopic

type FindCoordinatorRequest

type FindCoordinatorRequest struct {
	APIVersion int16

	CoordinatorKey  string
	CoordinatorType CoordinatorType
}

func (*FindCoordinatorRequest) Decode

func (r *FindCoordinatorRequest) Decode(d PacketDecoder, version int16) (err error)

func (*FindCoordinatorRequest) Encode

func (r *FindCoordinatorRequest) Encode(e PacketEncoder) (err error)

func (*FindCoordinatorRequest) Key

func (r *FindCoordinatorRequest) Key() int16

func (*FindCoordinatorRequest) Version

func (r *FindCoordinatorRequest) Version() int16

type FindCoordinatorResponse

type FindCoordinatorResponse struct {
	APIVersion int16

	ThrottleTime time.Duration
	ErrorCode    int16
	ErrorMessage *string
	Coordinator  Coordinator
}

func (*FindCoordinatorResponse) Decode

func (r *FindCoordinatorResponse) Decode(d PacketDecoder, version int16) (err error)

func (*FindCoordinatorResponse) Encode

func (r *FindCoordinatorResponse) Encode(e PacketEncoder) (err error)

func (*FindCoordinatorResponse) Version

func (r *FindCoordinatorResponse) Version() int16

type Group

type Group struct {
	ErrorCode    int16
	GroupID      string
	State        string
	ProtocolType string
	Protocol     string
	GroupMembers map[string]*GroupMember
}

func (*Group) Decode

func (r *Group) Decode(d PacketDecoder, version int16) (err error)

func (*Group) Encode

func (r *Group) Encode(e PacketEncoder) error

type GroupAssignment

type GroupAssignment struct {
	MemberID         string
	MemberAssignment []byte
}

type GroupMember

type GroupMember struct {
	ClientID              string
	ClientHost            string
	GroupMemberMetadata   []byte
	GroupMemberAssignment []byte
}

func (*GroupMember) Decode

func (r *GroupMember) Decode(d PacketDecoder, version int16) (err error)

func (*GroupMember) Encode

func (r *GroupMember) Encode(e PacketEncoder) error

type GroupProtocol

type GroupProtocol struct {
	ProtocolName     string
	ProtocolMetadata []byte
}

type HeartbeatRequest

type HeartbeatRequest struct {
	APIVersion int16

	GroupID           string
	GroupGenerationID int32
	MemberID          string
}

func (*HeartbeatRequest) Decode

func (r *HeartbeatRequest) Decode(d PacketDecoder, version int16) (err error)

func (*HeartbeatRequest) Encode

func (r *HeartbeatRequest) Encode(e PacketEncoder) (err error)

func (*HeartbeatRequest) Key

func (r *HeartbeatRequest) Key() int16

func (*HeartbeatRequest) Version

func (r *HeartbeatRequest) Version() int16

type HeartbeatResponse

type HeartbeatResponse struct {
	APIVersion int16

	ThrottleTime time.Duration
	ErrorCode    int16
}

func (*HeartbeatResponse) Decode

func (r *HeartbeatResponse) Decode(d PacketDecoder, version int16) (err error)

func (*HeartbeatResponse) Encode

func (r *HeartbeatResponse) Encode(e PacketEncoder) error

func (*HeartbeatResponse) Key

func (r *HeartbeatResponse) Key() int16

func (*HeartbeatResponse) Version

func (r *HeartbeatResponse) Version() int16

type Int32s

type Int32s []int32

type IsolationLevel

// IsolationLevel is the int8 isolation level carried in a FetchRequest.
type IsolationLevel int8
// Isolation levels. The numeric values match the Kafka Fetch API's
// isolation_level field (0 = READ_UNCOMMITTED, 1 = READ_COMMITTED).
const (
	ReadUncommitted IsolationLevel = 0 // READ_UNCOMMITTED
	ReadCommitted   IsolationLevel = 1 // READ_COMMITTED
)

type JoinGroupRequest

type JoinGroupRequest struct {
	APIVersion int16

	GroupID          string
	SessionTimeout   int32
	RebalanceTimeout int32
	MemberID         string
	ProtocolType     string
	GroupProtocols   []*GroupProtocol
}

func (*JoinGroupRequest) Decode

func (r *JoinGroupRequest) Decode(d PacketDecoder, version int16) (err error)

func (*JoinGroupRequest) Encode

func (r *JoinGroupRequest) Encode(e PacketEncoder) (err error)

func (*JoinGroupRequest) Key

func (r *JoinGroupRequest) Key() int16

func (*JoinGroupRequest) Version

func (r *JoinGroupRequest) Version() int16

type JoinGroupResponse

type JoinGroupResponse struct {
	APIVersion int16

	ThrottleTime  time.Duration
	ErrorCode     int16
	GenerationID  int32
	GroupProtocol string
	LeaderID      string
	MemberID      string
	Members       []Member
}

func (*JoinGroupResponse) Decode

func (r *JoinGroupResponse) Decode(d PacketDecoder, version int16) (err error)

func (*JoinGroupResponse) Encode

func (r *JoinGroupResponse) Encode(e PacketEncoder) (err error)

func (*JoinGroupResponse) Key

func (r *JoinGroupResponse) Key() int16

func (*JoinGroupResponse) Version

func (r *JoinGroupResponse) Version() int16

type LeaderAndISRPartition

type LeaderAndISRPartition struct {
	Topic     string
	Partition int32
	ErrorCode int16
}

type LeaderAndISRPartitions

type LeaderAndISRPartitions []*LeaderAndISRPartition

type LeaderAndISRRequest

type LeaderAndISRRequest struct {
	APIVersion int16

	ControllerID    int32
	ControllerEpoch int32
	PartitionStates []*PartitionState
	LiveLeaders     []*LiveLeader
}

func (*LeaderAndISRRequest) Decode

func (r *LeaderAndISRRequest) Decode(d PacketDecoder, version int16) (err error)

func (*LeaderAndISRRequest) Encode

func (*LeaderAndISRRequest) Key

func (r *LeaderAndISRRequest) Key() int16

func (*LeaderAndISRRequest) Version

func (r *LeaderAndISRRequest) Version() int16

type LeaderAndISRResponse

type LeaderAndISRResponse struct {
	APIVersion int16

	ErrorCode  int16
	Partitions []*LeaderAndISRPartition
}

func (*LeaderAndISRResponse) Decode

func (r *LeaderAndISRResponse) Decode(d PacketDecoder, version int16) (err error)

func (*LeaderAndISRResponse) Encode

func (*LeaderAndISRResponse) Key

func (r *LeaderAndISRResponse) Key() int16

func (*LeaderAndISRResponse) Version

func (r *LeaderAndISRResponse) Version() int16

type LeaveGroupRequest

type LeaveGroupRequest struct {
	APIVersion int16

	GroupID  string
	MemberID string
}

func (*LeaveGroupRequest) Decode

func (r *LeaveGroupRequest) Decode(d PacketDecoder, version int16) (err error)

func (*LeaveGroupRequest) Encode

func (r *LeaveGroupRequest) Encode(e PacketEncoder) error

func (*LeaveGroupRequest) Key

func (r *LeaveGroupRequest) Key() int16

func (*LeaveGroupRequest) Version

func (r *LeaveGroupRequest) Version() int16

type LeaveGroupResponse

type LeaveGroupResponse struct {
	APIVersion int16

	ThrottleTime time.Duration
	ErrorCode    int16
}

func (*LeaveGroupResponse) Decode

func (r *LeaveGroupResponse) Decode(d PacketDecoder, version int16) (err error)

func (*LeaveGroupResponse) Encode

func (r *LeaveGroupResponse) Encode(e PacketEncoder) error

func (*LeaveGroupResponse) Key

func (r *LeaveGroupResponse) Key() int16

func (*LeaveGroupResponse) Version

func (r *LeaveGroupResponse) Version() int16

type LenEncoder

type LenEncoder struct {
	Length int
	// contains filtered or unexported fields
}

func (*LenEncoder) Pop

func (e *LenEncoder) Pop()

func (*LenEncoder) Push

func (e *LenEncoder) Push(pe PushEncoder)

func (*LenEncoder) PutArrayLength

func (e *LenEncoder) PutArrayLength(in int) error

func (*LenEncoder) PutBool

func (e *LenEncoder) PutBool(in bool)

func (*LenEncoder) PutBytes

func (e *LenEncoder) PutBytes(in []byte) error

func (*LenEncoder) PutInt16

func (e *LenEncoder) PutInt16(in int16)

func (*LenEncoder) PutInt32

func (e *LenEncoder) PutInt32(in int32)

func (*LenEncoder) PutInt32Array

func (e *LenEncoder) PutInt32Array(in []int32) error

func (*LenEncoder) PutInt64

func (e *LenEncoder) PutInt64(in int64)

func (*LenEncoder) PutInt64Array

func (e *LenEncoder) PutInt64Array(in []int64) error

func (*LenEncoder) PutInt8

func (e *LenEncoder) PutInt8(in int8)

func (*LenEncoder) PutNullableString

func (e *LenEncoder) PutNullableString(in *string) error

func (*LenEncoder) PutRawBytes

func (e *LenEncoder) PutRawBytes(in []byte) error

func (*LenEncoder) PutString

func (e *LenEncoder) PutString(in string) error

func (*LenEncoder) PutStringArray

func (e *LenEncoder) PutStringArray(in []string) error

type ListGroup

// ListGroup is the element type of ListGroupsResponse.Groups: a group ID
// together with that group's protocol type.
type ListGroup struct {
	GroupID      string
	ProtocolType string
}

type ListGroupsRequest

// ListGroupsRequest is the request message for the ListGroups API
// (ListGroupsKey). It has no fields other than APIVersion.
// Decode, Encode, Key and Version are declared on *ListGroupsRequest.
type ListGroupsRequest struct {
	APIVersion int16
}

func (*ListGroupsRequest) Decode

func (r *ListGroupsRequest) Decode(d PacketDecoder, version int16) (err error)

func (*ListGroupsRequest) Encode

func (r *ListGroupsRequest) Encode(e PacketEncoder) error

func (*ListGroupsRequest) Key

func (r *ListGroupsRequest) Key() int16

func (*ListGroupsRequest) Version

func (r *ListGroupsRequest) Version() int16

type ListGroupsResponse

// ListGroupsResponse is the response message for the ListGroups API
// (ListGroupsKey). It holds a throttle duration, an error code and the
// listed groups. Decode, Encode, Key and Version are declared on
// *ListGroupsResponse.
type ListGroupsResponse struct {
	APIVersion int16

	ThrottleTime time.Duration
	ErrorCode    int16
	Groups       []ListGroup
}

func (*ListGroupsResponse) Decode

func (r *ListGroupsResponse) Decode(d PacketDecoder, version int16) (err error)

func (*ListGroupsResponse) Encode

func (r *ListGroupsResponse) Encode(e PacketEncoder) error

func (*ListGroupsResponse) Key

func (r *ListGroupsResponse) Key() int16

func (*ListGroupsResponse) Version

func (r *ListGroupsResponse) Version() int16

type LiveLeader

// LiveLeader identifies a leader by numeric ID, host and port.
//
// NOTE(review): presumably describes a broker referenced by a LeaderAndISR
// request — the containing request type is not visible here; confirm.
type LiveLeader struct {
	ID   int32
	Host string
	Port int32
}

type LiveLeaders

// LiveLeaders is a named slice of *LiveLeader.
type LiveLeaders []*LiveLeader

type Member

// Member pairs a member ID with that member's metadata, kept as raw bytes.
type Member struct {
	MemberID       string
	MemberMetadata []byte
}

type Message

// Message is a single message: CRC, magic byte, attributes, timestamp, key
// and value. Its Decode method takes only a PacketDecoder; unlike the
// request/response bodies it receives no version argument.
type Message struct {
	Crc        int32
	MagicByte  int8
	Attributes int8
	Timestamp  time.Time
	Key        []byte
	Value      []byte
}

func (*Message) Decode

func (m *Message) Decode(d PacketDecoder) error

func (*Message) Encode

func (m *Message) Encode(e PacketEncoder) error

type MessageSet

// MessageSet groups a slice of Messages under an Offset and a Size.
// Decode and Encode are declared on *MessageSet.
//
// NOTE(review): PartialTrailingMessages presumably records that a truncated
// final message was encountered while decoding — confirm in Decode.
type MessageSet struct {
	Offset                  int64
	Size                    int32
	Messages                []*Message
	PartialTrailingMessages bool
}

func (*MessageSet) Decode

func (ms *MessageSet) Decode(d PacketDecoder) error

func (*MessageSet) Encode

func (ms *MessageSet) Encode(e PacketEncoder) error

type MetadataRequest

// MetadataRequest is the request message for the Metadata API (MetadataKey).
// It lists topic names and carries an AllowAutoTopicCreation flag.
// Decode, Encode, Key and Version are declared on *MetadataRequest.
type MetadataRequest struct {
	APIVersion int16

	Topics                 []string
	AllowAutoTopicCreation bool
}

func (*MetadataRequest) Decode

func (r *MetadataRequest) Decode(d PacketDecoder, version int16) (err error)

func (*MetadataRequest) Encode

func (r *MetadataRequest) Encode(e PacketEncoder) (err error)

func (*MetadataRequest) Key

func (r *MetadataRequest) Key() int16

func (*MetadataRequest) Version

func (r *MetadataRequest) Version() int16

type MetadataResponse

// MetadataResponse is the response message for the Metadata API
// (MetadataKey). It holds the broker list, the controller's ID and
// per-topic metadata. Decode, Encode and Version are declared on
// *MetadataResponse; no Key method is listed for it.
type MetadataResponse struct {
	APIVersion int16

	Brokers       []*Broker
	ControllerID  int32
	TopicMetadata []*TopicMetadata
}

func (*MetadataResponse) Decode

func (r *MetadataResponse) Decode(d PacketDecoder, version int16) (err error)

func (*MetadataResponse) Encode

func (r *MetadataResponse) Encode(e PacketEncoder) (err error)

func (*MetadataResponse) Version

func (r *MetadataResponse) Version() int16

type OffsetCommitPartitionRequest

// OffsetCommitPartitionRequest is the element type of
// OffsetCommitTopicRequest.Partitions: the offset, timestamp and optional
// metadata for one partition. Metadata is a *string, so it may be nil.
type OffsetCommitPartitionRequest struct {
	Partition int32
	Offset    int64
	Timestamp int64
	Metadata  *string
}

type OffsetCommitPartitionResponse

// OffsetCommitPartitionResponse is the element type of
// OffsetCommitTopicResponse.PartitionResponses: a partition number and its
// error code.
type OffsetCommitPartitionResponse struct {
	Partition int32
	ErrorCode int16
}

type OffsetCommitRequest

// OffsetCommitRequest is the request message for the OffsetCommit API
// (OffsetCommitKey). It identifies the group, generation and member, gives
// a retention time, and lists per-topic offsets to commit.
// Decode, Encode, Key and Version are declared on *OffsetCommitRequest.
type OffsetCommitRequest struct {
	APIVersion int16

	GroupID       string
	GenerationID  int32
	MemberID      string
	RetentionTime int64
	Topics        []OffsetCommitTopicRequest
}

func (*OffsetCommitRequest) Decode

func (r *OffsetCommitRequest) Decode(d PacketDecoder, version int16) (err error)

func (*OffsetCommitRequest) Encode

func (r *OffsetCommitRequest) Encode(e PacketEncoder) (err error)

func (*OffsetCommitRequest) Key

func (r *OffsetCommitRequest) Key() int16

func (*OffsetCommitRequest) Version

func (r *OffsetCommitRequest) Version() int16

type OffsetCommitResponse

// OffsetCommitResponse is the response message for the OffsetCommit API
// (OffsetCommitKey). It holds a throttle duration and per-topic results.
// Only Decode and Encode are listed on *OffsetCommitResponse.
type OffsetCommitResponse struct {
	APIVersion int16

	ThrottleTime time.Duration
	Responses    []OffsetCommitTopicResponse
}

func (*OffsetCommitResponse) Decode

func (r *OffsetCommitResponse) Decode(d PacketDecoder, version int16) (err error)

func (*OffsetCommitResponse) Encode

func (r *OffsetCommitResponse) Encode(e PacketEncoder) (err error)

type OffsetCommitTopicRequest

// OffsetCommitTopicRequest is the element type of OffsetCommitRequest.Topics:
// a topic name with the partitions to commit for it.
type OffsetCommitTopicRequest struct {
	Topic      string
	Partitions []OffsetCommitPartitionRequest
}

type OffsetCommitTopicResponse

// OffsetCommitTopicResponse is the element type of
// OffsetCommitResponse.Responses: a topic name with its per-partition results.
type OffsetCommitTopicResponse struct {
	Topic              string
	PartitionResponses []OffsetCommitPartitionResponse
}

type OffsetFetchPartition

// OffsetFetchPartition is the element type of
// OffsetFetchTopicResponse.Partitions: the fetched offset, optional metadata
// and error code for one partition. Metadata is a *string, so it may be nil.
//
// NOTE(review): Offset is declared int16 here, while the other offset fields
// in this package (e.g. OffsetCommitPartitionRequest.Offset,
// PartitionResponse.Offset) are int64. This looks too narrow for a committed
// offset — confirm against the wire format before relying on it.
type OffsetFetchPartition struct {
	Partition int32
	Offset    int16
	Metadata  *string
	ErrorCode int16
}

type OffsetFetchRequest

// OffsetFetchRequest is the request message for the OffsetFetch API
// (OffsetFetchKey). It names a group and the topics/partitions to look up.
// Decode, Encode, Key and Version are declared on *OffsetFetchRequest.
type OffsetFetchRequest struct {
	APIVersion int16

	GroupID string
	Topics  []OffsetFetchTopicRequest
}

func (*OffsetFetchRequest) Decode

func (r *OffsetFetchRequest) Decode(d PacketDecoder, version int16) (err error)

func (*OffsetFetchRequest) Encode

func (r *OffsetFetchRequest) Encode(e PacketEncoder) (err error)

func (*OffsetFetchRequest) Key

func (r *OffsetFetchRequest) Key() int16

func (*OffsetFetchRequest) Version

func (r *OffsetFetchRequest) Version() int16

type OffsetFetchResponse

// OffsetFetchResponse is the response message for the OffsetFetch API
// (OffsetFetchKey), holding per-topic results. Decode, Encode and Version
// are declared on *OffsetFetchResponse; no Key method is listed for it.
type OffsetFetchResponse struct {
	APIVersion int16

	Responses []OffsetFetchTopicResponse
}

func (*OffsetFetchResponse) Decode

func (r *OffsetFetchResponse) Decode(d PacketDecoder, version int16) (err error)

func (*OffsetFetchResponse) Encode

func (r *OffsetFetchResponse) Encode(e PacketEncoder) (err error)

func (*OffsetFetchResponse) Version

func (r *OffsetFetchResponse) Version() int16

type OffsetFetchTopicRequest

// OffsetFetchTopicRequest is the element type of OffsetFetchRequest.Topics:
// a topic name with the partition numbers to fetch offsets for.
type OffsetFetchTopicRequest struct {
	Topic      string
	Partitions []int32
}

type OffsetFetchTopicResponse

// OffsetFetchTopicResponse is the element type of
// OffsetFetchResponse.Responses: a topic name with its per-partition results.
type OffsetFetchTopicResponse struct {
	Topic      string
	Partitions []OffsetFetchPartition
}

type OffsetResponse

// OffsetResponse is the element type of OffsetsResponse.Responses: a topic
// name with its per-partition responses.
type OffsetResponse struct {
	Topic              string
	PartitionResponses []*PartitionResponse
}

type OffsetsPartition

// OffsetsPartition is the element type of OffsetsTopic.Partitions: the
// partition to query, the timestamp to look up (with two sentinel values,
// documented on the field) and a cap on the number of offsets returned.
type OffsetsPartition struct {
	Partition     int32
	Timestamp     int64 // -1 to receive latest offset, -2 to receive earliest offset
	MaxNumOffsets int32
}

type OffsetsRequest

// OffsetsRequest is the request message for the Offsets API (OffsetsKey).
// It carries a replica ID, an isolation level and the topics to query.
// Decode, Encode, Key and Version are declared on *OffsetsRequest.
type OffsetsRequest struct {
	APIVersion int16

	ReplicaID      int32
	IsolationLevel int8
	Topics         []*OffsetsTopic
}

func (*OffsetsRequest) Decode

func (r *OffsetsRequest) Decode(d PacketDecoder, version int16) (err error)

func (*OffsetsRequest) Encode

func (r *OffsetsRequest) Encode(e PacketEncoder) (err error)

func (*OffsetsRequest) Key

func (r *OffsetsRequest) Key() int16

func (*OffsetsRequest) Version

func (r *OffsetsRequest) Version() int16

type OffsetsResponse

// OffsetsResponse is the response message for the Offsets API (OffsetsKey).
// It holds a throttle duration and per-topic responses. Decode, Encode and
// Version are declared on *OffsetsResponse; no Key method is listed for it.
type OffsetsResponse struct {
	APIVersion int16

	ThrottleTime time.Duration
	Responses    []*OffsetResponse
}

func (*OffsetsResponse) Decode

func (r *OffsetsResponse) Decode(d PacketDecoder, version int16) (err error)

func (*OffsetsResponse) Encode

func (r *OffsetsResponse) Encode(e PacketEncoder) (err error)

func (*OffsetsResponse) Version

func (r *OffsetsResponse) Version() int16

type OffsetsTopic

// OffsetsTopic is the element type of OffsetsRequest.Topics: a topic name
// with the partitions to query.
type OffsetsTopic struct {
	Topic      string
	Partitions []*OffsetsPartition
}

type PacketDecoder

// PacketDecoder is the reader interface handed to every Decode method in
// this package. It offers typed reads for fixed-width integers, booleans,
// byte slices, strings (plain and nullable) and arrays of those, plus
// Push/Pop for working with a PushDecoder.
//
// The interface also contains unexported methods, so it cannot be
// implemented by types outside this package.
type PacketDecoder interface {
	Bool() (bool, error)
	Int8() (int8, error)
	Int16() (int16, error)
	Int32() (int32, error)
	Int64() (int64, error)
	ArrayLength() (int, error)
	Bytes() ([]byte, error)
	String() (string, error)
	NullableString() (*string, error)
	Int32Array() ([]int32, error)
	Int64Array() ([]int64, error)
	StringArray() ([]string, error)
	Push(pd PushDecoder) error
	Pop() error
	// contains filtered or unexported methods
}

type PacketEncoder

// PacketEncoder is the writer interface handed to every Encode method in
// this package. The fixed-width Put* methods (bool and integers) return
// nothing; the variable-length ones (array lengths, bytes, strings and
// arrays) return an error. Push and Pop work with a PushEncoder.
// *LenEncoder declares this same method set.
type PacketEncoder interface {
	PutBool(in bool)
	PutInt8(in int8)
	PutInt16(in int16)
	PutInt32(in int32)
	PutInt64(in int64)
	PutArrayLength(in int) error
	PutRawBytes(in []byte) error
	PutBytes(in []byte) error
	PutString(in string) error
	PutNullableString(in *string) error
	PutStringArray(in []string) error
	PutInt32Array(in []int32) error
	PutInt64Array(in []int64) error
	Push(pe PushEncoder)
	Pop()
}

type PartitionMetadata

// PartitionMetadata is the element type of TopicMetadata.PartitionMetadata:
// a partition's error code, ID, leader, replica list and ISR list.
type PartitionMetadata struct {
	PartitionErrorCode int16
	PartitionID        int32
	Leader             int32
	Replicas           []int32
	ISR                []int32
}

type PartitionMetadatas

// PartitionMetadatas is a named slice of *PartitionMetadata.
type PartitionMetadatas []*PartitionMetadata

type PartitionResponse

// PartitionResponse is the element type of OffsetResponse.PartitionResponses:
// the result of an offsets lookup for one partition.
//
// NOTE(review): the struct has both a slice field (Offsets) and a scalar
// field (Offset); presumably which one is populated depends on the API
// version — confirm in OffsetsResponse's Encode/Decode.
type PartitionResponse struct {
	Partition int32
	ErrorCode int16
	Timestamp time.Time
	Offsets   []int64
	Offset    int64
}

type PartitionState

// PartitionState describes one topic partition: controller epoch, leader and
// leader epoch, ISR and replica lists, a ZooKeeper version number and an
// IsNew flag.
//
// NOTE(review): presumably carried by the LeaderAndISR request — the
// containing type is not visible here; confirm.
type PartitionState struct {
	Topic           string
	Partition       int32
	ControllerEpoch int32
	Leader          int32
	LeaderEpoch     int32
	ISR             []int32
	ZKVersion       int32
	Replicas        []int32
	IsNew           bool
}

type PartitionStates

// PartitionStates is a named slice of *PartitionState.
type PartitionStates []*PartitionState

type ProducePartitionResponse

// ProducePartitionResponse is the element type of
// ProduceTopicResponse.PartitionResponses: the error code, base offset,
// log-append time and log start offset reported for one partition.
type ProducePartitionResponse struct {
	Partition      int32
	ErrorCode      int16
	BaseOffset     int64
	LogAppendTime  time.Time
	LogStartOffset int64
}

type ProducePartitionResponses

// ProducePartitionResponses is a named slice of *ProducePartitionResponse.
type ProducePartitionResponses []*ProducePartitionResponse

type ProduceRequest

// ProduceRequest is the request message for the Produce API (ProduceKey).
// It carries an optional transactional ID (nil when absent), the acks
// setting, a timeout and the per-topic data to write.
// Decode, Encode, Key and Version are declared on *ProduceRequest.
type ProduceRequest struct {
	APIVersion int16

	TransactionalID *string
	Acks            int16
	Timeout         time.Duration
	TopicData       []*TopicData
}

func (*ProduceRequest) Decode

func (r *ProduceRequest) Decode(d PacketDecoder, version int16) (err error)

func (*ProduceRequest) Encode

func (r *ProduceRequest) Encode(e PacketEncoder) (err error)

func (*ProduceRequest) Key

func (r *ProduceRequest) Key() int16

func (*ProduceRequest) Version

func (r *ProduceRequest) Version() int16

type ProduceResponse

// ProduceResponse is the response message for the Produce API (ProduceKey).
// It holds per-topic results and a throttle duration.
// Only Decode and Encode are listed on *ProduceResponse.
type ProduceResponse struct {
	APIVersion int16

	Responses    []*ProduceTopicResponse
	ThrottleTime time.Duration
}

func (*ProduceResponse) Decode

func (r *ProduceResponse) Decode(d PacketDecoder, version int16) (err error)

func (*ProduceResponse) Encode

func (r *ProduceResponse) Encode(e PacketEncoder) (err error)

type ProduceTopicResponse

// ProduceTopicResponse is the element type of ProduceResponse.Responses:
// a topic name with its per-partition results.
type ProduceTopicResponse struct {
	Topic              string
	PartitionResponses []*ProducePartitionResponse
}

type ProduceTopicResponses

// ProduceTopicResponses is a named slice of *ProduceTopicResponse.
type ProduceTopicResponses []*ProduceTopicResponse

type PushDecoder

// PushDecoder is the argument type of PacketDecoder.Push. It has the three
// methods of PushEncoder (SaveOffset, ReserveSize, Fill) plus Check, which
// receives the current offset and the buffer and reports an error.
// *SizeField declares all four methods.
type PushDecoder interface {
	SaveOffset(in int)
	ReserveSize() int
	Fill(curOffset int, buf []byte) error
	Check(curOffset int, buf []byte) error
}

type PushEncoder

// PushEncoder is the argument type of PacketEncoder.Push.
//
// NOTE(review): by the method names, an implementation presumably records
// where its field starts (SaveOffset), says how many bytes to reserve
// (ReserveSize) and later writes the final value into the buffer (Fill) —
// confirm against the encoder implementation, which is not visible here.
type PushEncoder interface {
	SaveOffset(in int)
	ReserveSize() int
	Fill(curOffset int, buf []byte) error
}

type Request

// Request bundles a correlation ID and client ID with a request Body.
// Only an Encode method is listed on *Request.
type Request struct {
	CorrelationID int32
	ClientID      string
	Body          Body
}

func (*Request) Encode

func (r *Request) Encode(pe PacketEncoder) (err error)

type RequestHeader

// RequestHeader holds the fields that precede a request body: total size,
// API key, API version, correlation ID and client ID. Decode, Encode and
// String are declared on *RequestHeader.
type RequestHeader struct {
	// Size of the request
	Size int32
	// ID of the API (e.g. produce, fetch, metadata)
	APIKey int16
	// Version of the API to use
	APIVersion int16
	// User defined ID to correlate requests between server and client
	CorrelationID int32
	// The client ID string itself (this field is the value, not its size)
	ClientID string
}

func (*RequestHeader) Decode

func (r *RequestHeader) Decode(d PacketDecoder) error

func (*RequestHeader) Encode

func (r *RequestHeader) Encode(e PacketEncoder)

func (*RequestHeader) String

func (r *RequestHeader) String() string

type Response

// Response bundles a size and correlation ID with a ResponseBody.
//
// NOTE(review): Decode and Encode are listed with a value receiver
// (func (r Response) ...). Any assignment Decode makes to Size or
// CorrelationID therefore lands on a copy and is not visible to the caller;
// only changes made through the Body interface value can persist. Confirm
// this is intended.
type Response struct {
	Size          int32
	CorrelationID int32
	Body          ResponseBody
}

func (Response) Decode

func (r Response) Decode(pd PacketDecoder, version int16) (err error)

func (Response) Encode

func (r Response) Encode(pe PacketEncoder) (err error)

type ResponseBody

// ResponseBody is the type of Response.Body. It combines Encoder with
// VersionedDecoder, so a value must be both encodable and decodable for a
// given API version.
type ResponseBody interface {
	Encoder
	VersionedDecoder
}

type SaslHandshakeRequest

// SaslHandshakeRequest is the request message for the SaslHandshake API
// (SaslHandshakeKey). It has no fields other than APIVersion.
// Decode, Encode, Key and Version are declared on *SaslHandshakeRequest.
type SaslHandshakeRequest struct {
	APIVersion int16
}

func (*SaslHandshakeRequest) Decode

func (r *SaslHandshakeRequest) Decode(d PacketDecoder, version int16) (err error)

func (*SaslHandshakeRequest) Encode

func (r *SaslHandshakeRequest) Encode(e PacketEncoder) (err error)

func (*SaslHandshakeRequest) Key

func (r *SaslHandshakeRequest) Key() int16

func (*SaslHandshakeRequest) Version

func (r *SaslHandshakeRequest) Version() int16

type SaslHandshakeResponse

// SaslHandshakeResponse is the response message for the SaslHandshake API
// (SaslHandshakeKey). The struct has no fields; Decode and Encode are
// declared on *SaslHandshakeResponse.
type SaslHandshakeResponse struct{}

func (*SaslHandshakeResponse) Decode

func (r *SaslHandshakeResponse) Decode(d PacketDecoder, version int16) (err error)

func (*SaslHandshakeResponse) Encode

func (r *SaslHandshakeResponse) Encode(e PacketEncoder) (err error)

type SizeField

// SizeField declares SaveOffset, ReserveSize, Fill and Check on its pointer
// receiver, which is the full method set of both PushEncoder and PushDecoder.
//
// NOTE(review): StartOffset is presumably the value recorded by SaveOffset
// and later used by Fill/Check to locate the size prefix — confirm.
type SizeField struct {
	StartOffset int
}

func (*SizeField) Check

func (s *SizeField) Check(curOffset int, buf []byte) error

func (*SizeField) Fill

func (s *SizeField) Fill(curOffset int, buf []byte) error

func (*SizeField) ReserveSize

func (s *SizeField) ReserveSize() int

func (*SizeField) SaveOffset

func (s *SizeField) SaveOffset(in int)

type StopReplicaPartition

// StopReplicaPartition is the element type of StopReplicaRequest.Partitions:
// a topic name and partition number.
type StopReplicaPartition struct {
	Topic     string
	Partition int32
}

type StopReplicaRequest

// StopReplicaRequest is the request message for the StopReplica API
// (StopReplicaKey). It carries the controller's ID and epoch, a
// DeletePartitions flag and the affected partitions.
// Decode, Encode, Key and Version are declared on *StopReplicaRequest.
type StopReplicaRequest struct {
	APIVersion       int16
	ControllerID     int32
	ControllerEpoch  int32
	DeletePartitions bool
	Partitions       []*StopReplicaPartition
}

func (*StopReplicaRequest) Decode

func (r *StopReplicaRequest) Decode(d PacketDecoder, version int16) (err error)

func (*StopReplicaRequest) Encode

func (r *StopReplicaRequest) Encode(e PacketEncoder) (err error)

func (*StopReplicaRequest) Key

func (r *StopReplicaRequest) Key() int16

func (*StopReplicaRequest) Version

func (r *StopReplicaRequest) Version() int16

type StopReplicaResponse

// StopReplicaResponse is the response message for the StopReplica API
// (StopReplicaKey). It holds a top-level error code and per-partition
// results. Unlike most responses in this package it has no APIVersion field;
// only Decode and Encode are listed on *StopReplicaResponse.
type StopReplicaResponse struct {
	ErrorCode  int16
	Partitions []*StopReplicaResponsePartition
}

func (*StopReplicaResponse) Decode

func (r *StopReplicaResponse) Decode(d PacketDecoder, version int16) (err error)

func (*StopReplicaResponse) Encode

func (r *StopReplicaResponse) Encode(e PacketEncoder) (err error)

type StopReplicaResponsePartition

// StopReplicaResponsePartition is the element type of
// StopReplicaResponse.Partitions: a topic name, partition number and the
// error code for that partition.
type StopReplicaResponsePartition struct {
	Topic     string
	Partition int32
	ErrorCode int16
}

type Strings

// Strings is a named slice of string.
type Strings []string

type SyncGroupRequest

// SyncGroupRequest is the request message for the SyncGroup API
// (SyncGroupKey). It identifies the group, generation and member, and
// carries a list of group assignments.
// Decode, Encode, Key and Version are declared on *SyncGroupRequest.
type SyncGroupRequest struct {
	APIVersion int16

	GroupID          string
	GenerationID     int32
	MemberID         string
	GroupAssignments []GroupAssignment
}

func (*SyncGroupRequest) Decode

func (r *SyncGroupRequest) Decode(d PacketDecoder, version int16) (err error)

func (*SyncGroupRequest) Encode

func (r *SyncGroupRequest) Encode(e PacketEncoder) error

func (*SyncGroupRequest) Key

func (r *SyncGroupRequest) Key() int16

func (*SyncGroupRequest) Version

func (r *SyncGroupRequest) Version() int16

type SyncGroupResponse

// SyncGroupResponse is the response message for the SyncGroup API
// (SyncGroupKey). It holds a throttle duration, an error code and the
// member's assignment as raw bytes. Decode, Encode and Key are declared on
// *SyncGroupResponse; no Version method is listed for it.
type SyncGroupResponse struct {
	APIVersion int16

	ThrottleTime     time.Duration
	ErrorCode        int16
	MemberAssignment []byte
}

func (*SyncGroupResponse) Decode

func (r *SyncGroupResponse) Decode(d PacketDecoder, version int16) (err error)

func (*SyncGroupResponse) Encode

func (r *SyncGroupResponse) Encode(e PacketEncoder) error

func (*SyncGroupResponse) Key

func (r *SyncGroupResponse) Key() int16

type TopicData

// TopicData is the element type of ProduceRequest.TopicData: a topic name
// with a slice of *Data entries for it.
type TopicData struct {
	Topic string
	Data  []*Data
}

type TopicDatas

// TopicDatas is a named slice of *TopicData.
type TopicDatas []*TopicData

type TopicErrorCode

// TopicErrorCode reports an error code for a named topic, with an optional
// error message (ErrorMessage is a *string and may be nil).
type TopicErrorCode struct {
	Topic        string
	ErrorCode    int16
	ErrorMessage *string
}

type TopicMetadata

// TopicMetadata is the element type of MetadataResponse.TopicMetadata:
// a topic's error code, its name and the metadata of its partitions.
type TopicMetadata struct {
	TopicErrorCode    int16
	Topic             string
	PartitionMetadata []*PartitionMetadata
}

type TopicMetadatas

// TopicMetadatas is a named slice of *TopicMetadata.
type TopicMetadatas []*TopicMetadata

type UpdateMetadataRequest

// UpdateMetadataRequest is the request message for the UpdateMetadata API
// (UpdateMetadataKey). The struct declares no fields other than APIVersion.
// Decode, Encode, Key and Version are declared on *UpdateMetadataRequest.
type UpdateMetadataRequest struct {
	APIVersion int16
}

func (*UpdateMetadataRequest) Decode

func (r *UpdateMetadataRequest) Decode(d PacketDecoder, version int16) (err error)

func (*UpdateMetadataRequest) Encode

func (r *UpdateMetadataRequest) Encode(e PacketEncoder) (err error)

func (*UpdateMetadataRequest) Key

func (r *UpdateMetadataRequest) Key() int16

func (*UpdateMetadataRequest) Version

func (r *UpdateMetadataRequest) Version() int16

type UpdateMetadataResponse

// UpdateMetadataResponse is the response message for the UpdateMetadata API
// (UpdateMetadataKey). The struct has no fields; Decode and Encode are
// declared on *UpdateMetadataResponse.
type UpdateMetadataResponse struct {
}

func (*UpdateMetadataResponse) Decode

func (r *UpdateMetadataResponse) Decode(d PacketDecoder, version int16) (err error)

func (*UpdateMetadataResponse) Encode

func (r *UpdateMetadataResponse) Encode(e PacketEncoder) (err error)

type VersionedDecoder

// VersionedDecoder is implemented by any type that can populate itself from
// a PacketDecoder when told which API version the bytes were written in.
// The request and response body types in this package declare a Decode
// method with this signature.
type VersionedDecoder interface {
	Decode(d PacketDecoder, version int16) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL