Versions in this module Expand all Collapse all v2 v2.2.1 Oct 19, 2020 Changes in this version + var ErrNotImplemented = errors.New("not implemented") + var ErrTimeout = errors.New("timeout") + type Broker struct + OffsetEarliestHandler func(string, int32) (int64, error) + OffsetLatestHandler func(string, int32) (int64, error) + func NewBroker() *Broker + func (b *Broker) Close() + func (b *Broker) Consumer(conf kafka.ConsumerConf) (kafka.Consumer, error) + func (b *Broker) OffsetCoordinator(conf kafka.OffsetCoordinatorConf) (kafka.OffsetCoordinator, error) + func (b *Broker) OffsetEarliest(topic string, partition int32) (int64, error) + func (b *Broker) OffsetLatest(topic string, partition int32) (int64, error) + func (b *Broker) Producer(kafka.ProducerConf) kafka.Producer + func (b *Broker) ReadProducers(timeout time.Duration) (*ProducedMessages, error) + type Consumer struct + Broker *Broker + Errors chan error + Messages chan *proto.Message + func (c *Consumer) Consume() (*proto.Message, error) + type Middleware func(nodeID int32, requestKind int16, content []byte) Response + type OffsetCoordinator struct + Broker *Broker + CommitHandler func(consumerGroup string, topic string, partition int32, offset int64) error + OffsetHandler func(consumerGroup string, topic string, partition int32) (offset int64, metadata string, err error) + Offsets map[string]int64 + func (c *OffsetCoordinator) Commit(topic string, partition int32, offset int64) error + func (c *OffsetCoordinator) Offset(topic string, partition int32) (offset int64, metadata string, err error) + type ProducedMessages struct + Messages []*proto.Message + Partition int32 + Topic string + type Producer struct + Broker *Broker + ResponseError error + func (p *Producer) Produce(topic string, partition int32, messages ...*proto.Message) (int64, error) + func (p *Producer) ResponseOffset() int64 + type Response interface + Bytes func() ([]byte, error) + type Server struct + func NewServer(middlewares ...Middleware) *Server + func (s *Server) AddMessages(topic string, partition int32, messages ...*proto.Message) + func (s *Server) Addr() string + func (s *Server) Close() (err error) + func (s *Server) MustSpawn() + func (s *Server) Reset() + func (s *Server) Run(addr string) error + func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)