uwe

v2.2.3
Published: Jul 16, 2021 License: Apache-2.0 Imports: 17 Imported by: 6

README

uwe

UWE (Ubiquitous Workers Engine) is a common toolset for building and organizing your Go application with actor-like workers.

Table of Contents

  1. Quick Start
  2. Documentation
     1. Chief
     2. Worker
     3. Presets

Quick Start

Get uwe using go get:

go get github.com/lancer-kit/uwe/v2

Here is an example HelloWorld service with HTTP API and background worker:

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/lancer-kit/uwe/v2"
	"github.com/lancer-kit/uwe/v2/presets/api"
)


func main() {
	// fill in the configuration for the predefined worker that starts an HTTP server
	apiCfg := api.Config{
		Host:              "0.0.0.0",
		Port:              8080,
		EnableCORS:        false,
		ApiRequestTimeout: 0,
	}

	// initialize new instance of Chief
	chief := uwe.NewChief()
	// will add workers into the pool
	chief.AddWorker("app-server", api.NewServer(apiCfg, getRouter()))
	chief.AddWorker("dummy", NewDummy())

	// will enable recover of internal panics
	chief.UseDefaultRecover()
	// pass handler for internal events like errors, panics, warning, etc.
	// you can log them with your favorite logger (e.g. Logrus, Zap)
	chief.SetEventHandler(uwe.STDLogEventHandler())

	// init all registered workers and run it all
	chief.Run()
}

type dummy struct{}

// NewDummy initialize new instance of dummy Worker.
func NewDummy() uwe.Worker {
	// In most cases this is where we prepare the worker's state,
	// such as a logger, configuration, variables, and fields.
	return &dummy{}
}

// Init is an interface method used to initialize some state of the worker
// that required interaction with outer context, for example, initialize some connectors.
func (d *dummy) Init() error { return nil }

// Run starts event loop of worker.
func (d *dummy) Run(ctx uwe.Context) error {
	// initialize everything required for the execution flow
	ticker := time.NewTicker(time.Second)
	// release the ticker's resources when the worker stops
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// define all the processing code here 
			// or move it to a method and make a call here
			log.Println("do something")
		case <-ctx.Done():
			// close all connections, channels and finalise state if needed
			log.Println("good bye")
			return nil
		}
	}
}

// getRouter is used to declare the API scheme.
func getRouter() http.Handler {
	// any other compatible router can be used instead of the default one
	mux := http.NewServeMux()
	mux.HandleFunc("/hello/uwe", func(w http.ResponseWriter, r *http.Request) {
		_, _ = fmt.Fprintln(w, "hello world")
	})

	log.Println("REST API router initialized")
	return mux
}

Documentation

Chief

Chief is a supervisor that can be placed at the top of the Go application's execution stack. It blocks until SIGTERM is intercepted and then shuts down all workers gracefully. Chief can also be used as a child supervisor inside a Worker that is itself launched by a top-level Chief.
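
To make the supervision flow concrete, here is a minimal standard-library sketch of the idea. It is not uwe's implementation: the names worker, supervise, and waitForStop are hypothetical, and the lock callback stands in for waiting on SIGTERM.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for a long-running task.
type worker func(ctx context.Context) error

// waitForStop is a trivial worker that idles until it is asked to stop.
func waitForStop(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

// supervise starts every worker in its own goroutine, blocks until lock
// returns, then cancels the shared context and waits for all workers to exit.
// It returns the number of workers that stopped without an error.
func supervise(lock func(), workers map[string]worker) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		stopped int
	)
	for name, w := range workers {
		wg.Add(1)
		go func(name string, w worker) {
			defer wg.Done()
			if err := w(ctx); err != nil {
				fmt.Printf("worker %s failed: %v\n", name, err)
				return
			}
			mu.Lock()
			stopped++
			mu.Unlock()
		}(name, w)
	}

	lock()   // a real service would block here until SIGTERM arrives
	cancel() // ask every worker to stop
	wg.Wait()
	return stopped
}

func main() {
	// The lock returns immediately so that the example terminates on its own.
	n := supervise(func() {}, map[string]worker{"a": waitForStop, "b": waitForStop})
	fmt.Println("gracefully stopped workers:", n)
}
```

Passing the blocking step in as a function keeps the sketch testable, which is the same reason a supervisor might expose a replaceable locker.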

Worker

Worker is an interface for async workers that are launched and managed by the Chief.

  1. Init() - initializes worker state that requires interaction with the outer context, for example setting up connectors. In many cases this method is optional, so it can be implemented as a no-op: func (*W) Init() error { return nil }.
  2. Run(ctx Context) error - starts the Worker instance execution. The context signals through ctx.Done() when the worker must stop.

Workers lifecycle:

 (*) -> [New] -> [Initialized] -> [Run] -> [Stopped]
          |             |           |
          |             |           ↓
          |-------------|------> [Failed]
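
The lifecycle diagram can be read as a small transition table. The sketch below encodes it with plain Go types. The names state, transitions, and canMove are hypothetical, and the library itself may track states differently.

```go
package main

import "fmt"

// state is a hypothetical local type; the values follow the diagram above.
type state string

const (
	stateNew         state = "New"
	stateInitialized state = "Initialized"
	stateRun         state = "Run"
	stateStopped     state = "Stopped"
	stateFailed      state = "Failed"
)

// transitions lists the states reachable from each state, as read off the
// diagram. Stopped and Failed have no outgoing arrows.
var transitions = map[state][]state{
	stateNew:         {stateInitialized, stateFailed},
	stateInitialized: {stateRun, stateFailed},
	stateRun:         {stateStopped, stateFailed},
}

// canMove reports whether the diagram allows moving directly from one state to another.
func canMove(from, to state) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canMove(stateNew, stateInitialized)) // true
	fmt.Println(canMove(stateNew, stateRun))         // false: Init must come first
	fmt.Println(canMove(stateRun, stateFailed))      // true
	fmt.Println(canMove(stateStopped, stateRun))     // false: Stopped is terminal
}
```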
Presets

This library provides some working presets to simplify the use of Chief in projects and reduce duplicate code.

HTTP Server

api.Server is the default worker for starting a standard HTTP server. It requires a configuration and an initialized http.Handler.

The HTTP server shuts down correctly upon a signal from the supervisor (Chief).

Warning: this Server does not process SSL/TLS certificates on its own. To start an HTTPS server, look for a specific worker.

package main

import (
	"fmt"
	"net/http"

	"github.com/lancer-kit/uwe/v2"
	"github.com/lancer-kit/uwe/v2/presets/api"
)

func main() {
	// fill in the configuration for the predefined worker that starts an HTTP server
	apiCfg := api.Config{
		Host:              "0.0.0.0",
		Port:              8080,
		EnableCORS:        false,
		ApiRequestTimeout: 0,
	}

	// initialize a new instance of Chief
	chief := uwe.NewChief()
	chief.UseDefaultRecover()
	chief.SetEventHandler(uwe.STDLogEventHandler())

	// any other compatible router can be used instead of the default one
	mux := http.NewServeMux()
	mux.HandleFunc("/hello/uwe", func(w http.ResponseWriter, r *http.Request) {
		_, _ = fmt.Fprintln(w, "hello world")
	})

	chief.AddWorker("app-server", api.NewServer(apiCfg, mux))

	chief.Run()
}

Job

presets.Job is a primitive worker that performs an action callback at a given interval.

package main

import (
	"log"
	"time"

	"github.com/lancer-kit/uwe/v2"
	"github.com/lancer-kit/uwe/v2/presets"
)

func main() {
	var action = func() error {
		// define all the processing code here
		// or move it to a method and make a call here
		log.Println("do something")
		return nil
	}

	// initialize new instance of Chief
	chief := uwe.NewChief()
	chief.UseDefaultRecover()
	chief.SetEventHandler(uwe.STDLogEventHandler())

	// will add workers into the pool
	chief.AddWorker("simple-job", presets.NewJob(time.Second, action))

	chief.Run()
}

WorkerFunc

presets.WorkerFunc is a worker type that consists of a single function. It allows a plain function to be used as a worker.

package main

import (
	"log"
	"time"

	"github.com/lancer-kit/uwe/v2"
	"github.com/lancer-kit/uwe/v2/presets"
)

func main() {
	var anonFuncWorker = func(ctx uwe.Context) error {
		// initialize everything required for the execution flow
		ticker := time.NewTicker(time.Second)
		// release the ticker's resources when the worker stops
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// define all the processing code here
				// or move it to a method and make a call here
				log.Println("do something")
			case <-ctx.Done():
				// close all connections, channels and finalise state if needed
				log.Println("good bye")
				return nil
			}
		}
	}

	// initialize a new instance of Chief
	chief := uwe.NewChief()
	chief.UseDefaultRecover()
	chief.SetEventHandler(uwe.STDLogEventHandler())

	// add the worker to the pool
	chief.AddWorker("anon-func", presets.WorkerFunc(anonFuncWorker))

	chief.Run()
}

License

This library is distributed under the Apache 2.0 license.

Documentation

Overview

uwe (Ubiquitous Workers Engine) is a common toolset for building and organizing Go applications with actor-like workers.

`Chief` is a supervisor that can be placed at the top of the Go application's execution stack. It blocks until SIGTERM is intercepted and then shuts down all workers gracefully. `Chief` can also be used as a child supervisor inside a `Worker` that is itself launched by a top-level `Chief`.

`Worker` is an interface for async workers that are launched and managed by the **Chief**.

1. `Init()` - initializes worker state that requires interaction with the outer context, for example setting up connectors. In many cases this method is optional, so it can be implemented as a no-op: `func (*W) Init() error { return nil }`.

2. `Run(ctx Context) error` - starts the `Worker` instance execution. The context signals through `ctx.Done()` when the worker must stop.

Workers lifecycle:

```text
 (*) -> [New] -> [Initialized] -> [Run] -> [Stopped]
          |             |           |
          |             |           ↓
          |-------------|------> [Failed]
```

Example

package main

import (
	"log"
	"time"

	"github.com/lancer-kit/uwe/v2"
)

func main() {
	// initialize a new instance of Chief
	chief := uwe.NewChief()
	// add the worker to the pool
	chief.AddWorker("dummy", NewDummy())

	// enable recovery from internal panics
	chief.UseDefaultRecover()
	// pass a handler for internal events like errors, panics, warnings, etc.
	// you can log them with your favorite logger (e.g. Logrus, Zap)
	chief.SetEventHandler(uwe.STDLogEventHandler())

	// init all registered workers and run them
	chief.Run()
}

type dummy struct{}

// NewDummy initializes a new instance of the dummy Worker.
func NewDummy() uwe.Worker {
	// In most cases this is where we prepare the worker's state,
	// such as a logger, configuration, variables, and fields.
	return &dummy{}
}

// Init is an interface method used to initialize worker state that requires
// interaction with the outer context, for example setting up connectors.
func (d *dummy) Init() error { return nil }

// Run starts the event loop of the worker.
func (d *dummy) Run(ctx uwe.Context) error {
	// initialize everything required for the execution flow
	ticker := time.NewTicker(time.Second)
	// release the ticker's resources when the worker stops
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// define all the processing code here
			// or move it to a method and make a call here
			log.Println("do something")
		case <-ctx.Done():
			// close all connections, channels and finalise state if needed
			log.Println("good bye")
			return nil
		}
	}
}

Constants

const (
	// StatusAction is a command useful for health-checks, because it returns status of all workers.
	StatusAction = "status"
	// PingAction is a simple command that returns the "pong" message.
	PingAction = "ping"
)

const (
	WStateNotExists   sam.State = "NotExists"
	WStateNew         sam.State = "New"
	WStateInitialized sam.State = "Initialized"
	WStateRun         sam.State = "Run"
	WStateStopped     sam.State = "Stopped"
	WStateFailed      sam.State = "Failed"
)

const DefaultForceStopTimeout = 45 * time.Second

DefaultForceStopTimeout is a timeout for killing all workers.
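
As an illustration of how a force-stop timeout can bound a graceful shutdown, the sketch below waits for a group of goroutines but gives up after a deadline. It uses only the standard library. The helpers waitOrTimeout and finishedInTime are hypothetical and do not show how uwe applies DefaultForceStopTimeout internally.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitOrTimeout waits for wg to finish and reports whether it did so
// before the timeout elapsed.
func waitOrTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done:
		return true
	case <-timer.C:
		return false
	}
}

// finishedInTime simulates one worker that needs workMs milliseconds to stop
// and reports whether it stopped within timeoutMs milliseconds.
func finishedInTime(workMs, timeoutMs int) bool {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(workMs) * time.Millisecond)
	}()
	return waitOrTimeout(&wg, time.Duration(timeoutMs)*time.Millisecond)
}

func main() {
	fmt.Println("fast worker stopped in time:", finishedInTime(10, 1000))
	fmt.Println("slow worker stopped in time:", finishedInTime(1000, 10))
}
```

When the timeout fires first, the caller knows that some workers ignored the stop signal and can decide to exit anyway.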

Functions

func CliCheckCommand added in v2.1.0

func CliCheckCommand(app AppInfo, workerListProvider func(c *cli.Context) []WorkerName) cli.Command

CliCheckCommand returns a `cli.Command` that lets you check the health of a running **Application** instance with `ServiceSocket` enabled via `(Chief).EnableServiceSocket(...)`.

func STDLogEventHandler added in v2.2.0

func STDLogEventHandler() func(event Event)

STDLogEventHandler returns a callback that handles internal `Chief` events and logs them.

Types

type AppInfo added in v2.1.0

type AppInfo struct {
	Name    string `json:"name"`
	Version string `json:"version"`
	Build   string `json:"build"`
	Tag     string `json:"tag"`
}

AppInfo holds details of the *Application* build.

func (AppInfo) SocketName added in v2.1.0

func (app AppInfo) SocketName() string

SocketName returns name of *Chief Service Socket*.

type Chief

type Chief interface {
	// AddWorker registers the worker in the pool.
	AddWorker(WorkerName, Worker)
	// AddWorkers registers the list of workers in the pool.
	AddWorkers(map[WorkerName]Worker)
	// GetWorkersStates returns the current state of all registered workers.
	GetWorkersStates() map[WorkerName]sam.State
	// EnableServiceSocket initializes `net.Socket` server for internal management purposes.
	// By default includes two actions:
	// 	- "status" is a command useful for health-checks, because it returns status of all workers;
	// 	- "ping" is a simple command that returns the "pong" message.
	// The user can provide his own list of actions with handler closures.
	EnableServiceSocket(app AppInfo, actions ...socket.Action)
	// Event returns the channel with internal Events.
	// ATTENTION: `Event() <-chan Event` and `SetEventHandler(EventHandler)` are mutually exclusive,
	// but one of them must be used!
	Event() <-chan Event
	// SetEventHandler adds a callback that processes the `Chief`
	// internal events and can log them or do something else.
	// ATTENTION: `Event() <-chan Event` and `SetEventHandler(EventHandler)` are mutually exclusive,
	// but one of them must be used!
	SetEventHandler(EventHandler)
	// SetContext replaces the default context with the provided one.
	// It can be used to deliver some values inside `(Worker).Run(ctx Context)`.
	SetContext(context.Context)
	// SetLocker sets a custom `Locker`, if it is not set,
	// the default `Locker` will be used, which expects SIGTERM or SIGINT system signals.
	SetLocker(Locker)
	// SetRecover sets a custom `recover` that catches panic.
	SetRecover(Recover)
	// SetShutdown sets `Shutdown` callback.
	SetShutdown(Shutdown)
	// UseDefaultRecover sets a standard handler as a `recover`
	// that catches panic and sends a fatal event to the event channel.
	UseDefaultRecover()
	// Run is the main entry point into the `Chief` run loop.
	// This method initializes all added workers, the server `net.Socket`,
	// if enabled, starts the workers in separate routines
	// and waits for the end of lock produced by the locker function.
	Run()
	// Shutdown sends stop signal to all child goroutines by triggering of the `context.CancelFunc()`
	// and executes `Shutdown` callback.
	Shutdown()
}

Chief is a supervisor that can be placed at the top of the Go application's execution stack. It blocks until SIGTERM is intercepted and then shuts down all workers gracefully. `Chief` can also be used as a child supervisor inside a `Worker` that is itself launched by a top-level `Chief`.

func NewChief

func NewChief() Chief

NewChief returns new instance of standard `Chief` implementation.

type Context

type Context interface {
	context.Context
}

Context is a wrapper over the standard `context.Context`. Its main purpose is to leave room for future extension.
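
Because the wrapper only embeds `context.Context`, any standard context satisfies it. The sketch below mirrors the interface locally and shows how a value could travel to a worker through such a context. The key type ctxKey and the helpers withRequestID and requestID are hypothetical names used for illustration only.

```go
package main

import (
	"context"
	"fmt"
)

// Context mirrors the wrapper interface shown above.
type Context interface {
	context.Context
}

// ctxKey is a hypothetical private key type that avoids collisions with other packages.
type ctxKey string

const requestIDKey ctxKey = "request-id"

// withRequestID builds a context carrying id. The value returned by
// context.WithValue satisfies Context because the method sets are identical.
func withRequestID(id string) Context {
	return context.WithValue(context.Background(), requestIDKey, id)
}

// requestID reads the value back, returning "" when it is absent.
func requestID(ctx Context) string {
	id, _ := ctx.Value(requestIDKey).(string)
	return id
}

func main() {
	fmt.Println(requestID(withRequestID("abc-123"))) // abc-123
}
```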

func NewContext

func NewContext() Context

NewContext returns new context.

type Event

type Event struct {
	Level   EventLevel
	Worker  WorkerName
	Fields  map[string]interface{}
	Message string
}

Event is a message object that signals Chief's internal events and is processed by an `EventHandler`.

func ErrorEvent

func ErrorEvent(msg string) Event

ErrorEvent returns new Event with `LvlError` and provided message.

func (Event) IsError

func (e Event) IsError() bool

IsError returns `true` if event level is `Error`

func (Event) IsFatal

func (e Event) IsFatal() bool

IsFatal returns `true` if event level is `Fatal`

func (Event) SetField

func (e Event) SetField(key string, value interface{}) Event

SetField adds a key/value pair to the event.

func (Event) SetWorker

func (e Event) SetWorker(name WorkerName) Event

SetWorker sets the provided `worker` as the event source.

func (Event) ToError

func (e Event) ToError() error

ToError validates the event level and casts the event to the builtin `error`.

type EventHandler

type EventHandler func(Event)

EventHandler is a callback that processes the `Chief` internal events. It can log them or do something else.
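
A custom handler is just a function over `Event`. The sketch below redeclares the documented types locally so that it runs without the library. The helpers formatEvent and newHandler are hypothetical, and the log line format is an arbitrary choice.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Local mirrors of the documented types, declared here so the sketch is self-contained.
type EventLevel string

const (
	LvlFatal EventLevel = "fatal"
	LvlError EventLevel = "error"
	LvlInfo  EventLevel = "info"
)

type WorkerName string

type Event struct {
	Level   EventLevel
	Worker  WorkerName
	Fields  map[string]interface{}
	Message string
}

type EventHandler func(Event)

// formatEvent renders an event as a single log line with fields sorted by key,
// so the output is stable regardless of map iteration order.
func formatEvent(e Event) string {
	keys := make([]string, 0, len(e.Fields))
	for k := range e.Fields {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var b strings.Builder
	fmt.Fprintf(&b, "[%s] worker=%s msg=%q", e.Level, e.Worker, e.Message)
	for _, k := range keys {
		fmt.Fprintf(&b, " %s=%v", k, e.Fields[k])
	}
	return b.String()
}

// newHandler returns an EventHandler that passes each formatted event to sink.
func newHandler(sink func(string)) EventHandler {
	return func(e Event) { sink(formatEvent(e)) }
}

func main() {
	handler := newHandler(func(line string) { fmt.Println(line) })
	handler(Event{
		Level:   LvlError,
		Worker:  "dummy",
		Message: "connection lost",
		Fields:  map[string]interface{}{"attempt": 3},
	})
	// prints: [error] worker=dummy msg="connection lost" attempt=3
}
```

With the real library, a function of the same shape could be passed to `Chief.SetEventHandler(...)`.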

func LogrusEventHandler added in v2.2.0

func LogrusEventHandler(entry *logrus.Entry) EventHandler

LogrusEventHandler returns default `EventHandler` that can be used for `Chief.SetEventHandler(...)`.

type EventLevel

type EventLevel string

EventLevel ...

const (
	LvlFatal EventLevel = "fatal"
	LvlError EventLevel = "error"
	LvlInfo  EventLevel = "info"
)

type Locker

type Locker func()

Locker is a function whose return signals `Chief` and all workers to stop.

type Recover

type Recover func(name WorkerName)

Recover is a function that will be used as a `defer call` to handle each worker's panic.
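
The sketch below shows why such a callback has to be the deferred function itself: `recover` only stops a panic when called directly by a deferred function. The type is mirrored locally with `WorkerName` simplified to `string`, and runWorker and capturePanic are hypothetical helpers.

```go
package main

import "fmt"

// Recover mirrors the callback type above, with WorkerName simplified to string.
type Recover func(name string)

// runWorker runs fn with rec deferred, as a supervisor might do for each worker.
func runWorker(name string, fn func(), rec Recover) {
	defer rec(name)
	fn()
}

// capturePanic runs fn under a Recover callback and returns a description
// of the panic, or "" when fn returns normally.
func capturePanic(name string, fn func()) (report string) {
	rec := func(n string) {
		// recover works here because rec is the function that was deferred.
		if r := recover(); r != nil {
			report = fmt.Sprintf("worker %s panicked: %v", n, r)
		}
	}
	runWorker(name, fn, rec)
	return report
}

func main() {
	fmt.Println(capturePanic("dummy", func() { panic("boom") })) // worker dummy panicked: boom
	fmt.Printf("%q\n", capturePanic("ok", func() {}))            // ""
}
```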

type Shutdown

type Shutdown func()

Shutdown is a callback function that will be executed after the Chief and workers are stopped. Its main purpose is to close, complete, or retain some global states or shared resources.

type StateInfo added in v2.1.0

type StateInfo struct {
	App     AppInfo                  `json:"app"`
	Workers map[WorkerName]sam.State `json:"workers"`
}

StateInfo is the result of the `StatusAction` command.

func ParseStateInfo added in v2.1.0

func ParseStateInfo(data json.RawMessage) (*StateInfo, error)

ParseStateInfo decodes `StateInfo` from the JSON response for the `StatusAction` command.
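
The decoding step can be sketched with `encoding/json` and local copies of the documented structs. This is an illustration under assumptions: the sample payload is invented, worker states are simplified to plain strings instead of `sam.State`, and parseStateInfo, allRunning, and healthy are hypothetical helpers, not the library's functions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the documented structs; worker states are plain strings here.
type AppInfo struct {
	Name    string `json:"name"`
	Version string `json:"version"`
	Build   string `json:"build"`
	Tag     string `json:"tag"`
}

type StateInfo struct {
	App     AppInfo           `json:"app"`
	Workers map[string]string `json:"workers"`
}

// sample is a hypothetical payload shaped after the JSON tags above.
const sample = `{"app":{"name":"demo","version":"1.0.0","build":"abc","tag":"v1"},` +
	`"workers":{"app-server":"Run","dummy":"Stopped"}}`

// parseStateInfo decodes raw JSON into a StateInfo.
func parseStateInfo(data json.RawMessage) (*StateInfo, error) {
	var info StateInfo
	if err := json.Unmarshal(data, &info); err != nil {
		return nil, fmt.Errorf("decode state info: %w", err)
	}
	return &info, nil
}

// allRunning reports whether every worker is in the "Run" state.
func allRunning(info *StateInfo) bool {
	for _, st := range info.Workers {
		if st != "Run" {
			return false
		}
	}
	return true
}

// healthy parses raw and reports whether all workers are running.
func healthy(raw string) (bool, error) {
	info, err := parseStateInfo(json.RawMessage(raw))
	if err != nil {
		return false, err
	}
	return allRunning(info), nil
}

func main() {
	ok, err := healthy(sample)
	fmt.Println(ok, err) // false <nil>
}
```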

type Worker

type Worker interface {
	// Init initializes worker state that requires interaction with the outer context,
	// for example setting up connectors. In many cases this method is optional,
	// so it can be implemented as a no-op: `func (*W) Init() error { return nil }`.
	Init() error

	// Run starts the `Worker` instance execution. The context signals
	// through `ctx.Done()` when the worker must stop.
	Run(ctx Context) error
}

Worker is an interface for async workers that are launched and managed by the `Chief`.

type WorkerExistRule

type WorkerExistRule struct {
	AvailableWorkers map[WorkerName]struct{}
	// contains filtered or unexported fields
}

WorkerExistRule is a custom validation rule for the `ozzo-validation` package.

func (*WorkerExistRule) Error

func (r *WorkerExistRule) Error(message string) *WorkerExistRule

Error sets the error message for the rule.

func (*WorkerExistRule) Validate

func (r *WorkerExistRule) Validate(value interface{}) error

Validate checks that the service exists on the system.

type WorkerName

type WorkerName string

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool provides a mechanism to combine many workers into the one pool, manage them, and run.

func (*WorkerPool) FailWorker

func (p *WorkerPool) FailWorker(name WorkerName) error

FailWorker sets state `WorkerFailed` for workers with the specified `name`.

func (*WorkerPool) GetState

func (p *WorkerPool) GetState(name WorkerName) sam.State

GetState returns current state for workers with the specified `name`.

func (*WorkerPool) GetWorkersStates

func (p *WorkerPool) GetWorkersStates() map[WorkerName]sam.State

GetWorkersStates returns current state of all workers.

func (*WorkerPool) InitWorker

func (p *WorkerPool) InitWorker(name WorkerName) error

InitWorker initializes all present workers.

func (*WorkerPool) IsRun

func (p *WorkerPool) IsRun(name WorkerName) bool

IsRun checks whether the worker with the passed `name` is active.

func (*WorkerPool) ReplaceWorker

func (p *WorkerPool) ReplaceWorker(name WorkerName, worker Worker)

ReplaceWorker replaces the worker with `name` by new `worker`.

func (*WorkerPool) RunWorkerExec

func (p *WorkerPool) RunWorkerExec(ctx Context, name WorkerName) (err error)

RunWorkerExec adds worker into pool.

func (*WorkerPool) SetState

func (p *WorkerPool) SetState(name WorkerName, state sam.State) error

SetState updates state of specified worker.

func (*WorkerPool) SetWorker

func (p *WorkerPool) SetWorker(name WorkerName, worker Worker) error

SetWorker adds worker into pool.

func (*WorkerPool) StartWorker

func (p *WorkerPool) StartWorker(name WorkerName) error

StartWorker sets state `WorkerEnabled` for workers with the specified `name`.

func (*WorkerPool) StopWorker

func (p *WorkerPool) StopWorker(name WorkerName) error

StopWorker sets state `WorkerStopped` for workers with the specified `name`.

Directories

Path Synopsis
examples
api
imq
