Versions in this module Expand all Collapse all v0 v0.0.2 Mar 12, 2021 Changes in this version + const DotStr + const MaybeBoolFalse + const MaybeBoolNull + const MaybeBoolTrue + const ResultTaskDotID + var ErrBadInput = errors.New("bad input for task") + var ErrWrongInputCardinality = errors.New("wrong number of task inputs") + func FindBridge(db *gorm.DB, name models.TaskType) (models.BridgeType, error) + func NewORM(db *gorm.DB, config Config, eventBroadcaster postgres.EventBroadcaster) *orm + func NewRunner(orm ORM, config Config) *runner + func WrapResultIfError(result *Result, msg string, args ...interface{}) + type AnyTask struct + func (t *AnyTask) Run(_ context.Context, taskRun TaskRun, inputs []Result) (result Result) + func (t *AnyTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error + func (t *AnyTask) Type() TaskType + type BaseTask struct + Index int32 + Timeout time.Duration + func NewBaseTask(dotID string, t Task, index int32) BaseTask + func (t *BaseTask) SetOutputTask(outputTask Task) + func (t BaseTask) DotID() string + func (t BaseTask) OutputIndex() int32 + func (t BaseTask) OutputTask() Task + func (t BaseTask) TaskTimeout() (time.Duration, bool) + type BridgeTask struct + Name string + RequestData HttpRequestData + func (t *BridgeTask) HelperSetConfigAndTxDB(config Config, txdb *gorm.DB) + func (t *BridgeTask) Run(ctx context.Context, taskRun TaskRun, inputs []Result) (result Result) + func (t *BridgeTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error + func (t *BridgeTask) Type() TaskType + func (t BridgeTask) ExportedEquals(otherTask Task) bool + type Config interface + BridgeResponseURL func() *url.URL + DatabaseMaximumTxDuration func() time.Duration + DatabaseURL func() string + DefaultHTTPAllowUnrestrictedNetworkAccess func() bool + DefaultHTTPLimit func() int64 + DefaultHTTPTimeout func() models.Duration + DefaultMaxHTTPAttempts func() uint + JobPipelineMaxRunDuration func() 
time.Duration + JobPipelineMaxTaskDuration func() time.Duration + JobPipelineParallelism func() uint8 + JobPipelineReaperInterval func() time.Duration + JobPipelineReaperThreshold func() time.Duration + TriggerFallbackDBPollInterval func() time.Duration + type FinalErrors []null.String + func (fe *FinalErrors) Scan(value interface{}) error + func (fe FinalErrors) Error() string + func (fe FinalErrors) HasErrors() bool + func (fe FinalErrors) Value() (driver.Value, error) + type FinalResult struct + Errors []error + Values []interface{} + func (result FinalResult) ErrorsDB() JSONSerializable + func (result FinalResult) HasErrors() bool + func (result FinalResult) OutputsDB() JSONSerializable + func (result FinalResult) SingularResult() (Result, error) + type HTTPTask struct + AllowUnrestrictedNetworkAccess MaybeBool + Method string + RequestData HttpRequestData + URL models.WebURL + func (t *HTTPTask) HelperSetConfig(config Config) + func (t *HTTPTask) Run(ctx context.Context, taskRun TaskRun, inputs []Result) Result + func (t *HTTPTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error + func (t *HTTPTask) Type() TaskType + func (t HTTPTask) ExportedEquals(otherTask Task) bool + type HttpRequestData map[string]interface{} + func (h *HttpRequestData) Scan(value interface{}) error + func (h HttpRequestData) AsMap() map[string]interface{} + func (h HttpRequestData) Value() (driver.Value, error) + type JSONParseTask struct + Lax bool + Path JSONPath + func (t *JSONParseTask) Run(_ context.Context, taskRun TaskRun, inputs []Result) (result Result) + func (t *JSONParseTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error + func (t *JSONParseTask) Type() TaskType + func (t JSONParseTask) ExportedEquals(otherTask Task) bool + type JSONPath []string + func (p *JSONPath) Scan(value interface{}) error + func (p *JSONPath) UnmarshalText(bs []byte) error + func (p JSONPath) Value() (driver.Value, error) + type 
JSONSerializable struct + Null bool + Val interface{} + func (js *JSONSerializable) Scan(value interface{}) error + func (js *JSONSerializable) UnmarshalJSON(bs []byte) error + func (js JSONSerializable) MarshalJSON() ([]byte, error) + func (js JSONSerializable) Value() (driver.Value, error) + type MaybeBool string + func MaybeBoolFromString(s string) (MaybeBool, error) + func (m MaybeBool) Bool() (b bool, isSet bool) + type MedianTask struct + AllowedFaults uint64 + func (t *MedianTask) Run(_ context.Context, taskRun TaskRun, inputs []Result) (result Result) + func (t *MedianTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error + func (t *MedianTask) Type() TaskType + func (t MedianTask) ExportedEquals(otherTask Task) bool + type MultiplyTask struct + Times decimal.Decimal + func (t *MultiplyTask) Run(_ context.Context, taskRun TaskRun, inputs []Result) (result Result) + func (t *MultiplyTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error + func (t *MultiplyTask) Type() TaskType + func (t MultiplyTask) ExportedEquals(otherTask Task) bool + type ORM interface + AwaitRun func(ctx context.Context, runID int64) error + CreateRun func(ctx context.Context, jobID int32, meta map[string]interface{}) (int64, error) + CreateSpec func(ctx context.Context, db *gorm.DB, taskDAG TaskDAG, ...) 
(int32, error) + DB func() *gorm.DB + DeleteRunsOlderThan func(threshold time.Duration) error + FindBridge func(name models.TaskType) (models.BridgeType, error) + InsertFinishedRunWithResults func(ctx context.Context, run Run, trrs []TaskRunResult) (runID int64, err error) + ListenForNewRuns func() (postgres.Subscription, error) + ProcessNextUnfinishedRun func(ctx context.Context, fn ProcessRunFunc) (bool, error) + ResultsForRun func(ctx context.Context, runID int64) ([]Result, error) + RunFinished func(runID int64) (bool, error) + type PossibleErrorResponses struct + Error string + ErrorMessage string + type ProcessRunFunc func(ctx context.Context, txdb *gorm.DB, pRun Run, l logger.Logger) (TaskRunResults, error) + type Result struct + Error error + Value interface{} + func (result Result) ErrorDB() null.String + func (result Result) OutputDB() JSONSerializable + type ResultTask struct + func (t *ResultTask) Run(_ context.Context, taskRun TaskRun, inputs []Result) Result + func (t *ResultTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error + func (t *ResultTask) Type() TaskType + func (t ResultTask) ExportedEquals(otherTask Task) bool + type Run struct + CreatedAt time.Time + Errors JSONSerializable + FinishedAt *time.Time + ID int64 + Meta JSONSerializable + Outputs JSONSerializable + PipelineSpec Spec + PipelineSpecID int32 + PipelineTaskRuns []TaskRun + func (Run) TableName() string + func (r *Run) SetID(value string) error + func (r Run) FinalErrors() (f FinalErrors) + func (r Run) GetID() string + func (r Run) HasErrors() bool + type Runner interface + AwaitRun func(ctx context.Context, runID int64) error + Close func() error + CreateRun func(ctx context.Context, jobID int32, meta map[string]interface{}) (runID int64, err error) + ExecuteAndInsertNewRun func(ctx context.Context, spec Spec, l logger.Logger) (finalResult FinalResult, err error) + ExecuteRun func(ctx context.Context, run Run, l logger.Logger) (trrs TaskRunResults, 
err error) + ResultsForRun func(ctx context.Context, runID int64) ([]Result, error) + Start func() error + type Spec struct + CreatedAt time.Time + DotDagSource string + ID int32 + MaxTaskDuration models.Interval + PipelineTaskSpecs []TaskSpec + func (Spec) TableName() string + type Task interface + DotID func() string + OutputIndex func() int32 + OutputTask func() Task + Run func(ctx context.Context, taskRun TaskRun, inputs []Result) Result + SetDefaults func(inputValues map[string]string, g TaskDAG, self taskDAGNode) error + SetOutputTask func(task Task) + TaskTimeout func() (time.Duration, bool) + Type func() TaskType + func UnmarshalTaskFromMap(taskType TaskType, taskMap interface{}, dotID string, config Config, ...) (_ Task, err error) + type TaskDAG struct + DOTSource string + func NewTaskDAG() *TaskDAG + func (g *TaskDAG) HasCycles() bool + func (g *TaskDAG) NewNode() graph.Node + func (g *TaskDAG) UnmarshalText(bs []byte) (err error) + func (g TaskDAG) MinTimeout() (time.Duration, bool, error) + func (g TaskDAG) TasksInDependencyOrder() ([]Task, error) + type TaskRun struct + CreatedAt time.Time + Error null.String + FinishedAt *time.Time + ID int64 + Output *JSONSerializable + PipelineRun Run + PipelineRunID int64 + PipelineTaskSpec TaskSpec + PipelineTaskSpecID int32 + Type TaskType + func (TaskRun) TableName() string + func (tr *TaskRun) SetID(value string) error + func (tr TaskRun) DotID() string + func (tr TaskRun) GetID() string + func (tr TaskRun) Result() Result + type TaskRunResult struct + FinishedAt time.Time + ID int64 + IsTerminal bool + Result Result + TaskSpecID int32 + type TaskRunResults []TaskRunResult + func (trrs TaskRunResults) FinalResult() (result FinalResult) + type TaskSpec struct + CreatedAt time.Time + DotID string + ID int32 + Index int32 + JSON JSONSerializable + PipelineSpec Spec + PipelineSpecID int32 + SuccessorID null.Int + Type TaskType + func (TaskSpec) TableName() string + func (s TaskSpec) IsFinalPipelineOutput() bool + 
type TaskType string + const TaskTypeAny + const TaskTypeBridge + const TaskTypeHTTP + const TaskTypeJSONParse + const TaskTypeMedian + const TaskTypeMultiply + const TaskTypeResult