Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // DispatcherInjector is the injector for the Dispatcher module DispatcherInjector = wire.NewSet(NewMessageDispatcher, wire.Struct(new(Configuration), "DeliveryJobRepo", "ConsumerRepo", "LockRepo", "BrokerConfig", "ConsumerConnectionConfig", "MsgRepo")) )
Functions ¶
This section is empty.
Types ¶
type Configuration ¶
type Configuration struct { DeliveryJobRepo storage.DeliveryJobRepository ConsumerRepo storage.ConsumerRepository LockRepo storage.LockRepository MsgRepo storage.MessageRepository BrokerConfig config.BrokerConfig ConsumerConnectionConfig config.ConsumerConnectionConfig }
Configuration represents the configuration for a dispatcher
type Job ¶
type Job struct { Data *data.DeliveryJob Priority uint }
Job represents the job to be run
func NewJob ¶
func NewJob(job *data.DeliveryJob) *Job
NewJob returns a new instance of Job. Only call this method if Job.IsInValidState() is true, else can result a panic
type MessageDispatcher ¶
MessageDispatcher is the contract for dispatching message
func NewMessageDispatcher ¶
func NewMessageDispatcher(configuration *Configuration) MessageDispatcher
NewMessageDispatcher retrieves new instance of MessageDispatcher
type MessageDispatcherImpl ¶
type MessageDispatcherImpl struct {
// contains filtered or unexported fields
}
MessageDispatcherImpl is responsible for dispatching delivery jobs from acknowledged message
func (*MessageDispatcherImpl) Dispatch ¶
func (msgDispatcher *MessageDispatcherImpl) Dispatch(message *data.Message)
Dispatch is responsible for dispatching delivery jobs for the message
func (*MessageDispatcherImpl) StartDispatcher ¶
func (msgDispatcher *MessageDispatcherImpl) StartDispatcher()
StartDispatcher starts consuming jobs and should be called as a coroutine.
func (*MessageDispatcherImpl) Stop ¶
func (msgDispatcher *MessageDispatcherImpl) Stop()
Stop stops the workers of the dispatcher
type PriorityQueue ¶
type PriorityQueue struct {
// contains filtered or unexported fields
}
A PriorityQueue implements heap.Interface and holds Items.
func NewJobPriorityQueue ¶
func NewJobPriorityQueue() *PriorityQueue
NewJobPriorityQueue initializes a priority queue for Jobs
func (*PriorityQueue) Dequeue ¶
func (pq *PriorityQueue) Dequeue() *Job
Dequeue pops the item next in order
func (*PriorityQueue) Enqueue ¶
func (pq *PriorityQueue) Enqueue(job *Job)
Enqueue queues the item in its correct position
func (*PriorityQueue) Len ¶
func (pq *PriorityQueue) Len() int
Len returns the length of the priority queue
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker represents the worker that executes the job
func NewWorker ¶
func NewWorker(workerPool chan chan *Job, consumerConfig config.ConsumerConnectionConfig, brokerConfig config.BrokerConfig, deliveryJobRepo storage.DeliveryJobRepository) Worker
NewWorker creates a Worker