workers

package module
v1.8.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 9, 2023 License: Apache-2.0 Imports: 7 Imported by: 7

README

WORKERS

A worker pool for Go.

Install

go get -u github.com/aacfactory/workers

Usage

Create worker task

// Task is a minimal example task. It carries no state.
type Task struct{}

// Execute receives the context handed to the task. The body is a placeholder
// for the task's real work.
func (t *Task) Execute(ctx context.Context) {
	// TODO: perform the task's work here.
}

Create workers and dispatch tasks

// Create the workers without passing any Option.
ws := workers.New()

// Dispatch a task to a worker; Dispatch reports via ok whether the task was accepted.
ctx := context.TODO()
if ok := ws.Dispatch(ctx, &Task{}); !ok {
    // TODO: handle the rejection: no workers remain, the workers are closed,
    // or the context timed out.
}

// MustDispatch returns nothing to check.
// NOTE(review): presumably it waits until a worker accepts the task — confirm against the implementation.
ws.MustDispatch(ctx, &Task{})

// Close the workers once no more tasks will be dispatched.
ws.Close()

LongTask: reading from a reader that has no timeout of its own

// LongTask is an example long-running task that reads from a reader which has
// no timeout of its own. The embedded *workers.AbstractLongTask supplies the
// Aborted, Touch and Close calls used by Execute.
type LongTask struct {
	*workers.AbstractLongTask
	reader io.Reader
}

// Execute reads from task.reader in a loop until the task is reported as
// aborted or a read returns an error.
//
// Each iteration first checks Aborted. After every successful read it calls
// Touch to set the next alive timeout to 3 seconds. On any read error
// (including io.EOF) it calls Close and returns.
//
// ctx is accepted to satisfy the task signature and is not consulted here.
func (task *LongTask) Execute(ctx context.Context) {
	// The buffer needs a non-zero length: Read fills at most len(p) bytes, so
	// a zero-length slice would never receive data. It is allocated once and
	// reused, so the content handler must not retain it across iterations.
	p := make([]byte, 10)
	for {
		if aborted, cause := task.Aborted(); aborted {
			// TODO: handle the abort (e.g. timeout); cause carries the reason.
			_ = cause
			return
		}
		n, readErr := task.reader.Read(p)
		if n > 0 {
			// Per the io.Reader contract, consume the n bytes that were read
			// before looking at the error.
			content := p[:n]
			_ = content // TODO: handle content
		}
		if readErr != nil {
			// Stop instead of looping on a reader that has already failed.
			task.Close()
			return
		}
		// Successful read: set the next alive timeout.
		task.Touch(3 * time.Second)
	}
}
// Build the task; 3 seconds is passed as NewAbstractLongTask's initialTimeout.
// NOTE(review): reader is not set in this literal, yet Execute calls
// task.reader.Read — assign a non-nil io.Reader before dispatching the task.
task := &LongTask{
    AbstractLongTask: workers.NewAbstractLongTask(3 * time.Second),
}

Benchmark

The handler used in the benchmark sleeps for 50 milliseconds to simulate handling a command.

goos: windows
goarch: amd64
pkg: github.com/aacfactory/workers
cpu: AMD Ryzen 9 3950X 16-Core Processor
BenchmarkNewWorkers
    worker_benchmark_test.go:37: total 1 accepted 1
    worker_benchmark_test.go:37: total 100 accepted 100
    worker_benchmark_test.go:37: total 10000 accepted 10000
    worker_benchmark_test.go:37: total 266420 accepted 266420
    worker_benchmark_test.go:37: total 834958 accepted 834958
    worker_benchmark_test.go:37: total 1104766 accepted 1104766
BenchmarkNewWorkers-32           1104766               953.7 ns/op            25 B/op          1 allocs/op
PASS

Thanks

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors related to long tasks.
// NOTE(review): presumably these are the cause values passed to OnAbort and
// returned by Aborted — confirm against the implementation.
var (
	// ErrNoTimeoutInLongTask carries the message "no timeout".
	ErrNoTimeoutInLongTask   = errors.New("no timeout")
	// ErrNoHeartbeatInLongTask carries the message "no heartbeat".
	ErrNoHeartbeatInLongTask = errors.New("no heartbeat")
	// ErrLongTaskTimeout carries the message "timeout".
	ErrLongTaskTimeout       = errors.New("timeout")
	// ErrLongTaskNormalClosed carries the message "normal closed".
	ErrLongTaskNormalClosed  = errors.New("normal closed")
)
View Source
var (
	// TimeoutErr carries the message "workers execute timeout".
	// NOTE(review): the name does not follow the ErrXxx convention used by the
	// other sentinels, and fmt.Errorf is called without formatting verbs;
	// renaming would break callers, so it is only flagged here.
	TimeoutErr = fmt.Errorf("workers execute timeout")
)

Functions

This section is empty.

Types

type AbstractLongTask added in v1.8.0

type AbstractLongTask struct {
	// contains filtered or unexported fields
}

func NewAbstractLongTask added in v1.8.0

func NewAbstractLongTask(initialTimeout time.Duration) *AbstractLongTask

func (*AbstractLongTask) Aborted added in v1.8.0

func (task *AbstractLongTask) Aborted() (ok bool, cause error)

func (*AbstractLongTask) Close added in v1.8.0

func (task *AbstractLongTask) Close()

func (*AbstractLongTask) Heartbeat added in v1.8.0

func (task *AbstractLongTask) Heartbeat() (initialTimeout time.Duration, ch <-chan time.Duration)

func (*AbstractLongTask) OnAbort added in v1.8.0

func (task *AbstractLongTask) OnAbort(cause error)

func (*AbstractLongTask) Touch added in v1.8.0

func (task *AbstractLongTask) Touch(nextAliveTimeout time.Duration)

type Group added in v1.6.0

// Group accepts keyed GroupTask values and runs them, handing back a
// GroupFuture for the outcome. A Group is obtained from Workers.Group.
type Group interface {
	// Add registers task under key.
	// NOTE(review): presumably key is the map key used in
	// GroupResult.Results and GroupResult.Errors — confirm.
	Add(key string, task GroupTask)
	// Run starts the group with ctx and returns a future to wait on.
	Run(ctx context.Context) (future GroupFuture)
}

type GroupFuture added in v1.6.0

// GroupFuture is the handle returned by Group.Run.
type GroupFuture interface {
	// Wait returns the group's GroupResult, or an error.
	// NOTE(review): presumably it blocks until the group finishes or ctx is
	// done — confirm against the implementation.
	Wait(ctx context.Context) (result GroupResult, err error)
}

type GroupResult added in v1.6.0

// GroupResult exposes the outcome obtained from GroupFuture.Wait.
type GroupResult interface {
	// Succeed reports a single boolean for the whole run.
	// NOTE(review): presumably true only when no task returned an error — confirm.
	Succeed() (ok bool)
	// Results returns values keyed by string.
	// NOTE(review): presumably the keys are those passed to Group.Add — confirm.
	Results() (results map[string]interface{})
	// Errors returns errors keyed by string.
	// NOTE(review): presumably keyed the same way as Results — confirm.
	Errors() (err map[string]error)
}

type GroupTask added in v1.6.0

// GroupTask is the unit of work accepted by Group.Add. Unlike Task, its
// Execute returns a result and an error.
type GroupTask interface {
	// Execute runs the task with ctx and returns its result or an error.
	Execute(ctx context.Context) (result interface{}, err error)
}

type LongTask added in v1.8.0

// LongTask is a Task that additionally exposes Heartbeat and OnAbort.
// AbstractLongTask declares methods with these two signatures.
type LongTask interface {
	Task
	// Heartbeat returns an initial timeout and a receive-only channel of durations.
	// NOTE(review): presumably each value received on ch is the next alive
	// timeout (cf. AbstractLongTask.Touch) — confirm.
	Heartbeat() (initialTimeout time.Duration, ch <-chan time.Duration)
	// OnAbort receives the cause of an abort.
	OnAbort(cause error)
}

type NamedTask added in v1.8.2

// NamedTask is a Task that also reports a name.
type NamedTask interface {
	// Name returns the task's name.
	Name() (name string)
	Task
}

type Option

// Option is a function that receives *Options and may return an error.
// New accepts zero or more Option values.
type Option func(*Options) error

func MaxIdleWorkerDuration added in v1.4.0

func MaxIdleWorkerDuration(d time.Duration) Option

func MaxWorkers added in v1.4.0

func MaxWorkers(max int) Option

type Options added in v1.1.0

// Options holds the settings that Option functions receive.
type Options struct {
	// MaxWorkers is an integer setting.
	// NOTE(review): presumably the upper bound on the number of workers — confirm.
	MaxWorkers            int
	// MaxIdleWorkerDuration is a duration setting.
	// NOTE(review): presumably how long a worker may stay idle before being released — confirm.
	MaxIdleWorkerDuration time.Duration
}

type Task added in v1.4.0

// Task is the unit of work accepted by Workers.Dispatch and Workers.MustDispatch.
type Task interface {
	// Execute runs the task with ctx; it returns nothing.
	Execute(ctx context.Context)
}

type Workers

// Workers is the value returned by New; it accepts Task values for execution.
type Workers interface {
	// Dispatch hands task to a worker and reports via ok whether it was accepted.
	// The README lists the rejection cases as: no workers remain, closed, or
	// context timeout.
	Dispatch(ctx context.Context, task Task) (ok bool)
	// MustDispatch hands task to a worker and returns nothing.
	// NOTE(review): presumably it waits until the task is accepted — confirm.
	MustDispatch(ctx context.Context, task Task)
	// Group returns a Group for adding and running keyed GroupTask values.
	Group() (group Group)
	// Close closes the workers.
	Close()
}

func New added in v1.2.0

func New(options ...Option) (w Workers)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL