taskq

package module
v0.0.0-...-73d3b3b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2024 License: MIT Imports: 6 Imported by: 0

README

A super lightweight Redis-backed task queue for Golang

Installation

go get github.com/yarcat/taskq

Usage

type MyTask struct {
	Phrase   string
	Duration time.Duration
}

// MyTaskHandler contains the business logic for handling MyTask.
// The following fields are given as an example. In real life, you may want to
// use a logger, a database connection, another task queue, etc.
type MyTaskHandler struct {
	Print func(string) error
	Sleep func(time.Duration)
}

func (h MyTaskHandler) Handle(ctx context.Context, task MyTask) error {
	h.Sleep(task.Duration)
	if err := h.Print(task.Phrase); err != nil {
		return fmt.Errorf("printing: %w", err)
	}
	return nil
}

func Example() {
	ctx := context.Background()

	client := redis.NewClient(&redis.Options{
		Addr: "172.21.169.52:6379",
		// This example creates "my" and "my-processing" keys in Redis.
		// Consider using a different redis.Options.DB to avoid conflicts.
		// DB: 12,
	})

	h := &MyTaskHandler{
		Print: func(s string) error {
			_, err := fmt.Println(s)
			return err
		},
		Sleep: time.Sleep,
	}

	var wg sync.WaitGroup
	wg.Add(1)

	queue := taskq.New(ctx, "my", client, h.Handle,
		taskq.WithProcessNext(stopAfterIterations(2)),
		taskq.WithNotifyStopped(wg.Done),
	)
	queue.Add(ctx, MyTask{"Sleep 1", 1 * time.Second})
	queue.Add(ctx, MyTask{"Sleep 2", 2 * time.Second})

	wg.Wait()
}

Documentation

Overview

Example
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/redis/go-redis/v9"
	"github.com/yarcat/taskq"
)

type MyTask struct {
	Phrase   string
	Duration time.Duration
}

// MyTaskHandler contains the business logic for handling MyTask.
// The following fields are given as an example. In real life, you may want to
// use a logger, a database connection, another task queue, etc.
type MyTaskHandler struct {
	Print func(string) error
	Sleep func(time.Duration)
}

func (h MyTaskHandler) Handle(ctx context.Context, task MyTask) error {
	h.Sleep(task.Duration)
	if err := h.Print(task.Phrase); err != nil {
		return fmt.Errorf("printing: %w", err)
	}
	return nil
}

func stopAfterIterations(n int64) func() bool {
	return func() bool { return atomic.AddInt64(&n, -1) >= 0 }
}

func main() {
	ctx := context.Background()

	client := redis.NewClient(&redis.Options{
		Addr: "172.21.169.52:6379",
		// This example creates "my" and "my-processing" keys in Redis.
		// Consider using a different redis.Options.DB to avoid conflicts.
		// DB: 12,
	})

	h := &MyTaskHandler{
		Print: func(s string) error {
			_, err := fmt.Println(s)
			return err
		},
		Sleep: time.Sleep,
	}

	var wg sync.WaitGroup
	wg.Add(1)

	queue := taskq.New(ctx, "my", client, h.Handle,
		taskq.WithProcessNext(stopAfterIterations(2)),
		taskq.WithNotifyStopped(wg.Done),
	)
	queue.Add(ctx, MyTask{"Sleep 1", 1 * time.Second})
	queue.Add(ctx, MyTask{"Sleep 2", 2 * time.Second})

	wg.Wait()
}
Output:

Sleep 1
Sleep 2

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type OptionFunc

type OptionFunc func(*options)

OptionFunc provides an option setting function.

func WithNotifyStarted

func WithNotifyStarted(f func()) OptionFunc

WithNotifyStarted provides an option to set the function that is called when the task queue starts.

func WithNotifyStopped

func WithNotifyStopped(f func()) OptionFunc

WithNotifyStopped provides an option to set the function that is called when the task queue stops.

func WithProcessFailed

func WithProcessFailed(f func(string, error)) OptionFunc

WithProcessFailed provides an option to set the function that is called when a task fails to process.

func WithProcessNext

func WithProcessNext(f func() bool) OptionFunc

WithProcessNext provides an option to set the function that determines if the next task should be processed.

type TaskHandlerFunc

type TaskHandlerFunc[T any] func(context.Context, T) error

TaskHandlerFunc provides a function that handles a task.

type TaskQ

type TaskQ[T any] struct {
	// contains filtered or unexported fields
}

TaskQ provides a queue for tasks.

func New

func New[T any](ctx context.Context, key string, c *redis.Client, f TaskHandlerFunc[T], opts ...OptionFunc) *TaskQ[T]

New returns a new task queue.

func (*TaskQ[T]) Add

func (tq *TaskQ[T]) Add(ctx context.Context, x T) error

Add adds a task to the queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL