Documentation ¶
Index ¶
- func CollectRootErrors(err error) []error
- func CollectUniqueErrors(inErrs []error) []error
- func Wait(ctx context.Context, f func())
- func WaitChan[T any](ctx context.Context, ch <-chan T) error
- func WaitChanE[T error](ctx context.Context, ch <-chan T) error
- func WaitDep(ctx context.Context, dep Dep) error
- func WaitE(ctx context.Context, f func() error) error
- type Action
- func (a *Action) AddDep(deps ...Dep)
- func (a *Action) AddHook(hook Hook)
- func (a *Action) DeepDo(f func(Dep))
- func (a *Action) ErrorCh() <-chan error
- func (a *Action) Exec(ctx context.Context, ins InStore, outs OutStore) error
- func (a *Action) GetCtx() context.Context
- func (a *Action) GetErr() error
- func (a *Action) GetExecutionDebugString() string
- func (a *Action) GetHooks() []Hook
- func (a *Action) GetName() string
- func (a *Action) GetNode() *dag.Node[Dep]
- func (a *Action) GetQueuedAt() time.Time
- func (a *Action) GetRequest() map[string]float64
- func (a *Action) GetScheduledAt() time.Time
- func (a *Action) GetScheduler() Scheduler
- func (a *Action) GetStartedAt() time.Time
- func (a *Action) GetState() ExecState
- func (a *Action) OutputCh() <-chan Value
- func (a *Action) SetCtx(ctx context.Context)
- func (a *Action) Wait() <-chan struct{}
- type ActionConfig
- type Dep
- type Engine
- type Error
- type Event
- type EventCompleted
- type EventDeclared
- type EventNewDep
- type EventQueued
- type EventReady
- type EventScheduled
- type EventSkipped
- type EventStarted
- type EventSuspended
- type EventWithExecution
- type ExecState
- type Execution
- func (e *Execution) GetOutput() Value
- func (e *Execution) GetStatus() status.Statuser
- func (e *Execution) Interactive() bool
- func (e *Execution) ResumeAck()
- func (e *Execution) Run()
- func (e *Execution) Status(status status.Statuser)
- func (e *Execution) String() string
- func (e *Execution) Suspend() *SuspendBag
- func (e *Execution) Wait() <-chan struct{}
- func (e *Execution) WaitSuspend() <-chan *SuspendBag
- type Group
- func (a *Group) AddDep(deps ...Dep)
- func (a *Group) AddHook(hook Hook)
- func (g *Group) DeepDo(f func(Dep))
- func (a *Group) ErrorCh() <-chan error
- func (g *Group) Exec(ctx context.Context, ins InStore, outs OutStore) error
- func (g *Group) GetCtx() context.Context
- func (a *Group) GetErr() error
- func (a *Group) GetExecutionDebugString() string
- func (a *Group) GetHooks() []Hook
- func (g *Group) GetName() string
- func (a *Group) GetNode() *dag.Node[Dep]
- func (a *Group) GetQueuedAt() time.Time
- func (g *Group) GetRequest() map[string]float64
- func (a *Group) GetScheduledAt() time.Time
- func (g *Group) GetScheduler() Scheduler
- func (a *Group) GetStartedAt() time.Time
- func (a *Group) GetState() ExecState
- func (a *Group) OutputCh() <-chan Value
- func (g *Group) SetCtx(ctx context.Context)
- func (a *Group) Wait() <-chan struct{}
- type GroupConfig
- type Hook
- type InStore
- type LimitScheduler
- type MapValue
- type MemValue
- type Named
- type OutStore
- type ResourceScheduler
- type RunningTracker
- type Scheduler
- type Sem
- type StageHook
- type Stats
- type StatsCollector
- type SuspendBag
- type UnlimitedScheduler
- type Value
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CollectRootErrors ¶
func CollectUniqueErrors ¶
Types ¶
type Action ¶
type Action struct {
// contains filtered or unexported fields
}
func NewAction ¶
func NewAction(cfg ActionConfig) *Action
func (*Action) GetExecutionDebugString ¶
func (a *Action) GetExecutionDebugString() string
func (*Action) GetQueuedAt ¶
func (*Action) GetRequest ¶
func (*Action) GetScheduledAt ¶
func (*Action) GetScheduler ¶
func (*Action) GetStartedAt ¶
type ActionConfig ¶
type Dep ¶
type Dep interface { GetName() string Exec(ctx context.Context, ins InStore, outs OutStore) error GetNode() *dag.Node[Dep] AddDep(...Dep) GetHooks() []Hook AddHook(h Hook) Wait() <-chan struct{} DeepDo(f func(Dep)) GetCtx() context.Context SetCtx(ctx context.Context) GetErr() error GetState() ExecState GetScheduledAt() time.Time GetStartedAt() time.Time GetQueuedAt() time.Time GetScheduler() Scheduler GetRequest() map[string]float64 GetExecutionDebugString() string // contains filtered or unexported methods }
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
func (*Engine) GetLiveExecutions ¶
func (*Engine) RegisterHook ¶
func (*Engine) SetDefaultScheduler ¶
type Error ¶
type EventCompleted ¶
func (EventCompleted) Replayable ¶
func (e EventCompleted) Replayable() bool
type EventDeclared ¶
type EventDeclared struct {
Dep Dep
}
func (EventDeclared) Replayable ¶
func (EventDeclared) Replayable() bool
type EventNewDep ¶
func (EventNewDep) Replayable ¶
func (e EventNewDep) Replayable() bool
type EventQueued ¶
func (EventQueued) Replayable ¶
func (e EventQueued) Replayable() bool
type EventReady ¶
func (EventReady) Replayable ¶
func (e EventReady) Replayable() bool
type EventScheduled ¶
func (EventScheduled) Replayable ¶
func (e EventScheduled) Replayable() bool
type EventSkipped ¶
func (EventSkipped) Replayable ¶
func (e EventSkipped) Replayable() bool
type EventStarted ¶
func (EventStarted) Replayable ¶
func (e EventStarted) Replayable() bool
type EventSuspended ¶
type EventSuspended struct { At time.Time Execution *Execution Bag *SuspendBag }
func (EventSuspended) Replayable ¶
func (e EventSuspended) Replayable() bool
type EventWithExecution ¶
type EventWithExecution interface { Event // contains filtered or unexported methods }
type Execution ¶
type Execution struct { ID uint64 Dep Dep State ExecState Err error ScheduledAt time.Time QueuedAt time.Time StartedAt time.Time CompletedAt time.Time // contains filtered or unexported fields }
func (*Execution) Interactive ¶
func (*Execution) Suspend ¶
func (e *Execution) Suspend() *SuspendBag
func (*Execution) WaitSuspend ¶
func (e *Execution) WaitSuspend() <-chan *SuspendBag
type Group ¶
type Group struct {
// contains filtered or unexported fields
}
func NewGroupWith ¶
func NewGroupWith(cfg GroupConfig) *Group
func NewNamedGroup ¶
func (*Group) GetExecutionDebugString ¶
func (a *Group) GetExecutionDebugString() string
func (*Group) GetQueuedAt ¶
func (*Group) GetRequest ¶
func (*Group) GetScheduledAt ¶
func (*Group) GetScheduler ¶
func (*Group) GetStartedAt ¶
type GroupConfig ¶
type LimitScheduler ¶
type LimitScheduler struct {
// contains filtered or unexported fields
}
func NewLimitScheduler ¶
func NewLimitScheduler(limit int) *LimitScheduler
func (*LimitScheduler) Done ¶
func (ls *LimitScheduler) Done(d Dep)
type ResourceScheduler ¶
type ResourceScheduler struct {
// contains filtered or unexported fields
}
func NewResourceScheduler ¶
func NewResourceScheduler(limits map[string]float64, def map[string]float64) *ResourceScheduler
func (*ResourceScheduler) Done ¶
func (ls *ResourceScheduler) Done(d Dep)
type RunningTracker ¶
type RunningTracker struct {
// contains filtered or unexported fields
}
func NewRunningTracker ¶
func NewRunningTracker() *RunningTracker
func (*RunningTracker) Get ¶
func (t *RunningTracker) Get() []*Execution
func (*RunningTracker) Group ¶
func (t *RunningTracker) Group() *Group
func (*RunningTracker) Hook ¶
func (t *RunningTracker) Hook() Hook
type StageHook ¶
type Stats ¶
type Stats struct { All uint64 Completed uint64 Scheduled uint64 Waiting uint64 Succeeded uint64 Failed uint64 Skipped uint64 Suspended uint64 Running uint64 }
func CollectStats ¶
CollectStats can get quite expensive on large DAGs, prefer NewStatsCollector
type StatsCollector ¶
type StatsCollector struct {
// contains filtered or unexported fields
}
func NewStatsCollector ¶
func NewStatsCollector() *StatsCollector
func (*StatsCollector) Collect ¶
func (c *StatsCollector) Collect() Stats
func (*StatsCollector) Register ¶
func (c *StatsCollector) Register(dep Dep)
type SuspendBag ¶
type SuspendBag struct {
// contains filtered or unexported fields
}
func (*SuspendBag) Resume ¶
func (e *SuspendBag) Resume() <-chan struct{}
func (*SuspendBag) WaitResume ¶
func (e *SuspendBag) WaitResume() <-chan struct{}
type UnlimitedScheduler ¶
type UnlimitedScheduler struct{}
func (UnlimitedScheduler) Done ¶
func (ls UnlimitedScheduler) Done(d Dep)
Source Files ¶
Click to show internal directories.
Click to hide internal directories.