chord

Version: v0.1.1
Published: Mar 17, 2023 License: MIT Imports: 9 Imported by: 0

README

Chord: a task management toolkit for Go

This package provides a collection of tools for building your own internal task management. The needs of different projects differ, so an opinionated one-stop-solution will necessarily have limited use. This package aims to provide the building blocks that tend to be common across such systems.

For an example of this package in use, see: https://github.com/neondatabase/autoscaling/blob/sharnoff/chord-task-mgmt/pkg/task/manager.go.

Feature summary

Broadly, this package provides three categories of things:

  • Stack trace chaining — for including callers in the stack traces of goroutines
  • Signal handling — both OS signals and user-defined signals, like "shutdown"
  • Hierarchical, named sync.WaitGroup — for tracking tasks: getting the active set and waiting on a subset to finish

For full API documentation, refer to godoc: https://godoc.org/github.com/sharnoff/chord.

Documentation

Overview

Package chord provides a collection of tools for task management, with a focus on minimizing magic.

Broadly, the tools belong to a few distinct groups:

  • Signal handling: SignalManager and SignalRegister
  • Stack trace collection, printing, and appending: StackTrace, GetStackTrace, and StackFrame
  • Hierarchical, named sync.WaitGroup: TaskGroup, NewTaskGroup, TaskTree, TaskInfo

For an example of using all of it together, see: <https://github.com/neondatabase/autoscaling/blob/sharnoff/chord-task-mgmt/pkg/task/manager.go>.

Signal handling

The general idea behind signal handling via SignalManager is that signals are mostly user-defined, trigger exactly once, and are hierarchical (i.e. triggering a signal in a child SignalManager does not affect the parent).

Users of SignalManager register callbacks via SignalManager.On. Callbacks are called at most once, when the signal is triggered. Callbacks are processed sequentially, in the reverse order of their registration, and may return an error. An unhandled error prevents any remaining callbacks from being called.
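
The ordering and error rules can be illustrated without chord itself. The following is a minimal standard-library sketch, not chord's implementation; the names registry, on, trigger, and runDemo are hypothetical. It runs callbacks newest-first, at most once, and stops at the first error.

```go
package main

import (
	"errors"
	"fmt"
)

// registry is a hypothetical stand-in for a single signal's callback list.
type registry struct {
	callbacks []func() error
	triggered bool
}

// on registers a callback to run when the signal is triggered.
func (r *registry) on(cb func() error) {
	r.callbacks = append(r.callbacks, cb)
}

// trigger runs callbacks newest-first, at most once, stopping at the first error.
func (r *registry) trigger() error {
	if r.triggered {
		return nil
	}
	r.triggered = true
	for i := len(r.callbacks) - 1; i >= 0; i-- {
		if err := r.callbacks[i](); err != nil {
			return err // remaining (older) callbacks are skipped
		}
	}
	return nil
}

// runDemo registers three callbacks, the second of which fails, and
// returns the recorded call order plus the error from trigger.
func runDemo() (string, error) {
	var r registry
	order := ""
	r.on(func() error { order += "A"; return nil })
	r.on(func() error { order += "B"; return errors.New("B failed") })
	r.on(func() error { order += "C"; return nil })
	err := r.trigger()
	return order, err
}

func main() {
	order, err := runDemo()
	fmt.Println(order, err) // C runs first, then B fails, so A never runs
}
```

Because "A" was registered first, it is last in line and is skipped once "B" returns an error.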

Signals can be manually triggered with SignalManager.TriggerAndWait. There's special handling around os.Signal values so that forwarding into the SignalManager is automatically set up when required.

For more, see SignalManager.

Stack traces

The primary goal of the stack trace tooling here is to make it easy to link stack traces across goroutines. To that end, GetStackTrace may be given a parent StackTrace to use, which gets appended on producing a string.

The stack trace management is designed with simplicity in mind, and is optimized for collection rather than printing (i.e. GetStackTrace should be fast, but there are faster ways to print than via StackTrace.String).
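
To make the idea of linking traces across goroutines concrete, here is a rough sketch built only on runtime.Callers and runtime.CallersFrames. It is not chord's code: the trace type, capture, spawnAndTrace, and the "spawned from:" separator are all hypothetical, and chord's actual output format may differ.

```go
package main

import (
	"fmt"
	"runtime"
	"strings"
)

// trace is a hypothetical, simplified analogue of a chained stack trace.
type trace struct {
	funcs  []string // function names, innermost first
	parent *trace   // trace captured where the goroutine was spawned, if any
}

// capture records the caller's stack and links it to parent.
func capture(parent *trace) *trace {
	pcs := make([]uintptr, 32)
	// Skipping 2 frames omits runtime.Callers and capture itself.
	n := runtime.Callers(2, pcs)
	frames := runtime.CallersFrames(pcs[:n])
	tr := &trace{parent: parent}
	for {
		f, more := frames.Next()
		if f.Function != "" {
			tr.funcs = append(tr.funcs, f.Function)
		}
		if !more {
			break
		}
	}
	return tr
}

// String prints this trace, then each ancestor, separated by a marker line.
func (tr *trace) String() string {
	var b strings.Builder
	for cur := tr; cur != nil; cur = cur.parent {
		if cur != tr {
			b.WriteString("spawned from:\n")
		}
		for _, fn := range cur.funcs {
			b.WriteString("  " + fn + "\n")
		}
	}
	return b.String()
}

// spawnAndTrace captures a trace in the spawner, then a linked trace
// inside a new goroutine.
func spawnAndTrace() *trace {
	parent := capture(nil)
	result := make(chan *trace)
	go func() { result <- capture(parent) }()
	return <-result
}

func main() {
	fmt.Print(spawnAndTrace())
}
```

The parent trace is captured before the goroutine starts and passed in explicitly, which is the same shape as handing a parent StackTrace to GetStackTrace.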

For more, see StackTrace.

Hierarchical WaitGroup

TaskGroup is essentially a glorified sync.WaitGroup. The main features it adds are: subgroups, named Add/Done, and fetching all tasks not yet Done. It also exposes TaskGroup.Wait as a channel, so you can select over it. TaskGroup.TryWait takes a context.Context and does that selection for you.
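
For readers unfamiliar with why a channel-based Wait is useful, the sketch below shows the general pattern with a plain sync.WaitGroup. It is an illustration under assumptions, not TaskGroup's implementation; waitChan, tryWait, and demo are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitChan adapts a sync.WaitGroup into a channel that is closed once
// wg.Wait returns. The helper goroutine lives until the group finishes.
func waitChan(wg *sync.WaitGroup) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	return done
}

// tryWait waits for wg, returning ctx.Err() if ctx is canceled first.
func tryWait(ctx context.Context, wg *sync.WaitGroup) error {
	select {
	case <-waitChan(wg):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo runs one task lasting taskMs milliseconds and waits for it for at
// most timeoutMs milliseconds.
func demo(taskMs, timeoutMs int) error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(taskMs) * time.Millisecond)
	}()

	timeout := time.Duration(timeoutMs) * time.Millisecond
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return tryWait(ctx, &wg)
}

func main() {
	fmt.Println(demo(5, 2000)) // the task finishes well before the timeout
	fmt.Println(demo(500, 10)) // the timeout fires while the task is running
}
```

A plain sync.WaitGroup.Wait blocks with no way to give up, so the select in tryWait is only possible once waiting is exposed as a channel.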

For more, see TaskGroup.

Example

Start an HTTP "hello world" server that exits on SIGTERM or SIGINT, or after 2 seconds.

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"syscall"
	"time"

	"github.com/sharnoff/chord"
)

// Shutdown will be used as the signal to trigger shutdown hooks
var Shutdown shutdown

type shutdown struct{}

// returns a context that is canceled after 1 second
func makeShutdownContext() context.Context {
	// it's ok to leak the context here; this is just for shutdown.
	ctx, _ := context.WithTimeout(context.TODO(), time.Second)
	return ctx
}

// Start an HTTP "hello world" server that exits on SIGTERM or SIGINT, or after 2 seconds.
func main() {
	mgr := chord.NewSignalManager()
	defer mgr.Stop()
	// Once done, wait for shutdown to complete
	defer func() {
		if err := mgr.TryWait(Shutdown, makeShutdownContext()); err != nil {
			log.Fatalf("timed out waiting for shutdown: %s", err)
		}
	}()

	// Forward OS signals to our custom Shutdown signal
	_ = mgr.On(syscall.SIGTERM, context.TODO(), func(ctx context.Context) error {
		return mgr.TriggerAndWait(Shutdown, ctx)
	})
	_ = mgr.On(syscall.SIGINT, context.TODO(), func(ctx context.Context) error {
		return mgr.TriggerAndWait(Shutdown, ctx)
	})
	// Log that shutdown completed successfully, once done
	_ = mgr.On(Shutdown, context.TODO(), func(context.Context) error {
		fmt.Println("Shutdown complete!")
		return nil
	})

	server := &http.Server{
		Addr: ":8080",
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			_, _ = w.Write([]byte("hello, world!"))
		}),
	}

	// Register the shutdown callback
	withHandler := mgr.WithErrorHandler(func(c context.Context, err error) error {
		if err != nil {
			log.Fatal(err)
		}
		return nil
	})
	// error will be handled; we don't need to worry about it
	_ = withHandler.On(Shutdown, context.TODO(), func(ctx context.Context) error {
		err := server.Shutdown(ctx)
		return err
	})

	// timeout after 2 seconds
	go func() {
		time.Sleep(2 * time.Second)
		// safe to ignore error because of the error handling above. Otherwise, that error could
		// appear here.
		_ = mgr.TriggerAndWait(Shutdown, makeShutdownContext())
	}()

	fmt.Println("Starting server")
	if err := server.ListenAndServe(); err != http.ErrServerClosed {
		log.Fatal(err)
	}
}
Output:

Starting server
Shutdown complete!

Constants

This section is empty.

Variables

var SignalManagerAlreadyStopped = errors.New("SignalManager already stopped")

SignalManagerAlreadyStopped is the payload for a variety of panics that may occur when calling a SignalManager's methods after it's already stopped.

See SignalManager.Stop for more info.

Functions

This section is empty.

Types

type SignalManager

type SignalManager struct {
	// contains filtered or unexported fields
}

SignalManager provides generic handling for program lifetime signals, where signals are generally expected to occur exactly once, and may be user-defined or builtin (like os.Signal).

The basic interface is that callbacks are registered with SignalManager.On, eventually called when SignalManager.TriggerAndWait is called with that signal.

SignalManager is hierarchical: SignalManager.NewChild returns a new SignalManager that inherits all signals from its parent, but can be independently triggered without affecting the parent.

All SignalManager methods are safe for multi-threaded use.

Refer to the examples for some sample use cases.

func NewSignalManager

func NewSignalManager() *SignalManager

NewSignalManager returns a new SignalManager.

Callers MUST make sure to call SignalManager.Stop when done, or else the SignalManager may leak goroutines.

func (*SignalManager) Context

func (m *SignalManager) Context(signal any) context.Context

Context returns a Context that is canceled once the signal is triggered.

Note that the semantics of this method are different from SignalManager.Wait; the channel returned by Wait is closed after all callbacks have finished, whereas the channel returned by Context().Done() is closed after the signal is initially triggered.
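
The difference between the two channels can be pictured with a small standard-library model. This is only a sketch of the described semantics, not chord's internals; the signal type, fire, isClosed, and observe are hypothetical names.

```go
package main

import "fmt"

// signal is a hypothetical pair of channels tracking one signal's lifecycle.
type signal struct {
	triggered chan struct{} // closed as soon as the signal fires
	finished  chan struct{} // closed after every callback has returned
}

func newSignal() *signal {
	return &signal{
		triggered: make(chan struct{}),
		finished:  make(chan struct{}),
	}
}

// fire closes triggered, runs the callbacks, then closes finished.
func (s *signal) fire(callbacks ...func()) {
	close(s.triggered)
	for _, cb := range callbacks {
		cb()
	}
	close(s.finished)
}

// isClosed reports whether ch has been closed, without blocking.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

// observe fires a signal and records, from inside a callback, which
// channels are already closed; it also checks finished afterwards.
func observe() (triggeredDuring, finishedDuring, finishedAfter bool) {
	s := newSignal()
	s.fire(func() {
		triggeredDuring = isClosed(s.triggered)
		finishedDuring = isClosed(s.finished)
	})
	return triggeredDuring, finishedDuring, isClosed(s.finished)
}

func main() {
	fmt.Println(observe()) // true false true
}
```

In this model, triggered plays the role of Context().Done() and finished plays the role of the channel returned by Wait: a callback that needs to know shutdown has begun should watch the former, while code that needs shutdown to be complete should watch the latter.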

func (*SignalManager) Ignore

func (m *SignalManager) Ignore(signal any)

Ignore prevents a signal triggered in the SignalManager's parent from affecting it directly. If further calls to SignalManager.TriggerAndWait are made in its parent (or parent's parent, etc.), the signal will not flow down to this one.

If the signal has been previously triggered in an ancestor SignalManager and *observed* in this SignalManager or any descendant, Ignore will have no effect.

A triggered signal is considered "observed" if there has been any call to SignalManager.Context, SignalManager.Wait, or SignalManager.On (with ≥1 callback).

func (*SignalManager) NewChild

func (m *SignalManager) NewChild() *SignalManager

NewChild creates a new SignalManager as a child of m. By default, it will inherit all signals that are not later ignored (via SignalManager.Ignore). Calling SignalManager.TriggerAndWait on the child will not affect the parent.

Callers MUST make sure to call SignalManager.Stop when done with the child, or else the SignalManager may leak goroutines.

NewChild will panic with SignalManagerAlreadyStopped if called on a SignalManager that has already stopped (see SignalManager.Stop for more info).

func (*SignalManager) On

func (m *SignalManager) On(signal any, immediateCtx context.Context, callbacks ...func(context.Context) error) error

On registers the callbacks to be called when the signal is triggered.

If the signal has already been triggered (either via a call to SignalManager.TriggerAndWait on this SignalManager, or one of its ancestors), then the callbacks will be immediately called, passing immediateCtx and returning any unhandled error.

Ordinarily, any unhandled error from a callback will abort executing all other callbacks. A function may be provided to handle (and possibly silence) these errors, via SignalManager.WithErrorHandler, which returns an object with the same definition of On.

On will panic with SignalManagerAlreadyStopped if called on a SignalManager that has already stopped (see SignalManager.Stop for more info).

All registered callbacks for a signal are called in reverse order - both within and across calls to On.

A child SignalManager's callbacks are all called at the same time, as if they were all added at the same time, even if adding them was actually interleaved. For example:

func orderingExample() {
	mgr := chord.NewSignalManager()
	defer mgr.Stop()

	_ = mgr.On("sig", context.TODO(), func(context.Context) error {
		fmt.Print("A")
		return nil
	})

	child := mgr.NewChild()
	defer child.Stop()

	_ = child.On("sig", context.TODO(), func(context.Context) error {
		fmt.Print("B")
		return nil
	})

	_ = mgr.On("sig", context.TODO(), func(context.Context) error {
		fmt.Print("C")
		return nil
	})

	// Even though this call to On is "after" the previous, the callback is handled later
	// because the original child was created before.
	_ = child.On("sig", context.TODO(), func(context.Context) error {
		fmt.Print("D")
		return nil
	})

	_ = mgr.TriggerAndWait("sig", context.TODO())
	// Outputs: CDBA
}

It may not be immediately obvious why this even *should* be the desired behavior: It's both for efficiency of implementation (no global lists!) and so that, once created, a child SignalManager can have callbacks registered asynchronously while still being treated as a complete unit, to be handled as one bloc.
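
One way to picture "handled as one bloc" is a callback list in which a child occupies a single slot, reserved when the child is created. The sketch below is an assumption about how such a structure could look, not a description of chord's internals; node, entry, newChild, and order are hypothetical names.

```go
package main

import "fmt"

// node is a hypothetical callback list in which each entry is either a
// plain callback or an entire child node.
type node struct {
	entries []entry
}

type entry struct {
	callback func()
	child    *node
}

// on appends a callback to this node's own list.
func (n *node) on(cb func()) {
	n.entries = append(n.entries, entry{callback: cb})
}

// newChild reserves the child's position in the parent's list at
// creation time; later registrations on the child do not move it.
func (n *node) newChild() *node {
	c := &node{}
	n.entries = append(n.entries, entry{child: c})
	return c
}

// trigger walks the list newest-first, recursing into children in place.
func (n *node) trigger() {
	for i := len(n.entries) - 1; i >= 0; i-- {
		if e := n.entries[i]; e.child != nil {
			e.child.trigger()
		} else {
			e.callback()
		}
	}
}

// order replays the registration sequence from the example above.
func order() string {
	out := ""
	root := &node{}
	root.on(func() { out += "A" })
	child := root.newChild()
	child.on(func() { out += "B" })
	root.on(func() { out += "C" })
	child.on(func() { out += "D" })
	root.trigger()
	return out
}

func main() {
	fmt.Println(order()) // CDBA
}
```

Each node only ever appends to its own slice, which is the "no global lists" property: registering on the child never needs to touch the parent.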

See also: SignalManager.WithErrorHandler.

func (*SignalManager) Stop

func (m *SignalManager) Stop()

Stop releases the resources associated with this SignalManager, preventing further signal registration or triggering - kind of.

In actuality, freeing the resources (and stopping further signal registration) will only take effect once all child SignalManagers have *also* been stopped. In general, this is the state we refer to as "stopped" in other documentation. Once a SignalManager is stopped, it is generally an error to attempt other operations with it.

func (*SignalManager) TriggerAndWait

func (m *SignalManager) TriggerAndWait(signal any, ctx context.Context) error

TriggerAndWait triggers the signal, calling all previously registered callbacks in reverse order, returning early if an unhandled error occurs.

When TriggerAndWait returns, any channel returned by SignalManager.Wait is guaranteed to be closed.

func (*SignalManager) TryWait

func (m *SignalManager) TryWait(signal any, ctx context.Context) error

TryWait waits for the signal to be triggered *and* all callbacks finished or canceled due to a prior error, unless the context is canceled before that happens.

When called with a context that is already canceled, TryWait will always return the context's error.

func (*SignalManager) Wait

func (m *SignalManager) Wait(signal any) <-chan struct{}

Wait returns a channel that is closed once the signal has been triggered *and* all callbacks finished or canceled due to an unresolved error. If the signal has already been triggered, Wait returns a channel that is already closed.

For convenience, SignalManager.TryWait waits with a context.

func (*SignalManager) WithErrorHandler

func (m *SignalManager) WithErrorHandler(handler func(context.Context, error) error) SignalRegister

WithErrorHandler returns a SignalRegister wrapped with the error handler. When executing any callbacks registered with SignalRegister.On from the returned object, handler will be called with any non-nil error.

If handler returns nil, the error will be considered "resolved", and have no further impact. If handler returns a non-nil error, that error will be passed through any other previously defined error handlers, and eventually returned to the originating SignalManager.TriggerAndWait, if it is not resolved.
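
The resolve-or-pass-on rule can be sketched as a chain of handlers applied newest-first. This is a simplified model under assumptions: it drops the context.Context parameter for brevity, and register, withErrorHandler, run, and demo are hypothetical names rather than chord's API.

```go
package main

import (
	"errors"
	"fmt"
)

// handler returns nil to resolve an error, or a non-nil error to pass it on.
type handler func(error) error

// register is a hypothetical wrapper holding handlers, oldest first.
type register struct {
	handlers []handler
}

// withErrorHandler returns a new register in which h sees errors before
// any previously defined handler does.
func (r register) withErrorHandler(h handler) register {
	hs := make([]handler, 0, len(r.handlers)+1)
	hs = append(hs, r.handlers...)
	return register{handlers: append(hs, h)}
}

// run calls cb and passes any error through the handlers, newest first,
// stopping as soon as one of them resolves it.
func (r register) run(cb func() error) error {
	err := cb()
	for i := len(r.handlers) - 1; i >= 0 && err != nil; i-- {
		err = r.handlers[i](err)
	}
	return err
}

var errTransient = errors.New("transient")

// demo runs one callback whose error is resolved by the inner handler and
// one whose error is passed on to the outer handler.
func demo() (resolved, passed error) {
	outer := register{}.withErrorHandler(func(err error) error {
		return fmt.Errorf("outer saw: %w", err)
	})
	inner := outer.withErrorHandler(func(err error) error {
		if errors.Is(err, errTransient) {
			return nil // resolved: the outer handler never sees it
		}
		return err
	})
	resolved = inner.run(func() error { return errTransient })
	passed = inner.run(func() error { return errors.New("fatal") })
	return resolved, passed
}

func main() {
	resolved, passed := demo()
	fmt.Println(resolved) // <nil>
	fmt.Println(passed)   // outer saw: fatal
}
```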

type SignalRegister

type SignalRegister interface {
	// See [SignalManager.On]
	On(signal any, immediateCtx context.Context, callbacks ...func(context.Context) error) error
	// See [SignalManager.WithErrorHandler]
	WithErrorHandler(handler func(context.Context, error) error) SignalRegister
}

SignalRegister represents types that can register a signal callback. It is produced exclusively by SignalManager.WithErrorHandler and further calls to WithErrorHandler on itself.

Refer to SignalManager.WithErrorHandler for more information.

type StackFrame

type StackFrame struct {
	// Function provides the name of the function being called, or the empty string if unknown.
	Function string
	// File gives the name of the file, or an empty string if the file is unknown.
	File string
	// Line gives the line number (starting from 1), or zero if the line number is unknown.
	Line int
}

Individual stack frame, contained in a StackTrace, produced by GetStackTrace.

type StackTrace

type StackTrace struct {
	// Frames provides the frames of this stack trace. Each frame's caller is at the index following
	// it; the first frame is the direct caller.
	Frames []StackFrame
	// Parent, if not nil, provides the "parent" stack trace - typically the stack trace at the
	// point this goroutine was spawned.
	Parent *StackTrace
}

StackTrace represents a collected stack trace, possibly with a parent (i.e. caller).

StackTraces are designed to make it easy to track callers across goroutines. They are typically produced by GetStackTrace; refer to that function for more information.

func GetStackTrace

func GetStackTrace(parent *StackTrace, skip uint) StackTrace

GetStackTrace produces a StackTrace, optionally with a parent's stack trace to append.

skip sets the number of initial calling stack frames to exclude. Setting skip to zero will produce a StackTrace where the first StackFrame represents the location where GetStackTrace was called.
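
The meaning of skip is easiest to see with the standard library's own frame-skipping calls. The helper below is a hypothetical illustration (callerName, inner, and outer are not part of chord) that follows the same convention: zero means "the function that called me".

```go
package main

import (
	"fmt"
	"runtime"
)

// callerName returns the name of the function skip frames above
// callerName's caller; skip == 0 names the function that called callerName.
func callerName(skip uint) string {
	pcs := make([]uintptr, 8) // only the first recorded frame is used
	// For runtime.Callers, 0 is Callers itself and 1 is callerName, so
	// adding 2 makes skip == 0 refer to callerName's caller.
	n := runtime.Callers(int(skip)+2, pcs)
	if n == 0 {
		return ""
	}
	frame, _ := runtime.CallersFrames(pcs[:n]).Next()
	return frame.Function
}

// inner reports its own name (skip 0) and its caller's name (skip 1).
func inner() (string, string) {
	return callerName(0), callerName(1)
}

func outer() (string, string) {
	return inner()
}

func main() {
	self, caller := outer()
	fmt.Println(self)   // main.inner
	fmt.Println(caller) // main.outer
}
```

A nonzero skip is typically useful inside wrapper functions, so that the recorded trace starts at the wrapper's caller instead of the wrapper.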

func (StackTrace) String

func (st StackTrace) String() string

String produces a string representation of the stack trace, roughly similar to the default panic handler's.

For some examples of formatting, refer to the StackTrace tests.

type TaskGroup

type TaskGroup struct {
	// contains filtered or unexported fields
}

TaskGroup provides sync.WaitGroup-like functionality, with the following changes:

  1. Tasks are named, added one at a time with TaskGroup.Add
  2. TaskGroups are hierarchical, with subgroups that can be separately Wait-ed on
  3. TaskGroup.Wait returns a channel, so it can be selected over
  4. The set of running tasks can be fetched with TaskGroup.Tasks, TaskGroup.Subgroups, or TaskGroup.TaskTree.
  5. More tasks may be added after all have been completed

Other than those, the general idea should be roughly familiar to users of sync.WaitGroup.

See also: TaskTree, TaskInfo.

func NewTaskGroup

func NewTaskGroup(name string) *TaskGroup

NewTaskGroup creates a new TaskGroup with the given name.

func (*TaskGroup) Add

func (g *TaskGroup) Add(name string)

Add adds a task with the name to the TaskGroup. Add may be called multiple times with the same name, in which case multiple instances of that task will be counted.

Waiting on the TaskGroup will not complete until there is exactly one call to TaskGroup.Done with a matching name for each call to Add.
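
The counting rule (one Done per Add, per name) can be modeled with a mutex-guarded map. This is a hypothetical sketch of the bookkeeping only, without the waiting or subgroup logic; namedCounter and its methods are not chord's types.

```go
package main

import (
	"fmt"
	"sync"
)

// namedCounter is a hypothetical, minimal analogue of named Add/Done tracking.
type namedCounter struct {
	mu     sync.Mutex
	counts map[string]uint
}

// add records one more running task with the given name.
func (c *namedCounter) add(name string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.counts == nil {
		c.counts = make(map[string]uint)
	}
	c.counts[name]++
}

// done marks one task with the given name as finished, panicking if no
// task with that name remains.
func (c *namedCounter) done(name string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.counts[name] == 0 {
		panic("done called with no remaining task named " + name)
	}
	c.counts[name]--
	if c.counts[name] == 0 {
		delete(c.counts, name) // keep only names with a nonzero count
	}
}

// running returns the number of tasks with the given name still running.
func (c *namedCounter) running(name string) uint {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[name]
}

// finished reports whether no tasks remain under any name.
func (c *namedCounter) finished() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.counts) == 0
}

func main() {
	var c namedCounter
	c.add("worker")
	c.add("worker")
	c.done("worker")
	fmt.Println(c.running("worker"), c.finished()) // 1 false
	c.done("worker")
	fmt.Println(c.running("worker"), c.finished()) // 0 true
}
```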

func (*TaskGroup) Done

func (g *TaskGroup) Done(name string)

Done marks a task with the name as completed.

Done will panic if there aren't any remaining tasks with the name.

func (*TaskGroup) Finished

func (g *TaskGroup) Finished() bool

Finished returns whether all tasks are finished, i.e. if waiting will immediately complete.

func (*TaskGroup) Name

func (g *TaskGroup) Name() string

Name returns the name of the TaskGroup, as constructed via NewTaskGroup or TaskGroup.NewSubgroup.

func (*TaskGroup) NewSubgroup

func (g *TaskGroup) NewSubgroup(name string) *TaskGroup

NewSubgroup creates a new TaskGroup that is contained within g.

Waiting on the parent TaskGroup will not complete if the child TaskGroup has unfinished tasks.

func (*TaskGroup) Subgroups

func (g *TaskGroup) Subgroups() []*TaskGroup

Subgroups returns the set of TaskGroups with running tasks.

Note that between calling Subgroups and calling a method on the returned TaskGroups, some or all of them may have finished.

func (*TaskGroup) TaskTree

func (g *TaskGroup) TaskTree() TaskTree

TaskTree returns a snapshot of all running tasks.

If tasks are being added or removed during the call to TaskTree, the returned structure may not exactly match the state at any particular point in time - e.g. some returned subgroups may have no tasks.

In general, any property that is true at the start of calling TaskTree and remains true through to when the call is finished (like "task X in subgroup Y is running") will be correctly represented in the returned TaskTree. Changes that occur during the call may be missing.

The recommended use of this method is for runtime diagnostics - like having better information about exactly which tasks are still running, when waiting for something to stop.

func (*TaskGroup) Tasks

func (g *TaskGroup) Tasks() []TaskInfo

Tasks returns information about the set of running tasks. It does not recurse into subgroups.

Each returned TaskInfo is guaranteed to have a Count greater than zero, representing the number of tasks with that name. If all task names are unique, all task counts will be 1.

To get information about tasks and subgroups at the same time, try TaskGroup.TaskTree.

func (*TaskGroup) TryWait

func (g *TaskGroup) TryWait(ctx context.Context) error

TryWait waits on the TaskGroup, returning early with ctx.Err() if the context is canceled.

If the context is already canceled when TryWait is called, this method will always return the context's error.
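
The "always" in that guarantee matters because a bare select chooses at random when several cases are ready. A hedged sketch of how such a guarantee can be obtained, using a hypothetical tryWait over a plain channel rather than chord's method:

```go
package main

import (
	"context"
	"fmt"
)

// tryWait waits for done to close, unless ctx is canceled first.
func tryWait(ctx context.Context, done <-chan struct{}) error {
	// Check the context up front: if both channels were ready, select
	// would pick one at random and the result would be nondeterministic.
	if err := ctx.Err(); err != nil {
		return err
	}
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo calls tryWait with an already-closed channel and an
// already-canceled context, so both select cases would be ready.
func demo() error {
	done := make(chan struct{})
	close(done)
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return tryWait(ctx, done)
}

func main() {
	fmt.Println(demo()) // context canceled
}
```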

func (*TaskGroup) Wait

func (g *TaskGroup) Wait() <-chan struct{}

Wait returns a channel that is closed once all tasks have been completed with TaskGroup.Done.

type TaskInfo

type TaskInfo struct {
	Name string `json:"name"`
	// Count provides the number of running tasks named Name. Count is never zero when returned by
	// [TaskGroup.Tasks] or [TaskGroup.TaskTree].
	Count uint `json:"count"`
}

TaskInfo provides information about a set of tasks with a particular name.

Instances of TaskInfo are produced by TaskGroup.Tasks and TaskGroup.TaskTree.

type TaskTree

type TaskTree struct {
	Name      string     `json:"name"`
	Tasks     []TaskInfo `json:"tasks"`
	Subgroups []TaskTree `json:"subgroups"`
}

TaskTree represents the structure of unfinished tasks in a TaskGroup, returned by TaskGroup.TaskTree.
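
Since both types carry json tags, a TaskTree can be logged directly for diagnostics. The sketch below uses local copies of the two struct definitions shown above so that it runs without importing chord; the tree contents are invented for illustration, and whether chord uses nil or empty slices for groups without subgroups is not stated here (a nil slice would encode as null rather than []).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented types, so the sketch does not import chord.
type TaskInfo struct {
	Name  string `json:"name"`
	Count uint   `json:"count"`
}

type TaskTree struct {
	Name      string     `json:"name"`
	Tasks     []TaskInfo `json:"tasks"`
	Subgroups []TaskTree `json:"subgroups"`
}

// encode renders a hand-built tree as compact JSON.
func encode() (string, error) {
	tree := TaskTree{
		Name:  "main",
		Tasks: []TaskInfo{{Name: "http-server", Count: 1}},
		Subgroups: []TaskTree{{
			Name:      "workers",
			Tasks:     []TaskInfo{{Name: "worker", Count: 3}},
			Subgroups: []TaskTree{},
		}},
	}
	b, err := json.Marshal(tree)
	return string(b), err
}

func main() {
	s, err := encode()
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```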
