Documentation ¶
Index ¶
- Variables
- type AgeingClient
- type Consumer
- type Message
- type MessageConsumer
- func NewAgeingConsumer(config QueueConfig, handler func(m Message), client *AgeingClient) MessageConsumer
- func NewBatchedConsumer(config QueueConfig, handler func(m []Message), client *http.Client, ...) MessageConsumer
- func NewConsumer(config QueueConfig, handler func(m Message), client *http.Client, ...) MessageConsumer
- type QueueConfig
Constants ¶
This section is empty.
Variables ¶
var ErrNoQueueAddresses = errors.New("no kafka-rest-proxy addresses configured")
Functions ¶
This section is empty.
Types ¶
type AgeingClient ¶
AgeingClient defines an ageing http client for consuming messages
func NewAgeingClient ¶
func NewAgeingClient(client *http.Client, maxAge time.Duration, logger *log.UPPLogger) (*AgeingClient, error)
NewAgeingClient returns a new instance of AgeingClient. It guarantees that all required properties are set
func (AgeingClient) StartAgeingProcess ¶
func (c AgeingClient) StartAgeingProcess()
StartAgeingProcess periodically close idle connections according to the MaxAge of an AgeingClient
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer provides methods to consume messages from a kafka proxy
func (*Consumer) ConnectivityCheck ¶
ConnectivityCheck returns the connection status with the kafka proxy
type MessageConsumer ¶
MessageConsumer is a high level generic interface for consumers.
Start triggers the consumption of messages.
Stop method stops the consumption of messages.
ConnectivityCheck implements the logic to check the current connectivity to the queue. The method should return a message about the status of the connection and an error in case of connectivity failure.
func NewAgeingConsumer ¶
func NewAgeingConsumer(config QueueConfig, handler func(m Message), client *AgeingClient) MessageConsumer
NewAgeingConsumer returns a new instance of a Consumer with an AgeingClient
func NewBatchedConsumer ¶
func NewBatchedConsumer(config QueueConfig, handler func(m []Message), client *http.Client, logger *log.UPPLogger) MessageConsumer
NewBatchedConsumer returns a Consumer to manage batches of messages
func NewConsumer ¶
func NewConsumer(config QueueConfig, handler func(m Message), client *http.Client, logger *log.UPPLogger) MessageConsumer
NewConsumer returns a new instance of a Consumer
type QueueConfig ¶
type QueueConfig struct { Addrs []string `json:"address"` //list of queue addresses. Group string `json:"group"` Topic string `json:"topic"` Queue string `json:"queue"` //The name of the queue. Offset string `json:"offset"` BackoffPeriod int `json:"backoffPeriod"` StreamCount int `json:"streamCount"` ConcurrentProcessing bool `json:"concurrentProcessing"` AuthorizationKey string `json:"authorizationKey"` AutoCommitEnable bool `json:"autoCommitEnable"` NoOfProcessors int `json:"noOfProcessors"` }
QueueConfig represents the configuration of the queue, consumer group and topic the consumer interested about.