tasq

package module
v1.1.2
Published: Oct 11, 2023 License: MIT Imports: 11 Imported by: 0

README

tasq

Tasq is a Golang task queue that uses an SQL database for persistence. It currently supports:

  • PostgreSQL
  • MySQL

Install

go get -u github.com/greencoda/tasq

Usage Example

To try tasq locally, you'll need a database running on your machine. You may use the supplied docker-compose.yml file to start a local instance:

docker-compose -f _examples/<example_repo_name>/docker-compose.yml up -d

Afterwards, simply run the example's main.go file:

go run _examples/<example_repo_name>/main.go

Documentation

Overview

Package tasq provides a task queue implementation compatible with multiple repositories.

Index

Constants

This section is empty.

Variables

var (
	ErrConsumerAlreadyRunning    = errors.New("consumer has already been started")
	ErrConsumerAlreadyStopped    = errors.New("consumer has already been stopped")
	ErrCouldNotActivateTasks     = errors.New("a number of tasks could not be activated")
	ErrCouldNotPollTasks         = errors.New("could not poll tasks")
	ErrCouldNotPingTasks         = errors.New("could not ping tasks")
	ErrTaskTypeAlreadyLearned    = errors.New("task with this type already learned")
	ErrTaskTypeNotFound          = errors.New("task with this type not found")
	ErrTaskTypeNotKnown          = errors.New("task with this type is not known by this consumer")
	ErrUnknownPollStrategy       = errors.New("unknown poll strategy")
	ErrVisibilityTimeoutTooShort = errors.New("visibility timeout must be longer than poll interval")
)

Collection of consumer errors.

Functions

func NoopLogger added in v0.4.3

func NoopLogger() *log.Logger

NoopLogger discards the log messages written to it.

Types

type Cleaner

type Cleaner struct {
	// contains filtered or unexported fields
}

Cleaner is a service instance created by a Client with reference to that client and the task age limit parameter.

func (*Cleaner) Clean

func (c *Cleaner) Clean(ctx context.Context) (int64, error)

Clean will initiate the removal of finished (either succeeded or failed) tasks from the tasks table if they have been created long enough ago for them to be eligible.

func (*Cleaner) WithTaskAge

func (c *Cleaner) WithTaskAge(taskAge time.Duration) *Cleaner

WithTaskAge defines the minimum time duration that must have passed since the creation of a finished task in order for it to be eligible for cleanup when the Cleaner's Clean() method is called.

Default value: 15 minutes.
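
The eligibility rule described for Clean and WithTaskAge can be sketched as follows. This is only an illustration of the documented rule, not the library's implementation: the task struct, eligibleForCleanup and countEligible are hypothetical stand-ins, and treating an age exactly equal to the limit as eligible is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// task is a local stand-in holding only the fields this rule needs.
type task struct {
	Status    string
	CreatedAt time.Time
}

// eligibleForCleanup reports whether t is finished (succeeded or failed)
// and was created at least minAge before now.
func eligibleForCleanup(t task, minAge time.Duration, now time.Time) bool {
	finished := t.Status == "SUCCESSFUL" || t.Status == "FAILED"
	return finished && now.Sub(t.CreatedAt) >= minAge
}

// countEligible applies the rule to three sample tasks using the
// documented 15-minute default age.
func countEligible() int {
	now := time.Date(2023, 10, 11, 12, 0, 0, 0, time.UTC)
	tasks := []task{
		{Status: "SUCCESSFUL", CreatedAt: now.Add(-20 * time.Minute)}, // finished and old enough
		{Status: "FAILED", CreatedAt: now.Add(-5 * time.Minute)},      // finished but too recent
		{Status: "IN_PROGRESS", CreatedAt: now.Add(-time.Hour)},       // old but not finished
	}

	eligible := 0
	for _, t := range tasks {
		if eligibleForCleanup(t, 15*time.Minute, now) {
			eligible++
		}
	}
	return eligible
}

func main() {
	fmt.Println(countEligible())
}
```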

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client wraps the tasq repository interface which is used by the different services to access the database.

func NewClient

func NewClient(repository IRepository) *Client

NewClient creates a new tasq client instance with the provided repository.

func (*Client) NewCleaner

func (c *Client) NewCleaner() *Cleaner

NewCleaner creates a new cleaner with a reference to the original tasq client.

func (*Client) NewConsumer

func (c *Client) NewConsumer() *Consumer

NewConsumer creates a new consumer with a reference to the original tasq client and default consumer parameters.

func (*Client) NewInspector added in v1.1.0

func (c *Client) NewInspector() *Inspector

NewInspector creates a new inspector with a reference to the original tasq client.

func (*Client) NewProducer

func (c *Client) NewProducer() *Producer

NewProducer creates a new producer with a reference to the original tasq client.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a service instance created by a Client with reference to that client and the various parameters that define the task consumption behaviour.

func (*Consumer) Channel

func (c *Consumer) Channel() <-chan *func()

Channel returns a read-only channel from which the polled jobs can be read.
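
Because the channel carries pointers to functions, each received job has to be dereferenced before it is called. A minimal sketch of such a receive loop is shown below. It builds its own channel so that it runs without a database; in real use the channel would come from Consumer.Channel(). The drain and runDemo helpers are hypothetical, and closing the channel to end the loop is an assumption made for the demo, since this page does not say how the consumer's channel behaves after Stop.

```go
package main

import "fmt"

// drain runs every job received on jobs until the channel is closed
// and returns how many jobs were executed.
func drain(jobs <-chan *func()) int {
	executed := 0
	for job := range jobs {
		if job == nil {
			continue // defensive: skip nil pointers
		}
		(*job)() // dereference the pointer, then call the function
		executed++
	}
	return executed
}

// runDemo feeds three jobs through a buffered channel and drains it.
// Each job adds its own number to sum.
func runDemo() (executed, sum int) {
	ch := make(chan *func(), 3)
	for i := 1; i <= 3; i++ {
		n := i
		job := func() { sum += n }
		ch <- &job
	}
	close(ch)

	executed = drain(ch)
	return executed, sum
}

func main() {
	executed, sum := runDemo()
	fmt.Println(executed, sum)
}
```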

func (*Consumer) Forget

func (c *Consumer) Forget(taskType string) error

Forget removes a handler function for the specified taskType from the map of learned handler functions. If the specified taskType does not exist, it'll return an error.

func (*Consumer) Learn

func (c *Consumer) Learn(taskType string, f HandlerFunc, override bool) error

Learn sets a handler function for the specified taskType. If override is false and a handler function is already set for the specified taskType, it'll return an error.
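
The Learn and Forget semantics can be illustrated with a small map-backed registry. This is a sketch under assumptions, not the consumer's actual code: the registry type and its methods are hypothetical, handlerFunc is a simplified stand-in for the handler type, and pairing each method with the similarly named error from the Variables section is a guess based on the error messages.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the documented sentinel errors, so the sketch
// compiles without the tasq module.
var (
	errTaskTypeAlreadyLearned = errors.New("task with this type already learned")
	errTaskTypeNotFound       = errors.New("task with this type not found")
)

// handlerFunc is a simplified stand-in for the handler function type.
type handlerFunc func(args []byte) error

// registry is a hypothetical map-backed store of learned handlers.
type registry struct {
	handlers map[string]handlerFunc
}

func newRegistry() *registry {
	return &registry{handlers: make(map[string]handlerFunc)}
}

// learn stores f under taskType; an existing entry is replaced only
// if override is true.
func (r *registry) learn(taskType string, f handlerFunc, override bool) error {
	if _, exists := r.handlers[taskType]; exists && !override {
		return errTaskTypeAlreadyLearned
	}
	r.handlers[taskType] = f
	return nil
}

// forget removes the handler for taskType, or reports that none was learned.
func (r *registry) forget(taskType string) error {
	if _, exists := r.handlers[taskType]; !exists {
		return errTaskTypeNotFound
	}
	delete(r.handlers, taskType)
	return nil
}

// registryDemo runs four calls in order and collects their results.
func registryDemo() []error {
	r := newRegistry()
	noop := func([]byte) error { return nil }

	var results []error
	results = append(results, r.learn("send_email", noop, false)) // first registration
	results = append(results, r.learn("send_email", noop, false)) // duplicate without override
	results = append(results, r.learn("send_email", noop, true))  // duplicate with override
	results = append(results, r.forget("resize_image"))           // never learned
	return results
}

func main() {
	for _, err := range registryDemo() {
		fmt.Println(err)
	}
}
```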

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context) error

Start launches the goroutine which manages the pinging and polling of tasks for the consumer, or returns an error if the consumer is not properly configured.

func (*Consumer) Stop

func (c *Consumer) Stop() error

Stop sends the termination signal to the consumer so it'll no longer poll for new tasks.

func (*Consumer) WithAutoDeleteOnSuccess

func (c *Consumer) WithAutoDeleteOnSuccess(autoDeleteOnSuccess bool) *Consumer

WithAutoDeleteOnSuccess sets whether successful tasks should be automatically deleted from the task queue by the consumer.

Default value: false.

func (*Consumer) WithChannelSize

func (c *Consumer) WithChannelSize(channelSize int) *Consumer

WithChannelSize sets the size of the buffered channel to which the polled messages are output.

Default value: 10.

func (*Consumer) WithLogger

func (c *Consumer) WithLogger(logger Logger) *Consumer

WithLogger sets the Logger interface that is used for event logging during task consumption.

Default value: NoopLogger.

func (*Consumer) WithMaxActiveTasks

func (c *Consumer) WithMaxActiveTasks(maxActiveTasks int) *Consumer

WithMaxActiveTasks sets the maximum number of tasks a consumer can have enqueued at the same time before polling for additional ones.

Default value: 10.

func (*Consumer) WithPollInterval

func (c *Consumer) WithPollInterval(pollInterval time.Duration) *Consumer

WithPollInterval sets the interval at which the consumer will try to poll for new tasks to be executed. It must be shorter than the visibility timeout.

Default value: 5 seconds.
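
The constraint between the two durations can be expressed as a simple check. The sketch below is an assumption about what such a validation might look like; validateTimings and the two demo helpers are hypothetical, and the error value is a local copy of the documented ErrVisibilityTimeoutTooShort. It uses the documented defaults of 5 seconds for the poll interval and 15 seconds for the visibility timeout.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errVisibilityTimeoutTooShort is a local copy of the documented sentinel.
var errVisibilityTimeoutTooShort = errors.New("visibility timeout must be longer than poll interval")

// validateTimings rejects any configuration in which the visibility
// timeout is not strictly longer than the poll interval.
func validateTimings(pollInterval, visibilityTimeout time.Duration) error {
	if visibilityTimeout <= pollInterval {
		return errVisibilityTimeoutTooShort
	}
	return nil
}

// defaultsAreValid checks the documented defaults (5s and 15s).
func defaultsAreValid() bool {
	return validateTimings(5*time.Second, 15*time.Second) == nil
}

// equalIsRejected checks that equal durations are refused.
func equalIsRejected() bool {
	err := validateTimings(5*time.Second, 5*time.Second)
	return errors.Is(err, errVisibilityTimeoutTooShort)
}

func main() {
	fmt.Println(defaultsAreValid(), equalIsRejected())
}
```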

func (*Consumer) WithPollLimit

func (c *Consumer) WithPollLimit(pollLimit int) *Consumer

WithPollLimit sets the maximum number of messages polled from the task queue.

Default value: 10.

func (*Consumer) WithPollStrategy

func (c *Consumer) WithPollStrategy(pollStrategy PollStrategy) *Consumer

WithPollStrategy sets the ordering to be used when polling for tasks from the task queue.

Default value: PollStrategyByCreatedAt.

func (*Consumer) WithQueues

func (c *Consumer) WithQueues(queues ...string) *Consumer

WithQueues sets the queues from which the consumer may poll for tasks.

Default value: empty slice of strings.

func (*Consumer) WithVisibilityTimeout

func (c *Consumer) WithVisibilityTimeout(visibilityTimeout time.Duration) *Consumer

WithVisibilityTimeout sets the duration by which each ping will extend a task's visibility timeout. Once this timeout is up, a consumer instance may receive the task again.

Default value: 15 seconds.
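
One way to picture the visibility timeout is as a deadline that each ping pushes further into the future. The sketch below models that idea with hypothetical ping and receivable helpers on a local task type. Measuring the extension from the time of the ping rather than from the previous deadline is an assumption, as this page does not specify it.

```go
package main

import (
	"fmt"
	"time"
)

// task is a local stand-in holding only the visibility deadline.
type task struct {
	VisibleAt time.Time
}

// ping moves the task's visibility deadline to now + timeout.
func ping(t *task, now time.Time, timeout time.Duration) {
	t.VisibleAt = now.Add(timeout)
}

// receivable reports whether the deadline has been reached, meaning a
// consumer could be handed the task again.
func receivable(t *task, now time.Time) bool {
	return !now.Before(t.VisibleAt)
}

// visibilityDemo walks through one task being pinged twice with the
// documented 15-second default timeout.
func visibilityDemo() (hiddenAt10, hiddenAt20, visibleAt25 bool) {
	start := time.Date(2023, 10, 11, 12, 0, 0, 0, time.UTC)
	timeout := 15 * time.Second
	t := &task{}

	ping(t, start, timeout) // deadline: start + 15s
	hiddenAt10 = !receivable(t, start.Add(10*time.Second))

	ping(t, start.Add(10*time.Second), timeout) // deadline: start + 25s
	hiddenAt20 = !receivable(t, start.Add(20*time.Second))

	// No further ping arrives, so the task becomes receivable at start + 25s.
	visibleAt25 = receivable(t, start.Add(25*time.Second))
	return hiddenAt10, hiddenAt20, visibleAt25
}

func main() {
	fmt.Println(visibilityDemo())
}
```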

type HandlerFunc added in v1.0.4

type HandlerFunc func(task *Task) error

HandlerFunc is the function signature for the handler functions that are used to process tasks.

type IRepository added in v1.0.0

type IRepository interface {
	Migrate(ctx context.Context) error

	PingTasks(ctx context.Context, taskIDs []uuid.UUID, visibilityTimeout time.Duration) ([]*Task, error)
	PollTasks(ctx context.Context, types, queues []string, visibilityTimeout time.Duration, ordering Ordering, limit int) ([]*Task, error)
	CleanTasks(ctx context.Context, minimumAge time.Duration) (int64, error)

	RegisterStart(ctx context.Context, task *Task) (*Task, error)
	RegisterError(ctx context.Context, task *Task, errTask error) (*Task, error)
	RegisterFinish(ctx context.Context, task *Task, finishStatus TaskStatus) (*Task, error)

	SubmitTask(ctx context.Context, task *Task) (*Task, error)
	DeleteTask(ctx context.Context, task *Task, safeDelete bool) error
	RequeueTask(ctx context.Context, task *Task) (*Task, error)

	ScanTasks(ctx context.Context, taskStatuses []TaskStatus, taskTypes, queues []string, ordering Ordering, limit int) ([]*Task, error)
	CountTasks(ctx context.Context, taskStatuses []TaskStatus, taskTypes, queues []string) (int64, error)
	PurgeTasks(ctx context.Context, taskStatuses []TaskStatus, taskTypes, queues []string, safeDelete bool) (int64, error)
}

IRepository describes the mandatory methods a repository must implement in order for tasq to be able to use it.

type Inspector added in v1.1.0

type Inspector struct {
	// contains filtered or unexported fields
}

Inspector is a service instance created by a Client with reference to that client with the purpose of enabling the observation of tasks.

func (*Inspector) Count added in v1.1.0

func (o *Inspector) Count(ctx context.Context, taskStatuses []TaskStatus, taskTypes, queues []string) (int64, error)

Count returns the total number of tasks based on the supplied filter arguments.

func (*Inspector) Delete added in v1.1.0

func (o *Inspector) Delete(ctx context.Context, safeDelete bool, tasks ...*Task) error

Delete will remove the supplied tasks.

func (*Inspector) Purge added in v1.1.0

func (o *Inspector) Purge(ctx context.Context, safeDelete bool, taskStatuses []TaskStatus, taskTypes, queues []string) (int64, error)

Purge will remove all tasks based on the supplied filter arguments.

func (*Inspector) Scan added in v1.1.0

func (o *Inspector) Scan(ctx context.Context, taskStatuses []TaskStatus, taskTypes, queues []string, ordering Ordering, limit int) ([]*Task, error)

Scan returns a list of tasks based on the supplied filter arguments.

type Logger

type Logger interface {
	Print(v ...any)
	Printf(format string, v ...any)
}

Logger is the interface used for event logging during task consumption.
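
Since the interface only requires Print and Printf, the standard library's *log.Logger already satisfies it, which is consistent with NoopLogger returning a *log.Logger. The sketch below repeats the interface locally so it compiles on its own. The discardLogger and capture helpers are hypothetical, and building a discarding logger with io.Discard is only an assumed equivalent of what NoopLogger is described as doing.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
)

// Logger repeats the interface shown above so the sketch is self-contained.
type Logger interface {
	Print(v ...any)
	Printf(format string, v ...any)
}

// Compile-time check that *log.Logger provides both methods.
var _ Logger = (*log.Logger)(nil)

// discardLogger builds a logger that drops everything written to it.
func discardLogger() *log.Logger {
	return log.New(io.Discard, "", 0)
}

// capture logs one message through the interface into a buffer and
// returns what was written.
func capture() string {
	var buf bytes.Buffer
	var l Logger = log.New(&buf, "tasq: ", 0)
	l.Printf("polled %d tasks", 3)
	return buf.String()
}

func main() {
	discardLogger().Print("this message is dropped")
	fmt.Print(capture())
}
```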

type Ordering added in v1.0.0

type Ordering int

Ordering is an enum type describing the polling strategy utilized during the polling process.

const (
	OrderingCreatedAtFirst Ordering = iota
	OrderingPriorityFirst
)

The collection of orderings.

type PollStrategy

type PollStrategy string

PollStrategy is the label assigned to the ordering by which tasks are polled for consumption.

const (
	PollStrategyByCreatedAt PollStrategy = "pollByCreatedAt" // Poll by oldest tasks first
	PollStrategyByPriority  PollStrategy = "pollByPriority"  // Poll by highest priority task first
)

Collection of pollStrategies.
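
The two enums appear to describe the same choice at different layers: PollStrategy as a label on the consumer, Ordering as the value passed to the repository. A plausible translation between them is sketched below. The orderingFor helper is hypothetical, the types and constants are local copies of the ones documented here, and the use of the unknown-strategy error for unrecognized labels is an assumption based on its name.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented types and constants.
type Ordering int

const (
	OrderingCreatedAtFirst Ordering = iota
	OrderingPriorityFirst
)

type PollStrategy string

const (
	PollStrategyByCreatedAt PollStrategy = "pollByCreatedAt"
	PollStrategyByPriority  PollStrategy = "pollByPriority"
)

// errUnknownPollStrategy is a local copy of the documented sentinel.
var errUnknownPollStrategy = errors.New("unknown poll strategy")

// orderingFor is a hypothetical helper that translates a strategy
// label into the matching Ordering value.
func orderingFor(s PollStrategy) (Ordering, error) {
	switch s {
	case PollStrategyByCreatedAt:
		return OrderingCreatedAtFirst, nil
	case PollStrategyByPriority:
		return OrderingPriorityFirst, nil
	default:
		return 0, errUnknownPollStrategy
	}
}

func main() {
	for _, s := range []PollStrategy{PollStrategyByCreatedAt, PollStrategyByPriority, "pollByColour"} {
		ordering, err := orderingFor(s)
		fmt.Println(s, ordering, err)
	}
}
```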

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer is a service instance created by a Client with reference to that client with the purpose of enabling the submission of new tasks.

func (*Producer) Submit

func (p *Producer) Submit(ctx context.Context, taskType string, taskArgs any, queue string, priority int16, maxReceives int32) (*Task, error)

Submit constructs and submits a new task to the queue based on the supplied arguments.

func (*Producer) SubmitTask added in v1.0.2

func (p *Producer) SubmitTask(ctx context.Context, task *Task) (*Task, error)

SubmitTask submits an existing Task struct to the queue.

type Task

type Task struct {
	ID           uuid.UUID
	Type         string
	Args         []byte
	Queue        string
	Priority     int16
	Status       TaskStatus
	ReceiveCount int32
	MaxReceives  int32
	LastError    *string
	CreatedAt    time.Time
	StartedAt    *time.Time
	FinishedAt   *time.Time
	VisibleAt    time.Time
}

Task is the struct used to represent an atomic task managed by tasq.

func NewTask added in v1.0.0

func NewTask(taskType string, taskArgs any, queue string, priority int16, maxReceives int32) (*Task, error)

NewTask creates a new Task struct based on the supplied arguments required to define it.

func (*Task) IsLastReceive added in v1.0.1

func (t *Task) IsLastReceive() bool

IsLastReceive returns true if the task has reached its maximum number of receives.
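
The check can be pictured as a comparison of the two counters on the task. The sketch below uses a local stand-in type and assumes that "reached" means ReceiveCount is greater than or equal to MaxReceives; the actual comparison is not shown on this page.

```go
package main

import "fmt"

// task is a local stand-in with only the two counters involved.
type task struct {
	ReceiveCount int32
	MaxReceives  int32
}

// isLastReceive assumes the limit is reached once ReceiveCount is at
// least MaxReceives.
func (t *task) isLastReceive() bool {
	return t.ReceiveCount >= t.MaxReceives
}

func main() {
	t := &task{MaxReceives: 3}
	for t.ReceiveCount = 1; t.ReceiveCount <= 3; t.ReceiveCount++ {
		fmt.Println(t.ReceiveCount, t.isLastReceive())
	}
}
```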

func (*Task) SetVisibility added in v1.0.1

func (t *Task) SetVisibility(visibleAt time.Time)

SetVisibility sets the time at which the task will become visible again.

func (*Task) UnmarshalArgs

func (t *Task) UnmarshalArgs(target any) error

UnmarshalArgs decodes the task arguments into the passed target interface.
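
Task.Args is a byte slice, so arguments are serialized when a task is created and decoded again inside the handler. This page does not state which encoding is used. The round trip below assumes encoding/gob purely for illustration, and emailArgs, encodeArgs, decodeArgs and roundTrip are hypothetical names.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// emailArgs is a hypothetical argument payload for a task.
type emailArgs struct {
	To      string
	Subject string
}

// encodeArgs serializes v into bytes (gob is an assumed encoding).
func encodeArgs(v any) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeArgs fills target, which must be a pointer, from data.
func decodeArgs(data []byte, target any) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(target)
}

// roundTrip encodes a payload and decodes it into a fresh value.
func roundTrip() (emailArgs, error) {
	raw, err := encodeArgs(emailArgs{To: "a@example.com", Subject: "hi"})
	if err != nil {
		return emailArgs{}, err
	}

	var out emailArgs
	if err := decodeArgs(raw, &out); err != nil {
		return emailArgs{}, err
	}
	return out, nil
}

func main() {
	out, err := roundTrip()
	fmt.Println(out.To, out.Subject, err)
}
```

As with any decoder of this kind, the target passed in should be a pointer to a value whose shape matches what was submitted.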

type TaskStatus added in v1.0.0

type TaskStatus string

TaskStatus is an enum type describing the status a task is currently in.

const (
	StatusNew        TaskStatus = "NEW"
	StatusEnqueued   TaskStatus = "ENQUEUED"
	StatusInProgress TaskStatus = "IN_PROGRESS"
	StatusSuccessful TaskStatus = "SUCCESSFUL"
	StatusFailed     TaskStatus = "FAILED"
)

The collection of possible task statuses.

func GetTaskStatuses added in v1.0.0

func GetTaskStatuses(taskStatusGroup TaskStatusGroup) []TaskStatus

GetTaskStatuses returns a slice of TaskStatuses based on the TaskStatusGroup passed as an argument.

type TaskStatusGroup added in v1.0.0

type TaskStatusGroup int

TaskStatusGroup is an enum type describing the key used in the map of TaskStatuses which groups them for different purposes.

const (
	AllTasks TaskStatusGroup = iota
	OpenTasks
	FinishedTasks
)

The collection of possible task status groupings.
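
A lookup from status group to statuses could be modelled as a map, as sketched below with local copies of the documented types. Only the contents of FinishedTasks are grounded in this page (Clean describes finished tasks as succeeded or failed). Treating OpenTasks as the three remaining statuses is an assumption, and statusesFor is a hypothetical stand-in for GetTaskStatuses.

```go
package main

import "fmt"

// Local copies of the documented types and constants.
type TaskStatus string

const (
	StatusNew        TaskStatus = "NEW"
	StatusEnqueued   TaskStatus = "ENQUEUED"
	StatusInProgress TaskStatus = "IN_PROGRESS"
	StatusSuccessful TaskStatus = "SUCCESSFUL"
	StatusFailed     TaskStatus = "FAILED"
)

type TaskStatusGroup int

const (
	AllTasks TaskStatusGroup = iota
	OpenTasks
	FinishedTasks
)

// statusGroups maps each group to its statuses. The OpenTasks entry
// is an assumption; FinishedTasks follows the description of Clean.
var statusGroups = map[TaskStatusGroup][]TaskStatus{
	AllTasks:      {StatusNew, StatusEnqueued, StatusInProgress, StatusSuccessful, StatusFailed},
	OpenTasks:     {StatusNew, StatusEnqueued, StatusInProgress},
	FinishedTasks: {StatusSuccessful, StatusFailed},
}

// statusesFor returns the statuses in group g, or nil for an unknown group.
func statusesFor(g TaskStatusGroup) []TaskStatus {
	return statusGroups[g]
}

func main() {
	fmt.Println(statusesFor(FinishedTasks))
}
```

A slice obtained this way has the same shape as the taskStatuses argument accepted by the Inspector's Count, Scan and Purge methods.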

Directories

Path          Synopsis
_examples
repository
  mysql       Package mysql provides the implementation of a tasq repository in MySQL
  postgres    Package postgres provides the implementation of a tasq repository in PostgreSQL
