worker

package
v0.0.0-...-6622c04 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2022 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package worker provides a way to manage concurrent processing. It guarantees that all start/restart/stop operations on a worker are thread-safe by using Go channels.

Example
package main

import (
	"github.com/iwataka/mybot/backend/worker"

	"context"
	"fmt"
)

// MyWorker is the minimal worker used by this example; it carries only a name.
type MyWorker struct {
	name string
}

// NewMyWorker returns a MyWorker labelled with name.
func NewMyWorker(name string) *MyWorker {
	mw := new(MyWorker)
	mw.name = name
	return mw
}

// Start blocks until ctx is done and then returns nil. It never writes to
// outChan.
func (mw *MyWorker) Start(ctx context.Context, outChan chan<- interface{}) error {
	done := ctx.Done()
	<-done
	return nil
}

// Name reports the name given to NewMyWorker.
func (mw *MyWorker) Name() string {
	return mw.name
}

func main() {
	w := NewMyWorker("foo")
	wm := worker.NewWorkerManager(w, 0)
	defer wm.Close()

	// Start worker
	wm.Send(worker.StartSignal)
	fmt.Printf("Worker Status: %s\n", wm.Receive())

	// Stop worker
	wm.Send(worker.StopSignal)
	fmt.Printf("Worker Status: %s\n", wm.Receive())

}
Output:

Worker Status: Started
Worker Status: Stopped

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type StrategicRestarter

type StrategicRestarter struct {
	// contains filtered or unexported fields
}

StrategicRestarter is a channel layer that restarts a worker automatically when an error happens. If errors happen more than `count` times within an `interval` duration, it stops.

func NewStrategicRestarter

func NewStrategicRestarter(interval time.Duration, count int, suppressError bool) StrategicRestarter

NewStrategicRestarter creates a new StrategicRestarter.

func (StrategicRestarter) Apply

func (sr StrategicRestarter) Apply(ctx context.Context, inChan chan<- WorkerSignal, outChan <-chan interface{}, bufSize int, wg *sync.WaitGroup) (chan<- WorkerSignal, <-chan interface{})

type WorkerChannelLayer

type WorkerChannelLayer interface {
	// Apply applies this layer to inChan and outChan asynchronously.
	// It returns a signal channel and an output channel with the same
	// directions as inChan and outChan respectively.
	// NOTE(review): presumably callers use the returned pair in place of
	// inChan/outChan, bufSize sizes the returned channels, and wg tracks
	// the goroutine Apply starts — confirm against the implementation.
	Apply(ctx context.Context, inChan chan<- WorkerSignal, outChan <-chan interface{}, bufSize int, wg *sync.WaitGroup) (chan<- WorkerSignal, <-chan interface{})
}

WorkerChannelLayer represents a layer for worker channels to catch inChan/outChan outputs, apply some filter or conversion and then rethrow them.

type WorkerManager

type WorkerManager struct {
	// contains filtered or unexported fields
}

func NewWorkerManager

func NewWorkerManager(worker models.Worker, bufSize int, layers ...WorkerChannelLayer) *WorkerManager

func (*WorkerManager) Close

func (wm *WorkerManager) Close()

func (*WorkerManager) HandleOutput

func (wm *WorkerManager) HandleOutput(h WorkerManagerOutHandler)

func (*WorkerManager) Receive

func (wm *WorkerManager) Receive() interface{}

func (*WorkerManager) Send

func (wm *WorkerManager) Send(s WorkerSignal)

func (*WorkerManager) Status

func (wm *WorkerManager) Status() WorkerStatus

type WorkerManagerOutHandler

// WorkerManagerOutHandler is the handler type accepted by
// WorkerManager.HandleOutput.
// NOTE(review): presumably Handle is invoked once per value coming out of
// the managed worker — confirm against the implementation.
type WorkerManagerOutHandler interface {
	// Handle processes a single output value.
	Handle(out interface{})
}

type WorkerSignal

type WorkerSignal int

WorkerSignal is a signal sent to Worker. Worker should behave as per the content of it and respond.

const (
	// StartSignal is sent to start a worker; String renders it as "Start".
	StartSignal WorkerSignal = iota
	// StopSignal is sent to stop a worker; String renders it as "Stop".
	StopSignal
	// RestartSignal is rendered as "Restart" by String.
	// NOTE(review): presumably it asks a worker to restart — confirm.
	RestartSignal
)

These constants indicate the type of signal sent to a worker.

func (WorkerSignal) String

func (s WorkerSignal) String() string

String returns a text indicating a type of this WorkerSignal.

Example
package main

import (
	"github.com/iwataka/mybot/backend/worker"

	"fmt"
)

func main() {
	// The last entry is a value outside the declared constants.
	signals := []worker.WorkerSignal{
		worker.StartSignal,
		worker.RestartSignal,
		worker.StopSignal,
		worker.WorkerSignal(-1),
	}
	for _, sig := range signals {
		fmt.Println(sig)
	}
}
Output:

Start
Restart
Stop

type WorkerStatus

type WorkerStatus int

WorkerStatus is a type indicating the status of a Worker.

const (
	// StatusStarted is rendered as "Started" by String.
	StatusStarted WorkerStatus = iota
	// StatusStopped is rendered as "Stopped" by String.
	StatusStopped
	// StatusFinished means worker was finished successfully.
	// String renders it as "Finished".
	StatusFinished
)

These constants indicate the status of a Worker.

func (WorkerStatus) String

func (s WorkerStatus) String() string

String returns a text indicating a type of this WorkerStatus.

Example
package main

import (
	"github.com/iwataka/mybot/backend/worker"

	"fmt"
)

func main() {
	// The last entry is a value outside the declared constants.
	statuses := []worker.WorkerStatus{
		worker.StatusFinished,
		worker.StatusStarted,
		worker.StatusStopped,
		worker.WorkerStatus(-1),
	}
	for _, st := range statuses {
		fmt.Println(st)
	}
}
Output:

Finished
Started
Stopped

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL