Documentation ¶
Index ¶
- type ConsumerManager
- type Distributor
- type Logger
- type Metrics
- type Option
- func WithConsumerManager(consumerManager ConsumerManager) Option
- func WithDefaultConsumeLimit(n int64) Option
- func WithDefaultQueue(dirs []string, cache bool, entries int64) Option
- func WithDistributor(distributor Distributor) Option
- func WithFileQueue(dirs []string, cache bool, entries int64) Option
- func WithLogger(logger Logger) Option
- func WithMetrics(metrics Metrics) Option
- func WithMiddleware(middleware ...func(http.Handler) http.Handler) Option
- func WithPublicAddr(addr string) Option
- func WithQueue(q Queue) Option
- func WithWebsocketInterval(d time.Duration) Option
- type Queue
- type Server
- func (s *Server) Close() error
- func (s *Server) HandleConsume(w http.ResponseWriter, r *http.Request)
- func (s *Server) HandleCreateTopic(w http.ResponseWriter, r *http.Request)
- func (s *Server) HandleDeleteTopic(w http.ResponseWriter, r *http.Request)
- func (s *Server) HandleGetAllTopics(w http.ResponseWriter, r *http.Request)
- func (s *Server) HandleModifyTopic(w http.ResponseWriter, r *http.Request)
- func (s *Server) HandleOptions(w http.ResponseWriter, r *http.Request)
- func (s *Server) HandleProduce(w http.ResponseWriter, r *http.Request)
- func (s *Server) HandleWatchTopics(w http.ResponseWriter, r *http.Request)
- func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)
- type TestLogger
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumerManager ¶
type Distributor ¶
type Logger ¶
type Logger interface { Errorf(format string, args ...interface{}) Warnf(format string, args ...interface{}) Infof(format string, args ...interface{}) Debugf(format string, args ...interface{}) }
Logger is a handler for log messages of varying severities
type Metrics ¶
Metrics allows for custom metric handlers for counting the number of messages and/or batch size
type Option ¶
Option represents a optional function argument to NewServer
func WithConsumerManager ¶
func WithConsumerManager(consumerManager ConsumerManager) Option
WithConsumerManager overrides the default consumer manager
func WithDefaultConsumeLimit ¶
WithDefaultConsumeLimit sets the default consume limit for clients that consume with limit < 0
func WithDefaultQueue ¶
WithDefaultQueue sets the queue
func WithDistributor ¶
func WithDistributor(distributor Distributor) Option
WithDistributor overrides the default distributor
func WithFileQueue ¶
WithFileQueue sets the queue
func WithMetrics ¶
WithMetrics sets the handler for produce and consume metrics
func WithMiddleware ¶
WithMiddleware adds the given middleware to the endpoints defined in the http router
func WithPublicAddr ¶
WithPublicAddr sets the public address of the current server
func WithWebsocketInterval ¶
WithWebsocketInterval sets the interval between pings for a websocket connection
type Queue ¶
type Queue interface { Close() error ListTopics(regex *regexp.Regexp) ([]string, error) CreateTopic(topic string) error DeleteTopic(topic string) error ModifyTopic(topic string, request headers.ModifyRequest) (*headers.TopicInfo, error) WatchTopics(topics []string) (written, deleted chan string, closer io.Closer, err error) Produce(topic string, msgSizes []int64, timestamp uint64, r io.Reader) error Consume(group, topic string, id int64, limit int64, w http.ResponseWriter) (int, error) }
Queue is the interface used by the server to produce and consume messages from different distinct categories called topics
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is an http server on top of the given queue (defaults to a file based queue)
func (*Server) HandleConsume ¶
func (s *Server) HandleConsume(w http.ResponseWriter, r *http.Request)
HandleConsume handles requests to the /topics/... endpoints with method == GET. It will retrieve messages from the queue topic
func (*Server) HandleCreateTopic ¶
func (s *Server) HandleCreateTopic(w http.ResponseWriter, r *http.Request)
HandleCreateTopic handles requests to the /topics/... endpoints with method == PUT. It will create a topic if the topic does not exist.
func (*Server) HandleDeleteTopic ¶
func (s *Server) HandleDeleteTopic(w http.ResponseWriter, r *http.Request)
HandleDeleteTopic handles requests to the /topics/... endpoints with method == DELETE. It will delete a topic if the topic exists.
func (*Server) HandleGetAllTopics ¶
func (s *Server) HandleGetAllTopics(w http.ResponseWriter, r *http.Request)
HandleGetAllTopics handles requests to the /topics endpoints with method == GET. It returns all topics currently defined in the queue as either a json or csv depending on the request content-type header
func (*Server) HandleModifyTopic ¶
func (s *Server) HandleModifyTopic(w http.ResponseWriter, r *http.Request)
HandleModifyTopic handles requests to the /topics/... endpoints with method == PATCH. It will modify the topic if the topic exists. This is used to truncate topics by message offset or mod time.
func (*Server) HandleOptions ¶
func (s *Server) HandleOptions(w http.ResponseWriter, r *http.Request)
HandleOptions handles requests to the /topics/... endpoints with method == OPTIONS
func (*Server) HandleProduce ¶
func (s *Server) HandleProduce(w http.ResponseWriter, r *http.Request)
HandleProduce handles requests to the /topics/... endpoints with method == POST. It will add the given messages to the queue topic
func (*Server) HandleWatchTopics ¶
func (s *Server) HandleWatchTopics(w http.ResponseWriter, r *http.Request)
HandleWatchTopics accepts websocket connections and watches the topic files for writes
type TestLogger ¶
func (TestLogger) Debugf ¶
func (t TestLogger) Debugf(format string, args ...interface{})
func (TestLogger) Errorf ¶
func (t TestLogger) Errorf(format string, args ...interface{})
func (TestLogger) Infof ¶
func (t TestLogger) Infof(format string, args ...interface{})
func (TestLogger) Warnf ¶
func (t TestLogger) Warnf(format string, args ...interface{})