Documentation ¶
Overview ¶
Package chord provides a collection of tools for task management, with a focus on minimizing magic.
Broadly, the tools belong to a few distinct groups:
- Signal handling: SignalManager and SignalRegister
- Stack trace collection, printing, and appending: StackTrace, GetStackTrace, and StackFrame
- Hierarchical, named sync.WaitGroup: TaskGroup, NewTaskGroup, TaskTree, TaskInfo
For an example of using all of it together, see: <https://github.com/neondatabase/autoscaling/blob/sharnoff/chord-task-mgmt/pkg/task/manager.go>.
Signal handling ¶
The general idea behind signal handling via SignalManager is that signals are mostly user-defined, trigger exactly once, and are hierarchical (i.e. triggering a signal in a child SignalManager does not affect the parent).
Users of SignalManager register callbacks via SignalManager.On. Callbacks are called at most once, when the signal is triggered. Callbacks are processed sequentially in the reverse order of when they're registered, and may return an error. Unhandled errors will prevent calling any remaining callbacks.
Signals can be manually triggered with SignalManager.TriggerAndWait. There's special handling around os.Signal values so that forwarding into the SignalManager is automatically set up when required.
For more, see SignalManager.
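The ordering and error rules above can be modeled with the standard library alone. The following is a minimal sketch, not the package's implementation: callbackList, on, trigger, and runOrder are hypothetical names, and contexts and hierarchy are left out.

```go
package main

import (
	"errors"
	"fmt"
)

// callbackList is a hypothetical stand-in for the callbacks registered on one signal.
type callbackList struct {
	callbacks []func() error
}

// on appends a callback, preserving registration order in the slice.
func (l *callbackList) on(cb func() error) {
	l.callbacks = append(l.callbacks, cb)
}

// trigger runs callbacks from most recently registered to least recently
// registered, returning the first error and skipping whatever remains.
func (l *callbackList) trigger() error {
	for i := len(l.callbacks) - 1; i >= 0; i-- {
		if err := l.callbacks[i](); err != nil {
			return err
		}
	}
	return nil
}

// runOrder registers callbacks named A, B, and C (in that order) and reports
// the order in which they ran. If failAt names a callback, it returns an error.
func runOrder(failAt string) (string, error) {
	var l callbackList
	order := ""
	for _, name := range []string{"A", "B", "C"} {
		name := name
		l.on(func() error {
			order += name
			if name == failAt {
				return errors.New("callback " + name + " failed")
			}
			return nil
		})
	}
	err := l.trigger()
	return order, err
}

func main() {
	order, err := runOrder("")
	fmt.Println(order, err) // CBA <nil>

	order, err = runOrder("B")
	fmt.Println(order, err) // CB callback B failed
}
```

In the second run, A never executes because the error from B was not handled.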
Stack traces ¶
The primary goal of the stack trace tooling here is to make it easy to link stack traces across goroutines. To that end, GetStackTrace may be given a parent StackTrace to use, which gets appended on producing a string.
The stack trace management is designed with simplicity in mind, with optimizations for collection but not printing (i.e. GetStackTrace should be fast, but there are faster ways to print than via StackTrace.String).
For more, see StackTrace.
Hierarchical WaitGroup ¶
TaskGroup is essentially a glorified sync.WaitGroup. The main features it adds are: subgroups, named Add/Done, and fetching all tasks not yet Done. It also exposes TaskGroup.Wait as a channel, so you can select over it. TaskGroup.TryWait takes a context.Context and does that selection for you.
For more, see TaskGroup.
Example ¶
Start an HTTP "hello world" server that exits on SIGTERM or SIGINT, or after 2 seconds.
package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"syscall"
	"time"

	"github.com/sharnoff/chord"
)

// Shutdown will be used as the signal to trigger shutdown hooks
var Shutdown shutdown

type shutdown struct{}

// returns a context that is canceled after 1 second
func makeShutdownContext() context.Context {
	// it's ok to leak the context here; this is just for shutdown.
	ctx, _ := context.WithTimeout(context.TODO(), time.Second)
	return ctx
}

// Start an HTTP "hello world" server that exits on SIGTERM or SIGINT, or after 2 seconds.
func main() {
	mgr := chord.NewSignalManager()
	defer mgr.Stop()

	// Once done, wait for shutdown to complete
	defer func() {
		if err := mgr.TryWait(Shutdown, makeShutdownContext()); err != nil {
			log.Fatalf("timed out waiting for shutdown: %s", err)
		}
	}()

	// Forward OS signals to our custom Shutdown signal
	_ = mgr.On(syscall.SIGTERM, context.TODO(), func(ctx context.Context) error {
		return mgr.TriggerAndWait(Shutdown, ctx)
	})
	_ = mgr.On(syscall.SIGINT, context.TODO(), func(ctx context.Context) error {
		return mgr.TriggerAndWait(Shutdown, ctx)
	})

	// Log that shutdown completed successfully, once done
	_ = mgr.On(Shutdown, context.TODO(), func(context.Context) error {
		fmt.Println("Shutdown complete!")
		return nil
	})

	server := &http.Server{
		Addr: ":8080",
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			_, _ = w.Write([]byte("hello, world!"))
		}),
	}

	// Register the shutdown callback
	withHandler := mgr.WithErrorHandler(func(c context.Context, err error) error {
		if err != nil {
			log.Fatal(err)
		}
		return nil
	})
	// error will be handled; we don't need to worry about it
	_ = withHandler.On(Shutdown, context.TODO(), func(ctx context.Context) error {
		err := server.Shutdown(ctx)
		return err
	})

	// timeout after 2 seconds
	go func() {
		time.Sleep(2 * time.Second)
		// safe to ignore error because of the error handling above. Otherwise, that error could
		// appear here.
		_ = mgr.TriggerAndWait(Shutdown, makeShutdownContext())
	}()

	fmt.Println("Starting server")
	if err := server.ListenAndServe(); err != http.ErrServerClosed {
		log.Fatal(err)
	}
}
Output:

Starting server
Shutdown complete!
Index ¶
- Variables
- type SignalManager
- func (m *SignalManager) Context(signal any) context.Context
- func (m *SignalManager) Ignore(signal any)
- func (m *SignalManager) NewChild() *SignalManager
- func (m *SignalManager) On(signal any, immediateCtx context.Context, ...) error
- func (m *SignalManager) Stop()
- func (m *SignalManager) TriggerAndWait(signal any, ctx context.Context) error
- func (m *SignalManager) TryWait(signal any, ctx context.Context) error
- func (m *SignalManager) Wait(signal any) <-chan struct{}
- func (m *SignalManager) WithErrorHandler(handler func(context.Context, error) error) SignalRegister
- type SignalRegister
- type StackFrame
- type StackTrace
- type TaskGroup
- func (g *TaskGroup) Add(name string)
- func (g *TaskGroup) Done(name string)
- func (g *TaskGroup) Finished() bool
- func (g *TaskGroup) Name() string
- func (g *TaskGroup) NewSubgroup(name string) *TaskGroup
- func (g *TaskGroup) Subgroups() []*TaskGroup
- func (g *TaskGroup) TaskTree() TaskTree
- func (g *TaskGroup) Tasks() []TaskInfo
- func (g *TaskGroup) TryWait(ctx context.Context) error
- func (g *TaskGroup) Wait() <-chan struct{}
- type TaskInfo
- type TaskTree
Variables ¶
var SignalManagerAlreadyStopped = errors.New("SignalManager already stopped")
SignalManagerAlreadyStopped is the payload for a variety of panics that may occur when calling a SignalManager's methods after it's already stopped.
See SignalManager.Stop for more info.
Types ¶
type SignalManager ¶
type SignalManager struct {
// contains filtered or unexported fields
}
SignalManager provides generic handling for program lifetime signals, where signals are generally expected to occur exactly once, and may be user-defined or builtin (like os.Signal).
The basic interface is that callbacks are registered with SignalManager.On, eventually called when SignalManager.TriggerAndWait is called with that signal.
SignalManager is hierarchical: SignalManager.NewChild returns a new SignalManager that inherits all signals from its parent, but can be independently triggered without affecting the parent.
All SignalManager methods are safe for multi-threaded use.
Refer to the examples for some sample use cases.
func NewSignalManager ¶
func NewSignalManager() *SignalManager
NewSignalManager returns a new SignalManager
Callers MUST make sure to call SignalManager.Stop when done, or else the SignalManager may leak goroutines.
func (*SignalManager) Context ¶
func (m *SignalManager) Context(signal any) context.Context
Context returns a Context that is canceled once the signal is triggered.
Note that the semantics of this method are different from SignalManager.Wait; the channel returned by Wait is closed after all callbacks have finished, whereas the channel returned by Context().Done() is closed after the signal is initially triggered.
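The difference between the two channels can be pictured with a small model. This is a sketch under assumptions, not the package's internals: signalState, isClosed, and triggerAndObserve are hypothetical names, with one channel closed at trigger time (like Context().Done()) and another closed once the callbacks have returned (like Wait).

```go
package main

import "fmt"

// signalState is a hypothetical model of the two channels tied to one signal.
type signalState struct {
	triggered chan struct{} // closed as soon as the signal is triggered
	finished  chan struct{} // closed after all callbacks have returned
}

// isClosed reports whether ch has been closed, without blocking.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

// triggerAndObserve triggers the modeled signal and records, from inside a
// callback, which of the two channels is already closed.
func triggerAndObserve() (duringTriggered, duringFinished, afterFinished bool) {
	s := signalState{
		triggered: make(chan struct{}),
		finished:  make(chan struct{}),
	}
	callback := func() {
		duringTriggered = isClosed(s.triggered)
		duringFinished = isClosed(s.finished)
	}

	close(s.triggered) // what a Context-style observer sees
	callback()         // callbacks run after the trigger is visible
	close(s.finished)  // what a Wait-style observer sees

	return duringTriggered, duringFinished, isClosed(s.finished)
}

func main() {
	fmt.Println(triggerAndObserve()) // true false true
}
```

A callback that selects on the Context-style channel therefore sees it closed, while one that blocks on the Wait-style channel from inside a callback would be waiting on itself.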
func (*SignalManager) Ignore ¶
func (m *SignalManager) Ignore(signal any)
Ignore prevents a signal triggering in the SignalManager's parent from affecting it directly. If further calls to SignalManager.TriggerAndWait are made in its parent (or parent's parent, etc.), then they will not flow down through to this one.
If the signal has been previously triggered in an ancestor SignalManager and *observed* in this SignalManager or any descendant, Ignore will have no effect.
A triggered signal is considered "observed" if there has been any call to SignalManager.Context, SignalManager.Wait, or SignalManager.On (with ≥1 callback).
func (*SignalManager) NewChild ¶
func (m *SignalManager) NewChild() *SignalManager
NewChild creates a new SignalManager as a child of m. By default, it will inherit all signals that are not later ignored (via SignalManager.Ignore). Calling SignalManager.TriggerAndWait on the child will not affect the parent.
Callers MUST make sure to call SignalManager.Stop when done with the child, or else the SignalManager may leak goroutines.
NewChild will panic with SignalManagerAlreadyStopped if called on a SignalManager that has already stopped (see SignalManager.Stop for more info).
func (*SignalManager) On ¶
func (m *SignalManager) On(signal any, immediateCtx context.Context, callbacks ...func(context.Context) error) error
On registers the callbacks to be called when the signal is triggered.
If the signal has already been triggered (either via a call to SignalManager.TriggerAndWait on this SignalManager, or one of its ancestors), then the callbacks will be immediately called, passing immediateCtx and returning any unhandled error.
Ordinarily, any unhandled error from a callback will abort executing all other callbacks. A function may be provided to handle (and possibly silence) these errors, via SignalManager.WithErrorHandler, which returns an object with the same definition of On.
On will panic with SignalManagerAlreadyStopped if called on a SignalManager that has already stopped (see SignalManager.Stop for more info).
All registered callbacks for a signal are called in reverse order - both within and across calls to On.
A child SignalManager's callbacks are all called at the same time, as if they were all added at the same time, even if adding them was actually interleaved. For example:
func orderingExample() {
	mgr := chord.NewSignalManager()
	defer mgr.Stop()

	_ = mgr.On("sig", context.TODO(), func(context.Context) error {
		fmt.Print("A")
		return nil
	})

	child := mgr.NewChild()
	defer child.Stop()

	_ = child.On("sig", context.TODO(), func(context.Context) error {
		fmt.Print("B")
		return nil
	})

	_ = mgr.On("sig", context.TODO(), func(context.Context) error {
		fmt.Print("C")
		return nil
	})

	// Even though this call to On is "after" the previous, the callback is handled later
	// because the original child was created before.
	_ = child.On("sig", context.TODO(), func(context.Context) error {
		fmt.Print("D")
		return nil
	})

	_ = mgr.TriggerAndWait("sig", context.TODO())
	// Outputs: CDBA
}
It may not be immediately obvious why this even *should* be the desired behavior: It's both for efficiency of implementation (no global lists!) and so that, once created, a child SignalManager can have callbacks registered asynchronously while still being treated as a complete unit, to be handled as one bloc.
See also: SignalManager.WithErrorHandler.
func (*SignalManager) Stop ¶
func (m *SignalManager) Stop()
Stop releases the resources associated with this SignalManager, preventing further signal registration or triggering - kind of.
In actuality, freeing the resources (and stopping further signal registration) will only take effect once all child SignalManagers have *also* been stopped. In general, this is the state we refer to as "stopped" in other documentation. Once a SignalManager is stopped, it is generally an error to attempt other operations with it.
func (*SignalManager) TriggerAndWait ¶
func (m *SignalManager) TriggerAndWait(signal any, ctx context.Context) error
TriggerAndWait triggers the signal, calling all previously registered callbacks in reverse order, returning early if an unhandled error occurs.
When TriggerAndWait returns, any channel returned by SignalManager.Wait is guaranteed to be closed.
func (*SignalManager) TryWait ¶
func (m *SignalManager) TryWait(signal any, ctx context.Context) error
TryWait waits for the signal to be triggered *and* for all callbacks to have finished or been canceled due to a prior error, unless the context is canceled before that happens.
When called with a context that is already canceled, TryWait will always return the context's error.
func (*SignalManager) Wait ¶
func (m *SignalManager) Wait(signal any) <-chan struct{}
Wait returns a channel that is closed once the signal has been triggered *and* all callbacks have finished or been canceled due to an unresolved error. If the signal has already been triggered, Wait returns a channel that is already closed.
For convenience, SignalManager.TryWait waits with a context.
func (*SignalManager) WithErrorHandler ¶
func (m *SignalManager) WithErrorHandler(handler func(context.Context, error) error) SignalRegister
WithErrorHandler returns a SignalRegister wrapped with the error handler. When executing any callbacks registered with [SignalRegister.On] from the returned object, handler will be called with any non-nil error.
If handler returns nil, the error will be considered "resolved", and have no further impact. If handler returns a non-nil error, that error will be passed through any other previously defined error handlers, and eventually returned to the originating SignalManager.TriggerAndWait, if it is not resolved.
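The resolve-or-pass-through rule can be sketched without the package. This is an illustrative model only: errorHandler, withHandlers, runChain, errTemporary, and errFatal are hypothetical names, and the context argument of the real handler signature is omitted for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// errorHandler mirrors the shape of the handler argument, minus the context.
type errorHandler func(error) error

var (
	errTemporary = errors.New("temporary failure")
	errFatal     = errors.New("fatal failure")
)

// withHandlers wraps cb so that a non-nil error is passed through handlers,
// most recently added first. A handler returning nil resolves the error, and
// earlier handlers are then never consulted.
func withHandlers(cb func() error, handlers ...errorHandler) func() error {
	return func() error {
		err := cb()
		for i := len(handlers) - 1; i >= 0 && err != nil; i-- {
			err = handlers[i](err)
		}
		return err
	}
}

// runChain runs a callback that returns cbErr through two handlers, reporting
// which handlers ran and whether the error ended up resolved.
func runChain(cbErr error) (ran string, resolved bool) {
	outer := func(err error) error {
		ran += "outer;"
		return err // passes the error along unchanged
	}
	inner := func(err error) error {
		ran += "inner;"
		if errors.Is(err, errTemporary) {
			return nil // resolved: no further impact
		}
		return fmt.Errorf("unhandled: %w", err)
	}
	err := withHandlers(func() error { return cbErr }, outer, inner)()
	return ran, err == nil
}

func main() {
	fmt.Println(runChain(errTemporary)) // inner; true
	fmt.Println(runChain(errFatal))     // inner;outer; false
}
```

In the errFatal case, the unresolved error is what would be returned to the caller that triggered the signal.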
type SignalRegister ¶
type SignalRegister interface {
	// See [SignalManager.On]
	On(signal any, immediateCtx context.Context, callbacks ...func(context.Context) error) error
	// See [SignalManager.WithErrorHandler]
	WithErrorHandler(handler func(context.Context, error) error) SignalRegister
}
SignalRegister represents types that can register a signal callback. It is produced exclusively by SignalManager.WithErrorHandler and further calls to WithErrorHandler on itself.
Refer to SignalManager.WithErrorHandler for more information.
type StackFrame ¶
type StackFrame struct {
	// Function provides the name of the function being called, or the empty string if unknown.
	Function string
	// File gives the name of the file, or an empty string if the file is unknown.
	File string
	// Line gives the line number (starting from 1), or zero if the line number is unknown.
	Line int
}
Individual stack frame, contained in a StackTrace, produced by GetStackTrace.
type StackTrace ¶
type StackTrace struct {
	// Frames provides the frames of this stack trace. Each frame's caller is at the index following
	// it; the first frame is the direct caller.
	Frames []StackFrame
	// Parent, if not nil, provides the "parent" stack trace - typically the stack trace at the
	// point this goroutine was spawned.
	Parent *StackTrace
}
StackTrace represents a collected stack trace, possibly with a parent (i.e. caller).
StackTraces are designed to make it easy to track callers across goroutines. They are typically produced by GetStackTrace; refer to that function for more information.
func GetStackTrace ¶
func GetStackTrace(parent *StackTrace, skip uint) StackTrace
GetStackTrace produces a StackTrace, optionally with a parent's stack trace to append.
skip sets the number of initial calling stack frames to exclude. Setting skip to zero will produce a StackTrace where the first StackFrame represents the location where GetStackTrace was called.
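To make the skip and parent semantics concrete, here is a minimal sketch built on runtime.Callers and runtime.CallersFrames. It is an assumption about how such a collector could look, not the package's code: frame, trace, collect, shortName, firstFrame, and spawnAndTrace are hypothetical names, and the output format of String is invented for illustration.

```go
package main

import (
	"fmt"
	"runtime"
	"strings"
)

// frame and trace are hypothetical stand-ins mirroring the shape of
// StackFrame and StackTrace; they are not the package's own types.
type frame struct {
	Function string
	File     string
	Line     int
}

type trace struct {
	Frames []frame
	Parent *trace
}

// collect records the current call stack. With skip == 0, the first frame is
// the function that called collect.
//
//go:noinline
func collect(parent *trace, skip int) trace {
	pcs := make([]uintptr, 64)
	// For runtime.Callers, 0 is Callers itself and 1 is collect, so the
	// caller of collect sits at 2.
	n := runtime.Callers(skip+2, pcs)
	t := trace{Parent: parent}
	if n == 0 {
		return t
	}
	frames := runtime.CallersFrames(pcs[:n])
	for {
		f, more := frames.Next()
		t.Frames = append(t.Frames, frame{f.Function, f.File, f.Line})
		if !more {
			break
		}
	}
	return t
}

// String prints this trace's frames, followed by the parent's, if any.
func (t trace) String() string {
	var b strings.Builder
	for _, f := range t.Frames {
		fmt.Fprintf(&b, "%s\n\t%s:%d\n", f.Function, f.File, f.Line)
	}
	if t.Parent != nil {
		b.WriteString("called by:\n")
		b.WriteString(t.Parent.String())
	}
	return b.String()
}

// shortName strips the package path from a function name.
func shortName(function string) string {
	return function[strings.LastIndex(function, ".")+1:]
}

// firstFrame reports the short name of the first recorded frame.
//
//go:noinline
func firstFrame(skip int) string {
	return shortName(collect(nil, skip).Frames[0].Function)
}

// spawnAndTrace collects a trace inside a new goroutine, linked to the trace
// of the function that spawned it.
//
//go:noinline
func spawnAndTrace() trace {
	parent := collect(nil, 0)
	ch := make(chan trace)
	go func() { ch <- collect(&parent, 0) }()
	return <-ch
}

func main() {
	fmt.Println(firstFrame(0)) // firstFrame
	fmt.Print(spawnAndTrace())
}
```

Collection only stores program counters resolved into frames; the string is built lazily, which matches the stated goal of cheap collection and comparatively slower printing.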
func (StackTrace) String ¶
func (st StackTrace) String() string
String produces a string representation of the stack trace, roughly similar to the default panic handler's.
For some examples of formatting, refer to the StackTrace tests.
type TaskGroup ¶
type TaskGroup struct {
// contains filtered or unexported fields
}
TaskGroup provides sync.WaitGroup-like functionality, with the following changes:
- Tasks are named, added one at a time with TaskGroup.Add
- TaskGroups are hierarchical, with subgroups that can be separately Wait-ed on
- TaskGroup.Wait returns a channel, so it can be selected over
- The set of running tasks can be fetched with TaskGroup.Tasks, TaskGroup.Subgroups, or TaskGroup.TaskTree.
- More tasks may be added after all have been completed
Other than those, the general idea should be roughly familiar to users of sync.WaitGroup.
func NewTaskGroup ¶
NewTaskGroup creates a new TaskGroup with the given name
func (*TaskGroup) Add ¶
func (g *TaskGroup) Add(name string)
Add adds a task with the name to the TaskGroup. Add may be called multiple times with the same name, in which case multiple instances of that task will be counted.
Waiting on the TaskGroup will not complete until there is exactly one call to TaskGroup.Done with a matching name for each call to Add.
func (*TaskGroup) Done ¶
func (g *TaskGroup) Done(name string)
Done marks a task with the name as completed.
Done will panic if there aren't any remaining tasks with the name.
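The counting rules for Add and Done can be modeled with a mutex-guarded map. This is a rough sketch rather than the package's implementation: namedCounter, taskInfo, summary, and donePanics are hypothetical names, and waiting and subgroups are left out.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// taskInfo is a hypothetical stand-in with the same shape as TaskInfo.
type taskInfo struct {
	Name  string
	Count uint
}

// namedCounter tracks how many tasks are running under each name.
type namedCounter struct {
	mu     sync.Mutex
	counts map[string]uint
}

// Add counts one more running task with the given name.
func (c *namedCounter) Add(name string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.counts == nil {
		c.counts = make(map[string]uint)
	}
	c.counts[name]++
}

// Done removes one running task with the given name, panicking if none remain.
func (c *namedCounter) Done(name string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.counts[name] == 0 {
		panic(fmt.Sprintf("no remaining tasks named %q", name))
	}
	c.counts[name]--
	if c.counts[name] == 0 {
		delete(c.counts, name) // keeps every reported Count above zero
	}
}

// Tasks returns the running tasks, sorted by name.
func (c *namedCounter) Tasks() []taskInfo {
	c.mu.Lock()
	defer c.mu.Unlock()
	tasks := make([]taskInfo, 0, len(c.counts))
	for name, count := range c.counts {
		tasks = append(tasks, taskInfo{Name: name, Count: count})
	}
	sort.Slice(tasks, func(i, j int) bool { return tasks[i].Name < tasks[j].Name })
	return tasks
}

// Finished reports whether no tasks are running.
func (c *namedCounter) Finished() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.counts) == 0
}

// summary formats the running tasks as "name=count" pairs.
func (c *namedCounter) summary() string {
	var parts []string
	for _, t := range c.Tasks() {
		parts = append(parts, fmt.Sprintf("%s=%d", t.Name, t.Count))
	}
	return strings.Join(parts, " ")
}

// donePanics reports whether calling Done with the name panics.
func donePanics(c *namedCounter, name string) (panicked bool) {
	defer func() { panicked = recover() != nil }()
	c.Done(name)
	return false
}

func main() {
	var c namedCounter
	c.Add("fetch")
	c.Add("fetch")
	c.Add("flush")
	fmt.Println(c.summary(), c.Finished()) // fetch=2 flush=1 false

	c.Done("fetch")
	c.Done("fetch")
	c.Done("flush")
	fmt.Println(c.Finished())            // true
	fmt.Println(donePanics(&c, "fetch")) // true
}
```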
func (*TaskGroup) Finished ¶
func (g *TaskGroup) Finished() bool
Finished returns whether all tasks are finished, i.e. if waiting will immediately complete.
func (*TaskGroup) Name ¶
func (g *TaskGroup) Name() string
Name returns the name of the TaskGroup, as constructed via NewTaskGroup or TaskGroup.NewSubgroup.
func (*TaskGroup) NewSubgroup ¶
func (g *TaskGroup) NewSubgroup(name string) *TaskGroup
NewSubgroup creates a new TaskGroup that is contained within g.
Waiting on the parent TaskGroup will not complete if the child TaskGroup has unfinished tasks.
func (*TaskGroup) Subgroups ¶
func (g *TaskGroup) Subgroups() []*TaskGroup
Subgroups returns the set of TaskGroups with running tasks.
Note that between calling Subgroups and calling a method on the returned TaskGroups, some or all of them may have finished.
func (*TaskGroup) TaskTree ¶
func (g *TaskGroup) TaskTree() TaskTree
TaskTree returns a snapshot of all running tasks.
If tasks are being added or removed during the call to TaskTree, the returned structure may not exactly match the state at any particular point in time - e.g. some returned subgroups may have no tasks.
In general, any property that is true at the start of calling TaskTree and remains true through to when the call is finished (like "task X in subgroup Y is running") will be correctly represented in the returned TaskTree. Changes that occur during the call may be missing.
The recommended use of this method is for runtime diagnostics - like having better information about exactly which tasks are still running, when waiting for something to stop.
func (*TaskGroup) Tasks ¶
func (g *TaskGroup) Tasks() []TaskInfo
Tasks returns information about the set of running tasks. It does not recurse into subgroups.
Each returned TaskInfo is guaranteed to have a Count greater than zero, representing the number of tasks with that name. If all task names are unique, all task counts will be 1.
To get information about tasks and subgroups at the same time, try TaskGroup.TaskTree.
func (*TaskGroup) TryWait ¶
func (g *TaskGroup) TryWait(ctx context.Context) error
TryWait waits on the TaskGroup, returning early with ctx.Err() if the context is canceled.
If the context is already canceled when TryWait is called, this method will always return the context's error.
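The "already canceled" guarantee needs an explicit check, because a Go select with several ready cases picks one at random. The sketch below shows one way to get that behavior over a plain sync.WaitGroup; waitChan, tryWait, canceledAlwaysWins, and waitsForTasks are hypothetical helpers, not part of the package.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// waitChan adapts a sync.WaitGroup to a channel that is closed once the
// group's counter reaches zero, so that it can be used in a select.
func waitChan(wg *sync.WaitGroup) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	return done
}

// tryWait waits for done to be closed, returning early with ctx.Err() if the
// context is canceled first.
func tryWait(ctx context.Context, done <-chan struct{}) error {
	// Check the context up front: if both cases below were ready, select
	// would choose between them at random.
	if err := ctx.Err(); err != nil {
		return err
	}
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// canceledAlwaysWins calls tryWait n times with a canceled context and a
// closed channel, reporting whether every call returned the context's error.
func canceledAlwaysWins(n int) bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	done := make(chan struct{})
	close(done)
	for i := 0; i < n; i++ {
		if tryWait(ctx, done) != context.Canceled {
			return false
		}
	}
	return true
}

// waitsForTasks reports whether tryWait returns nil once a task completes.
func waitsForTasks() bool {
	var wg sync.WaitGroup
	wg.Add(1)
	go wg.Done()
	return tryWait(context.Background(), waitChan(&wg)) == nil
}

func main() {
	fmt.Println(canceledAlwaysWins(1000)) // true
	fmt.Println(waitsForTasks())          // true
}
```

Without the up-front ctx.Err() check, roughly half of the calls in canceledAlwaysWins would return nil instead.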
func (*TaskGroup) Wait ¶
func (g *TaskGroup) Wait() <-chan struct{}
Wait returns a channel that is closed once all tasks have been completed with TaskGroup.Done.
type TaskInfo ¶
type TaskInfo struct {
	Name string `json:"name"`
	// Count provides the number of running tasks named Name. Count is never zero when returned by
	// [TaskGroup.Tasks] or [TaskGroup.TaskTree].
	Count uint `json:"count"`
}
TaskInfo holds information about a set of tasks with a particular name.
Instances of TaskInfo are produced by TaskGroup.Tasks and TaskGroup.TaskTree.