gobble

package
v0.0.0-...-ca24b28 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 18, 2024 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// WaitMaxDuration is a package-level duration initialised to five seconds.
// NOTE(review): presumably the fallback for Config.WaitMaxDuration, which
// shares its name — confirm against the queue implementation.
var WaitMaxDuration = 5 * time.Second

Functions

This section is empty.

Types

type Config

// Config carries the settings that NewQueue receives as its config argument.
type Config struct {
	// WaitMaxDuration is a duration setting with the same name as the
	// package-level WaitMaxDuration variable.
	// NOTE(review): presumably the longest the queue waits between
	// reservation attempts — confirm.
	WaitMaxDuration time.Duration
	// MaxQueueLength is an integer setting.
	// NOTE(review): presumably an upper bound on queued jobs — confirm how
	// (and whether) it is enforced.
	MaxQueueLength  int
}

type ConnectionInterface

// ConnectionInterface is the single-method insert capability that
// Queue.Enqueue takes as its connection argument.
type ConnectionInterface interface {
	// Insert accepts any number of values and reports failure through the
	// returned error.
	// NOTE(review): presumably satisfied by *gorp.DbMap and gorp
	// transactions — confirm.
	Insert(...interface{}) error
}

type DB

// DB wraps a gorp database map. NewDatabase returns a *DB for a given
// *sql.DB, and the Migrate method takes a migrations path.
type DB struct {
	// Connection is the exported *gorp.DbMap handle held by this DB.
	Connection *gorp.DbMap
}

func NewDatabase

func NewDatabase(db *sql.DB) *DB

func (DB) Migrate

func (db DB) Migrate(migrationsPath string)

type DatabaseInterface

// DatabaseInterface is the database dependency accepted by NewQueue. DB
// declares a Migrate method with a matching signature.
type DatabaseInterface interface {
	// Migrate takes a single string argument; DB.Migrate names it
	// migrationsPath.
	Migrate(string)
}

type Heartbeater

type Heartbeater struct {
	// contains filtered or unexported fields
}

func NewHeartbeater

func NewHeartbeater(queue QueueInterface, ticker TickerInterface) Heartbeater

func (Heartbeater) Beat

func (beater Heartbeater) Beat(job *Job)

func (Heartbeater) Halt

func (beater Heartbeater) Halt()

type Initializer

// Initializer is a field-less type; the only method listed for it is
// InitializeDBMap, which takes a *gorp.DbMap.
type Initializer struct{}

func (Initializer) InitializeDBMap

func (Initializer) InitializeDBMap(dbMap *gorp.DbMap)

type Job

// Job is the record type passed to and returned from the Queue methods
// (Enqueue, Reserve, Dequeue, Requeue). Every field carries a `db` struct tag;
// all but ShouldRetry name a column.
// NOTE(review): the "-" tag on ShouldRetry presumably excludes it from
// persistence, making it in-memory state only — confirm against gorp's tag
// handling.
type Job struct {
	ID          int       `db:"id"`
	WorkerID    string    `db:"worker_id"`
	// NOTE(review): presumably the serialised form of the value given to
	// NewJob, decoded again by Unmarshal — confirm the encoding.
	Payload     string    `db:"payload"`
	Version     int64     `db:"version"`
	RetryCount  int       `db:"retry_count"`
	ActiveAt    time.Time `db:"active_at"`
	ShouldRetry bool      `db:"-"`
}

func NewJob

func NewJob(data interface{}) *Job

func (*Job) Retry

func (job *Job) Retry(duration time.Duration)

func (*Job) State

func (job *Job) State() (int, time.Time)

func (Job) Unmarshal

func (job Job) Unmarshal(v interface{}) error

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(database DatabaseInterface, clock clock, config Config) *Queue

func (*Queue) Close

func (queue *Queue) Close()

func (*Queue) Dequeue

func (queue *Queue) Dequeue(job *Job)

func (*Queue) Enqueue

func (queue *Queue) Enqueue(job *Job, connection ConnectionInterface) (*Job, error)

func (*Queue) Len

func (queue *Queue) Len() (int, error)

func (*Queue) Requeue

func (queue *Queue) Requeue(job *Job)

func (*Queue) Reserve

func (queue *Queue) Reserve(workerID string) <-chan *Job

type QueueGauge

type QueueGauge struct {
	// contains filtered or unexported fields
}

func NewQueueGauge

func NewQueueGauge(queue queue, timer <-chan time.Time) QueueGauge

func (QueueGauge) Run

func (g QueueGauge) Run()

type QueueInterface

// QueueInterface lists the queue operations that NewHeartbeater and NewWorker
// accept as their queue dependency. Its method set mirrors the Enqueue,
// Reserve, Dequeue, Requeue and Len methods declared on *Queue.
type QueueInterface interface {
	Enqueue(*Job, ConnectionInterface) (*Job, error)
	// Reserve takes a string (named workerID on Queue.Reserve) and returns a
	// receive-only channel of jobs.
	Reserve(string) <-chan *Job
	Dequeue(*Job)
	Requeue(*Job)
	Len() (int, error)
}

type Ticker

type Ticker struct {
	// contains filtered or unexported fields
}

func NewTicker

func NewTicker(tickerConstructor func(time.Duration) *time.Ticker, duration time.Duration) *Ticker

func (*Ticker) Start

func (t *Ticker) Start()

func (Ticker) Stop

func (t Ticker) Stop()

func (Ticker) Tick

func (t Ticker) Tick() <-chan time.Time

type TickerInterface

// TickerInterface is the ticker dependency accepted by NewHeartbeater. Its
// methods mirror the Tick, Start and Stop methods declared on Ticker.
type TickerInterface interface {
	// Tick returns a receive-only channel of time values.
	Tick() <-chan time.Time
	Start()
	Stop()
}

type Worker

type Worker struct {
	ID string
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(id int, queue QueueInterface, callback func(*Job), beater heartbeater) Worker

func (*Worker) Halt

func (worker *Worker) Halt()

func (*Worker) Perform

func (worker *Worker) Perform() int

func (*Worker) Work

func (worker *Worker) Work()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL