taskq

package module
v1.2.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2023 License: MIT Imports: 6 Imported by: 1

README

TaskQ

Go Report GoDoc Build Status codecov

Simple and powerful goroutine manager.


Installing

go get github.com/antonmashko/taskq

Purpose

Usually, you need TaskQ for managing your goroutines. This will allow you to control service resource consumption and graceful shutdown.

Example

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/antonmashko/taskq"
)

type Task struct{}

func (Task) Do(ctx context.Context) error {
	fmt.Println("hello world")
	return nil
}

func main() {
	// Initializing new TaskQ instance with a limit of 10 max active goroutines
	// Use `limit=0` for not limiting goroutines number.
	tq := taskq.New(10)

	// Starting reading and executing tasks from queue
	err := tq.Start()
	if err != nil {
		log.Fatal(err)
	}

	// Enqueue new task for execution.
	// TaskQ will run Do method of Task when it will have an available worker (goroutine)
	_, err = tq.Enqueue(context.Background(), Task{})
	if err != nil {
		log.Fatal(err)
	}

	// Gracefully shutting down
	err = tq.Close()
	if err != nil {
		log.Fatal(err)
	}
}

More examples here

Persistence and Queues

By default TaskQ stores all tasks in memory using ConcurrencyQueue. For creating custom queue you need to implement interface Queue and pass it as argument on creating NewWithQueue. See example of how to adapt redis queue into TaskQ

Task Events

Task support two type of events:

  1. Done - completion of the task. https://pkg.golang.ir/github.com/antonmashko/taskq#TaskDone
  2. OnError - error handling event. https://pkg.golang.ir/github.com/antonmashko/taskq#TaskOnError For invoking event implement interface on your task (example).

Graceful shutdown

Shutdown and Close gracefully shuts down the TaskQ without interrupting any active tasks. If TaskQ need to finish all tasks in queue, use context ContextWithWait as Shutdown method argument.

Benchmark results

Benchmarks

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrStarted = errors.New("taskq started")
	ErrClosed  = errors.New("taskq closed")
	ErrNilTask = errors.New("nil task")
)
View Source
var (
	EmptyQueue = errors.New("empty queue")
)

Functions

func ContextWithWait added in v1.1.0

func ContextWithWait(ctx context.Context) context.Context

Types

type ConcurrentQueue added in v0.4.0

type ConcurrentQueue struct {
	// contains filtered or unexported fields
}

func NewConcurrentQueue added in v0.4.0

func NewConcurrentQueue() *ConcurrentQueue

func (*ConcurrentQueue) Dequeue added in v0.4.0

func (q *ConcurrentQueue) Dequeue(_ context.Context) (Task, error)

func (*ConcurrentQueue) Enqueue added in v0.4.0

func (q *ConcurrentQueue) Enqueue(_ context.Context, t Task) (int64, error)

func (*ConcurrentQueue) Len added in v1.0.2

func (q *ConcurrentQueue) Len(_ context.Context) int

type Queue added in v0.4.0

type Queue interface {
	Enqueue(context.Context, Task) (int64, error)
	// Dequeue Task from queue
	// if queue empty return `EmptyQueue` as error
	Dequeue(context.Context) (Task, error)
}

type Task

type Task interface {
	Do(ctx context.Context) error
}

Task for TaskQ

type TaskDone added in v0.4.2

type TaskDone interface {
	Done(context.Context)
}

type TaskFunc

type TaskFunc func(ctx context.Context) error

func (TaskFunc) Do

func (t TaskFunc) Do(ctx context.Context) error

type TaskOnError added in v0.4.2

type TaskOnError interface {
	OnError(context.Context, error)
}

type TaskQ

type TaskQ struct {
	OnDequeueError func(ctx context.Context, workerID uint64, err error)
	// contains filtered or unexported fields
}

func New

func New(limit int) *TaskQ

func NewWithQueue added in v0.4.0

func NewWithQueue(limit int, q Queue) *TaskQ

func (*TaskQ) Close

func (t *TaskQ) Close() error

func (*TaskQ) Enqueue

func (t *TaskQ) Enqueue(ctx context.Context, task Task) (int64, error)

func (*TaskQ) Shutdown added in v0.3.0

func (t *TaskQ) Shutdown(ctx context.Context) error

func (*TaskQ) Start

func (t *TaskQ) Start() error

type WaitGroup added in v0.2.1

type WaitGroup struct {
	// contains filtered or unexported fields
}

func NewWaitGroup added in v0.2.1

func NewWaitGroup(size int) *WaitGroup

func (*WaitGroup) Enqueue added in v0.2.1

func (wg *WaitGroup) Enqueue(ctx context.Context, t Task) (int64, error)

func (*WaitGroup) Wait added in v0.2.1

func (wg *WaitGroup) Wait()

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL