executors

package module
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 1, 2023 License: MIT Imports: 12 Imported by: 0

README

executors

A package like Java's ThreadPoolExecutor for Go, based on generics.

It aims to provide a better asynchronous task-processing experience.

Go Reference Go Report Go Lint Go Test Go Coverage

https://github.com/zhenzou/executors/actions/workflows/test/badge.svg

Example

package main

import (
	"context"
	"time"

	"github.com/zhenzou/executors"
)

// Person is the result type produced by the example task below.
type Person struct {
	// Name is the only payload carried by the example result.
	Name string
}

// main demonstrates the three ways of consuming a Future returned by Submit:
// blocking with Get, and registering Then / Catch callbacks.
func main() {
	executor := executors.NewPoolExecutorService[Person](executors.WithMaxConcurrent(10))

	// The task sleeps for one second and then yields a Person.
	callable := executors.CallableFunc[Person](func(ctx context.Context) (Person, error) {
		time.Sleep(1 * time.Second)
		return Person{
			Name: "future",
		}, nil
	})

	f1, err := executor.Submit(callable)
	if err != nil {
		panic(err)
	}
	// Get blocks until the async task has completed.
	got, err := f1.Get(context.Background())
	if err != nil {
		panic(err)
	}
	println(got.Name)

	// Submit can fail (see ErrShutdown / ErrRejectedExecution), so the error is
	// checked instead of discarded before the future is used.
	f2, err := executor.Submit(callable)
	if err != nil {
		panic(err)
	}
	// Then adds a callback for when the call succeeds.
	f2.Then(func(val Person) {
		println(val.Name)
	})

	f3, err := executor.Submit(callable)
	if err != nil {
		panic(err)
	}
	// Catch adds a callback for when the call fails.
	f3.Catch(func(err error) {
		println(err.Error())
	})

	// Without this, main returns while the one-second tasks are still running
	// and the process exits before any callback can print. Shutdown is
	// documented to wait for the queued tasks to finish.
	// NOTE(review): confirm that Then/Catch callbacks have fired by the time
	// Shutdown returns.
	if err := executor.Shutdown(context.Background()); err != nil {
		panic(err)
	}
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors reported by the executors.
//
// ErrRejectedExecution and ErrShutdown are the errors documented on
// Executor.Execute: the former when a task is out of capacity, the latter when
// the executor has already been shut down. ErrInvalidCronExpr and
// ErrInvalidCronTimezone describe an invalid cron expression and an invalid
// cron timezone respectively.
var (
	ErrRejectedExecution   = errors.New("rejected execution")
	ErrShutdown            = errors.New("shutdown")
	ErrInvalidCronExpr     = errors.New("invalid cron expr")
	ErrInvalidCronTimezone = errors.New("invalid cron timezone")
)
View Source
// ErrFutureCanceled is the sentinel error value for a canceled future.
var ErrFutureCanceled = errors.New("future canceled")

Functions

func WithErrorHandler

func WithErrorHandler(handler ErrorHandler) _PoolExecutorOption

func WithExecuteTimeout

func WithExecuteTimeout(ts time.Duration) _PoolExecutorOption

func WithLogger added in v0.0.6

func WithLogger(logger *slog.Logger) _PoolExecutorOption

func WithMaxBlockingTasks

func WithMaxBlockingTasks(max int) _PoolExecutorOption

func WithMaxConcurrent

func WithMaxConcurrent(concurrent int) _PoolExecutorOption

func WithRejectionHandler

func WithRejectionHandler(handler RejectionHandler) _PoolExecutorOption

Types

type CRONRule added in v0.0.6

// CRONRule describes a cron schedule as an expression plus a timezone.
// It is the rule argument accepted by ScheduleAtCronRate. Both fields are
// tagged omitempty for JSON.
type CRONRule struct {
	// Expr is the cron expression.
	Expr string `json:"expr,omitempty"`

	// Timezone is the timezone of the rule; it defaults to UTC.
	Timezone string `json:"timezone,omitempty"`
}

type Callable

// Callable is a task that produces a value of type T or an error.
// ExecutorService.Submit accepts a Callable[T].
type Callable[T any] interface {
	// Call runs the task with ctx and returns its result or an error.
	Call(ctx context.Context) (T, error)
}

type CallableFunc

// CallableFunc is a function type with the same signature as Callable.Call.
// It has a Call method of its own, so a plain function converted to
// CallableFunc[T] can be passed where a Callable[T] is expected, as the README
// example does.
type CallableFunc[T any] func(ctx context.Context) (T, error)

func (CallableFunc[T]) Call

func (c CallableFunc[T]) Call(ctx context.Context) (T, error)

type CallerRunsRejectionPolicy

// CallerRunsRejectionPolicy is a field-less type whose RejectExecution method
// has the signature required by RejectionHandler.
// NOTE(review): the name suggests a rejected task is run by the caller
// instead of the pool — confirm against the implementation.
type CallerRunsRejectionPolicy struct {
}

func (CallerRunsRejectionPolicy) RejectExecution

func (d CallerRunsRejectionPolicy) RejectExecution(runnable Runnable, e Executor) error

type CancelFunc added in v0.0.4

// CancelFunc is an alias for func(). The scheduling methods of
// ScheduledExecutor return one alongside their error.
// NOTE(review): presumably calling it cancels the scheduled task — confirm.
type CancelFunc = func()

type CatchFunction

// CatchFunction is a callback that receives an error. Future.Catch accepts
// one; the README example describes it as the callback for a failed call.
type CatchFunction func(err error)

type DiscardErrorHandler

// DiscardErrorHandler is a field-less type whose CatchError method has the
// signature required by ErrorHandler.
// NOTE(review): the name suggests errors are dropped — confirm against the
// implementation.
type DiscardErrorHandler struct {
}

func (DiscardErrorHandler) CatchError added in v0.1.0

func (d DiscardErrorHandler) CatchError(runnable Runnable, e error)

type DiscardRejectionPolicy

// DiscardRejectionPolicy is a field-less type whose RejectExecution method has
// the signature required by RejectionHandler.
// NOTE(review): the name suggests rejected tasks are dropped — confirm against
// the implementation.
type DiscardRejectionPolicy struct {
}

func (DiscardRejectionPolicy) RejectExecution

func (d DiscardRejectionPolicy) RejectExecution(runnable Runnable, e Executor) error

type ErrInvalidState

type ErrInvalidState struct {
	// contains filtered or unexported fields
}

func (ErrInvalidState) Error

func (e ErrInvalidState) Error() string

type ErrPanic

// ErrPanic is an error type (it has an Error method) that carries an
// arbitrary value in Cause.
// NOTE(review): presumably it wraps the value recovered from a panicking
// task — confirm against the implementation.
type ErrPanic struct {
	// Cause is the wrapped value. It is declared as any, which is an alias
	// for interface{}, so existing callers are unaffected.
	Cause any
}

func (ErrPanic) Error

func (e ErrPanic) Error() string

type ErrorHandler added in v0.1.0

// ErrorHandler receives errors together with the Runnable they relate to.
// WithErrorHandler accepts an ErrorHandler as a pool option.
type ErrorHandler interface {
	// CatchError is given a runnable and an error.
	// NOTE(review): presumably e is the error raised while running runnable —
	// confirm against the implementation.
	CatchError(runnable Runnable, e error)
}

type ErrorHandlerFunc

// ErrorHandlerFunc is a function type with the same signature as
// ErrorHandler.CatchError. It has a CatchError method of its own, so a plain
// function converted to ErrorHandlerFunc can be used as an ErrorHandler.
type ErrorHandlerFunc func(runnable Runnable, e error)

func (ErrorHandlerFunc) CatchError added in v0.1.0

func (f ErrorHandlerFunc) CatchError(runnable Runnable, e error)

type Executor

// Executor runs Runnable tasks in the background and can be shut down.
type Executor interface {
	// Execute runs a task in the background.
	// It returns ErrShutdown if the executor has already been shut down,
	// and ErrRejectedExecution if the task is out of capacity.
	Execute(Runnable) error

	// Shutdown shuts down the executor.
	// It waits for the queued tasks to finish.
	Shutdown(ctx context.Context) error
}

func NewPoolExecutor

func NewPoolExecutor(opts ..._PoolExecutorOption) Executor

type ExecutorService

// ExecutorService is an Executor that can also run tasks producing a result of
// type T.
type ExecutorService[T any] interface {
	Executor

	// Submit executes a task asynchronously and returns a Future from which
	// the task's result can be obtained via Get.
	Submit(callable Callable[T]) (Future[T], error)
}

func NewPoolExecutorService

func NewPoolExecutorService[T any](opts ..._PoolExecutorOption) ExecutorService[T]

type Future

// Future is a handle to an asynchronous task that produces a value of type T.
// ExecutorService.Submit returns one.
type Future[T any] interface {
	// Get returns the task's value or an error. The README example uses it to
	// block until the async task has completed.
	Get(ctx context.Context) (T, error)
	// Then adds a callback for when the call succeeds (per the README
	// example) and returns a Future[T].
	Then(thenFunc ThenFunction[T]) Future[T]
	// Catch adds a callback for when the call fails (per the README example)
	// and returns a Future[T].
	Catch(catchFunc CatchFunction) Future[T]
	// Cancel returns a bool.
	// NOTE(review): presumably it requests cancellation and reports whether
	// that took effect — confirm.
	Cancel() bool
	// Canceled returns a bool.
	// NOTE(review): presumably true once the future has been canceled — confirm.
	Canceled() bool
	// Completed returns a bool.
	// NOTE(review): presumably true once the task has finished — confirm.
	Completed() bool
	// CompletedError returns a bool.
	// NOTE(review): presumably true when the task finished with an error — confirm.
	CompletedError() bool
}

type FutureTask

type FutureTask[T any] struct {
	// contains filtered or unexported fields
}

func NewFutureTask

func NewFutureTask[T any](callable Callable[T]) *FutureTask[T]

func (*FutureTask[T]) Cancel

func (f *FutureTask[T]) Cancel() bool

func (*FutureTask[T]) Canceled

func (f *FutureTask[T]) Canceled() bool

func (*FutureTask[T]) Catch

func (f *FutureTask[T]) Catch(catchFunc CatchFunction) Future[T]

func (*FutureTask[T]) Completed

func (f *FutureTask[T]) Completed() bool

func (*FutureTask[T]) CompletedError

func (f *FutureTask[T]) CompletedError() bool

func (*FutureTask[T]) Get

func (f *FutureTask[T]) Get(ctx context.Context) (T, error)

func (*FutureTask[T]) Run

func (f *FutureTask[T]) Run(ctx context.Context)

Run implements Runnable.

func (*FutureTask[T]) Then

func (f *FutureTask[T]) Then(thenFunc ThenFunction[T]) Future[T]

type LogErrorHandler added in v0.1.0

// LogErrorHandler is a field-less type whose CatchError method has the
// signature required by ErrorHandler.
// NOTE(review): the name suggests errors are logged — confirm against the
// implementation.
type LogErrorHandler struct {
}

func (LogErrorHandler) CatchError added in v0.1.0

func (d LogErrorHandler) CatchError(runnable Runnable, e error)

type NoopErrorHandler

// NoopErrorHandler is a field-less type whose CatchError method has the
// signature required by ErrorHandler.
// NOTE(review): the name suggests CatchError does nothing — confirm against
// the implementation.
type NoopErrorHandler struct {
}

func (NoopErrorHandler) CatchError added in v0.1.0

func (d NoopErrorHandler) CatchError(runnable Runnable, e error)

type NoopRejectionPolicy

// NoopRejectionPolicy is a field-less type whose RejectExecution method has
// the signature required by RejectionHandler.
// NOTE(review): the name suggests RejectExecution does nothing — confirm
// against the implementation.
type NoopRejectionPolicy struct {
}

func (NoopRejectionPolicy) RejectExecution

func (d NoopRejectionPolicy) RejectExecution(runnable Runnable, e Executor) error

type PoolExecutor

type PoolExecutor[T any] struct {
	// contains filtered or unexported fields
}

func (*PoolExecutor[T]) Execute

func (p *PoolExecutor[T]) Execute(r Runnable) error

func (*PoolExecutor[T]) Shutdown

func (p *PoolExecutor[T]) Shutdown(ctx context.Context) error

func (*PoolExecutor[T]) Submit

func (p *PoolExecutor[T]) Submit(callable Callable[T]) (Future[T], error)

type PoolScheduleExecutor added in v0.0.4

type PoolScheduleExecutor struct {
	*PoolExecutor[any]
	// contains filtered or unexported fields
}

func (*PoolScheduleExecutor) Schedule added in v0.0.4

func (p *PoolScheduleExecutor) Schedule(r Runnable, delay time.Duration) (CancelFunc, error)

func (*PoolScheduleExecutor) ScheduleAtCronRate added in v0.0.6

func (p *PoolScheduleExecutor) ScheduleAtCronRate(r Runnable, rule CRONRule) (CancelFunc, error)

func (*PoolScheduleExecutor) ScheduleAtFixRate added in v0.0.4

func (p *PoolScheduleExecutor) ScheduleAtFixRate(r Runnable, period time.Duration) (CancelFunc, error)

func (*PoolScheduleExecutor) Shutdown added in v0.0.4

func (p *PoolScheduleExecutor) Shutdown(ctx context.Context) error

type RejectionHandler

// RejectionHandler is given a Runnable and the Executor involved and returns
// an error. WithRejectionHandler accepts a RejectionHandler as a pool option.
type RejectionHandler interface {
	// RejectExecution receives the runnable and the executor e.
	// NOTE(review): presumably invoked when e cannot accept runnable — confirm
	// against the implementation.
	RejectExecution(runnable Runnable, e Executor) error
}

type Runnable

// Runnable is a task that returns no result. Executor.Execute and the
// scheduling methods of ScheduledExecutor accept a Runnable.
type Runnable interface {
	// Run performs the task with the given context.
	Run(ctx context.Context)
}

type RunnableFunc

// RunnableFunc is a function type with the same signature as Runnable.Run.
// It has a Run method of its own, so a plain function converted to
// RunnableFunc can be used as a Runnable.
type RunnableFunc func(ctx context.Context)

func (RunnableFunc) Run

func (r RunnableFunc) Run(ctx context.Context)

type ScheduledExecutor added in v0.0.4

// ScheduledExecutor is an Executor that can additionally schedule tasks to run
// after a delay or periodically. Each scheduling method returns a CancelFunc
// alongside an error.
type ScheduledExecutor interface {
	Executor
	// Schedule runs a one-time task after the delay duration.
	Schedule(r Runnable, delay time.Duration) (CancelFunc, error)

	// ScheduleAtFixRate schedules a periodic task at a fixed rate.
	ScheduleAtFixRate(r Runnable, period time.Duration) (CancelFunc, error)

	// ScheduleAtCronRate schedules a periodic task according to a cron rule.
	ScheduleAtCronRate(r Runnable, rule CRONRule) (CancelFunc, error)
}

func NewPoolScheduleExecutor added in v0.0.4

func NewPoolScheduleExecutor(opts ..._PoolExecutorOption) ScheduledExecutor

type ThenFunction

// ThenFunction is a callback that receives a value of type T. Future.Then
// accepts one; the README example describes it as the callback for a
// successful call.
type ThenFunction[T any] func(val T)

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL