Versions in this module Expand all Collapse all v0 v0.0.1 Nov 20, 2019 Changes in this version + const DefaultMaxMessagesPerBatch + const MaxBatchSize + const MaxFrameSize + const MaxMessageSize + var ErrCorruptedMessage = errors.New("corrupted message") + var ErrEOM = errors.New("EOF") + func ConvertFromStringMap(m map[string]string) []*pb.KeyValue + func ConvertToStringMap(pbb []*pb.KeyValue) map[string]string + func Crc32cCheckSum(data []byte) uint32 + func GetAndAdd(n *uint64, diff uint64) uint64 + func JavaStringHash(s string) uint32 + func Murmur3_32Hash(s string) uint32 + func NewDefaultRouter(clock Clock, hashFunc func(string) uint32, maxBatchingDelay time.Duration) func(string, uint32) int + func TimestampMillis(t time.Time) uint64 + type Backoff struct + func (b *Backoff) Next() time.Duration + type BatchBuilder struct + func NewBatchBuilder(maxMessages uint, producerName string, producerID uint64, ...) (*BatchBuilder, error) + func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceID uint64, payload []byte, ...) bool + func (bb *BatchBuilder) Flush() (batchData []byte, sequenceID uint64, callbacks []interface{}) + func (bb *BatchBuilder) IsFull() bool + type BlockingQueue interface + Iterator func() BlockingQueueIterator + Peek func() interface{} + PeekLast func() interface{} + Poll func() interface{} + Put func(item interface{}) + Size func() int + Take func() interface{} + func NewBlockingQueue(maxSize int) BlockingQueue + type BlockingQueueIterator interface + HasNext func() bool + Next func() interface{} + type Buffer interface + Capacity func() uint32 + Clear func() + Get func(readerIndex uint32, size uint32) []byte + IsWritable func() bool + MoveToFront func() + Put func(writerIdx uint32, s []byte) + PutUint32 func(n uint32, writerIdx uint32) + Read func(size uint32) []byte + ReadUint16 func() uint16 + ReadUint32 func() uint32 + ReadableBytes func() uint32 + ReadableSlice func() []byte + ReaderIndex func() uint32 + Resize func(newSize uint32) + WritableBytes func() uint32 + WritableSlice func() []byte + Write func(s []byte) + WriteUint16 func(n uint16) + WriteUint32 func(n uint32) + WriterIndex func() uint32 + WrittenBytes func(size uint32) + func NewBuffer(size int) Buffer + func NewBufferWrapper(buf []byte) Buffer + type CheckSum struct + func (cs *CheckSum) Write(p []byte) (int, error) + type Clock func() uint64 + func NewSystemClock() Clock + type Closable interface + Close func() + type Connection interface + AddConsumeHandler func(id uint64, handler ConsumerHandler) + Close func() + DeleteConsumeHandler func(id uint64) + RegisterListener func(id uint64, listener ConnectionListener) + SendRequest func(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error)) + SendRequestNoWait func(req *pb.BaseCommand) + UnregisterListener func(id uint64) + WriteData func(data []byte) + type ConnectionListener interface + ConnectionClosed func() + ReceivedSendReceipt func(response *pb.CommandSendReceipt) + type ConnectionPool interface + Close func() + GetConnection func(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) + func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider) ConnectionPool + type ConsumerHandler interface + ConnectionClosed func() + MessageReceived func(response *pb.CommandMessage, headersAndPayload Buffer) error + type LookupResult struct + LogicalAddr *url.URL + PhysicalAddr *url.URL + type LookupService interface + Lookup func(topic string) (*LookupResult, error) + func NewLookupService(rpcClient RPCClient, serviceURL *url.URL) LookupService + type MessageReader struct + func NewMessageReader(headersAndPayload Buffer) *MessageReader + func NewMessageReaderFromArray(headersAndPayload []byte) *MessageReader + func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) + func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) + func (r *MessageReader) ResetBuffer(buffer Buffer) + type RPCClient interface + NewConsumerID func() uint64 + NewProducerID func() uint64 + NewRequestID func() uint64 + Request func(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64, ...) (*RPCResult, error) + RequestOnCnx func(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, ...) (*RPCResult, error) + RequestOnCnxNoWait func(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) + RequestToAnyBroker func(requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) + func NewRPCClient(serviceURL *url.URL, pool ConnectionPool) RPCClient + type RPCResult struct + Cnx Connection + Response *pb.BaseCommand + type Semaphore chan bool + func (s Semaphore) Acquire() + func (s Semaphore) Release() + type TLSOptions struct + AllowInsecureConnection bool + TrustCertsFilePath string + ValidateHostname bool + type TopicName struct + Name string + Namespace string + Partition int + func ParseTopicName(topic string) (*TopicName, error)