Documentation ¶
Overview ¶
Package job is for the scheduling and execution of asynchronous jobs via the workers. The scheduling is done via the triggers. The jobs are put in queues before being processed by a worker.
Index ¶
- Constants
- Variables
- func AddWorker(conf *WorkerConfig)
- func GetCounterTypeFromWorkerType(workerType string) (limits.CounterType, error)
- func GetWorkersNamesList() []string
- func SetRedisTimeoutForTest()
- func SystemStart(b Broker, s Scheduler, workersList WorkersList) error
- type AtTrigger
- type BadTriggerError
- type Broker
- type BrokerMock
- func (m *BrokerMock) PushJob(db prefixer.Prefixer, request *JobRequest) (*Job, error)
- func (m *BrokerMock) ShutdownWorkers(ctx context.Context) error
- func (m *BrokerMock) StartWorkers(workersList WorkersList) error
- func (m *BrokerMock) WorkerIsReserved(workerType string) (bool, error)
- func (m *BrokerMock) WorkerQueueLen(workerType string) (int, error)
- func (m *BrokerMock) WorkersTypes() []string
- type ClientTrigger
- type CronTrigger
- func NewCronTrigger(infos *TriggerInfos) (*CronTrigger, error)
- func NewDailyTrigger(infos *TriggerInfos) (*CronTrigger, error)
- func NewEveryTrigger(infos *TriggerInfos) (*CronTrigger, error)
- func NewHourlyTrigger(infos *TriggerInfos) (*CronTrigger, error)
- func NewMonthlyTrigger(infos *TriggerInfos) (*CronTrigger, error)
- func NewWeeklyTrigger(infos *TriggerInfos) (*CronTrigger, error)
- type DumpFilePather
- type Event
- type EventTrigger
- type FrequencyKind
- type Job
- func FilterByWorkerAndState(jobs []*Job, workerType string, state State, limit int) []*Job
- func FilterJobsBeforeDate(jobs []*Job, date time.Time) []*Job
- func Get(db prefixer.Prefixer, jobID string) (*Job, error)
- func GetAllJobs(db prefixer.Prefixer) ([]*Job, error)
- func GetJobs(db prefixer.Prefixer, triggerID string, limit int) ([]*Job, error)
- func GetLastsJobs(jobs []*Job, workerType string) ([]*Job, error)
- func GetQueuedJobs(db prefixer.Prefixer, workerType string) ([]*Job, error)
- func NewJob(db prefixer.Prefixer, req *JobRequest) *Job
- func (j *Job) Ack() error
- func (j *Job) AckConsumed() error
- func (j *Job) Clone() couchdb.Doc
- func (j *Job) Create() error
- func (j *Job) DBCluster() int
- func (j *Job) DBPrefix() string
- func (j *Job) DocType() string
- func (j *Job) DomainName() string
- func (j *Job) Fetch(field string) []string
- func (j *Job) ID() string
- func (j *Job) Logger() *logger.Entry
- func (j *Job) Nack(errorMessage string) error
- func (j *Job) Rev() string
- func (j *Job) SetID(id string)
- func (j *Job) SetRev(rev string)
- func (j *Job) Update() error
- func (j *Job) WaitUntilDone(db prefixer.Prefixer) error
- type JobErrorCheckerHook
- type JobOptions
- type JobRequest
- type JobSystem
- type Message
- type Payload
- type PeriodicParser
- type PeriodicSpec
- type Scheduler
- type ShareGroupMessage
- type ShareGroupTrigger
- type State
- type TaskContext
- func (c *TaskContext) Cookie() interface{}
- func (c *TaskContext) ID() string
- func (c *TaskContext) Logger() logger.Logger
- func (c *TaskContext) Manual() bool
- func (c *TaskContext) NoRetry() bool
- func (c *TaskContext) SetNoRetry()
- func (c *TaskContext) TriggerID() (string, bool)
- func (c *TaskContext) UnmarshalEvent(v interface{}) error
- func (c *TaskContext) UnmarshalMessage(v interface{}) error
- func (c *TaskContext) UnmarshalPayload() (map[string]interface{}, error)
- func (c *TaskContext) WithCookie(cookie interface{}) *TaskContext
- func (c *TaskContext) WithTimeout(timeout time.Duration) (*TaskContext, context.CancelFunc)
- type ThumbnailTrigger
- type Trigger
- type TriggerInfos
- func (t *TriggerInfos) Clone() couchdb.Doc
- func (t *TriggerInfos) DBCluster() int
- func (t *TriggerInfos) DBPrefix() string
- func (t *TriggerInfos) DocType() string
- func (t *TriggerInfos) DomainName() string
- func (t *TriggerInfos) Fetch(field string) []string
- func (t *TriggerInfos) ID() string
- func (t *TriggerInfos) IsKonnectorTrigger() bool
- func (t *TriggerInfos) JobRequest() *JobRequest
- func (t *TriggerInfos) JobRequestWithEvent(event *realtime.Event) (*JobRequest, error)
- func (t *TriggerInfos) Rev() string
- func (t *TriggerInfos) SetID(id string)
- func (t *TriggerInfos) SetRev(rev string)
- type TriggerState
- type WebhookTrigger
- func (w *WebhookTrigger) CombineRequest() string
- func (w *WebhookTrigger) Fire(payload Payload, manual bool)
- func (w *WebhookTrigger) Infos() *TriggerInfos
- func (w *WebhookTrigger) Schedule() <-chan *JobRequest
- func (w *WebhookTrigger) SetCallback(cb firer)
- func (w *WebhookTrigger) Type() string
- func (w *WebhookTrigger) Unschedule()
- type Worker
- type WorkerBeforeHook
- type WorkerCommit
- type WorkerConfig
- type WorkerFunc
- type WorkerInitFunc
- type WorkerStartFunc
- type WorkersList
Constants ¶
const DocTypeVersionTrigger = "1"
DocTypeVersionTrigger represents the doctype version. Each time this document structure is modified, update this value
const SchedKey = "scheduling"
SchedKey is the key of the sorted set in redis used for triggers currently being executed
const TriggersKey = "triggers"
TriggersKey is the key of the sorted set in redis used for triggers waiting to be activated
Variables ¶
var ( // ErrClosed is using a closed system ErrClosed = errors.New("jobs: closed") // ErrNotFoundJob is used when the job could not be found ErrNotFoundJob = errors.New("jobs: not found") // ErrQueueClosed is used to indicate the queue is closed ErrQueueClosed = errors.New("jobs: queue is closed") // ErrUnknownWorker the asked worker does not exist ErrUnknownWorker = errors.New("jobs: could not find worker") // ErrMessageNil is used for an nil message ErrMessageNil = errors.New("jobs: message is nil") // ErrMessageUnmarshal is used when unmarshalling a message causes an error ErrMessageUnmarshal = errors.New("jobs: message unmarshal") // ErrAbort can be used to abort the execution of the job without causing // errors. ErrAbort = errors.New("jobs: abort") // ErrUnknownTrigger is used when the trigger type is not recognized ErrUnknownTrigger = errors.New("Unknown trigger type") // ErrNotFoundTrigger is used when the trigger was not found ErrNotFoundTrigger = errors.New("Trigger with specified ID does not exist") // ErrMalformedTrigger is used to indicate the trigger is unparsable ErrMalformedTrigger = echo.NewHTTPError(http.StatusBadRequest, "Trigger unparsable") // ErrNotCronTrigger is used when a @cron trigger is expected, but it is // not the case ErrNotCronTrigger = errors.New("Invalid type for trigger (@cron expected)") )
Functions ¶
func AddWorker ¶
func AddWorker(conf *WorkerConfig)
AddWorker adds a new worker to the global list of available workers.
func GetCounterTypeFromWorkerType ¶
func GetCounterTypeFromWorkerType(workerType string) (limits.CounterType, error)
GetCounterTypeFromWorkerType returns the limits.CounterType corresponding to the given worker type.
func GetWorkersNamesList ¶
func GetWorkersNamesList() []string
GetWorkersNamesList returns the names of the configured workers
func SetRedisTimeoutForTest ¶
func SetRedisTimeoutForTest()
SetRedisTimeoutForTest is used by unit tests to avoid waiting 10 seconds on cleanup.
func SystemStart ¶
func SystemStart(b Broker, s Scheduler, workersList WorkersList) error
SystemStart initializes and starts the global jobs system with the given broker, scheduler instances and workers list.
Types ¶
type AtTrigger ¶
type AtTrigger struct { *TriggerInfos // contains filtered or unexported fields }
AtTrigger implements the @at trigger type. It schedules a job at a specified time in the future.
func NewAtTrigger ¶
func NewAtTrigger(infos *TriggerInfos) (*AtTrigger, error)
NewAtTrigger returns a new instance of AtTrigger given the specified options.
func NewInTrigger ¶
func NewInTrigger(infos *TriggerInfos) (*AtTrigger, error)
NewInTrigger returns a new instance of AtTrigger given the specified options as @in.
func (*AtTrigger) CombineRequest ¶
CombineRequest implements the CombineRequest method of the Trigger interface.
func (*AtTrigger) Infos ¶
func (a *AtTrigger) Infos() *TriggerInfos
Infos implements the Infos method of the Trigger interface.
func (*AtTrigger) Schedule ¶
func (a *AtTrigger) Schedule() <-chan *JobRequest
Schedule implements the Schedule method of the Trigger interface.
func (*AtTrigger) Unschedule ¶
func (a *AtTrigger) Unschedule()
Unschedule implements the Unschedule method of the Trigger interface.
type BadTriggerError ¶
type BadTriggerError struct {
Err error
}
BadTriggerError is an error conveying the information of a trigger that is not valid, and could be deleted.
func (BadTriggerError) Error ¶
func (e BadTriggerError) Error() string
type Broker ¶
type Broker interface { StartWorkers(workersList WorkersList) error ShutdownWorkers(ctx context.Context) error // PushJob will push try to push a new job from the specified job request. // This method is asynchronous. PushJob(db prefixer.Prefixer, request *JobRequest) (*Job, error) // WorkerQueueLen returns the total element in the queue of the specified // worker type. WorkerQueueLen(workerType string) (int, error) // WorkerIsReserved returns true if the given worker type is reserved // (ie clients should not push jobs to it, only the stack). WorkerIsReserved(workerType string) (bool, error) // WorkersTypes returns the list of registered workers types. WorkersTypes() []string }
Broker interface is used to represent a job broker associated to a particular domain. A broker can be used to create jobs that are pushed in the job system.
This interface is matched by several implementations: - BrokerMock a mock implementation used for the tests.
func NewMemBroker ¶
func NewMemBroker() Broker
NewMemBroker creates a new in-memory broker system.
The in-memory implementation of the job system has the specificity that workers are actually launched by the broker at its creation.
func NewRedisBroker ¶
func NewRedisBroker(client redis.UniversalClient) Broker
NewRedisBroker creates a new broker that will use redis to distribute the jobs among several cozy-stack processes.
type BrokerMock ¶
BrokerMock is a mock implementation of Broker.
func NewBrokerMock ¶
func NewBrokerMock(t *testing.T) *BrokerMock
func (*BrokerMock) PushJob ¶
func (m *BrokerMock) PushJob(db prefixer.Prefixer, request *JobRequest) (*Job, error)
PushJob mock method.
func (*BrokerMock) ShutdownWorkers ¶
func (m *BrokerMock) ShutdownWorkers(ctx context.Context) error
ShutdownWorkers mock method.
func (*BrokerMock) StartWorkers ¶
func (m *BrokerMock) StartWorkers(workersList WorkersList) error
StartWorkers mock method.
func (*BrokerMock) WorkerIsReserved ¶
func (m *BrokerMock) WorkerIsReserved(workerType string) (bool, error)
WorkerIsReserved mock method.
func (*BrokerMock) WorkerQueueLen ¶
func (m *BrokerMock) WorkerQueueLen(workerType string) (int, error)
WorkerQueueLen mock method.
func (*BrokerMock) WorkersTypes ¶
func (m *BrokerMock) WorkersTypes() []string
WorkersTypes mock method.
type ClientTrigger ¶
type ClientTrigger struct {
*TriggerInfos
}
ClientTrigger implements the @webhook triggers. It schedules a job when an HTTP request is made at this webhook.
func NewClientTrigger ¶
func NewClientTrigger(infos *TriggerInfos) (*ClientTrigger, error)
NewClientTrigger returns a new instance of ClientTrigger.
func (*ClientTrigger) CombineRequest ¶
func (c *ClientTrigger) CombineRequest() string
CombineRequest implements the CombineRequest method of the Trigger interface.
func (*ClientTrigger) Infos ¶
func (c *ClientTrigger) Infos() *TriggerInfos
Infos implements the Infos method of the Trigger interface.
func (*ClientTrigger) Schedule ¶
func (c *ClientTrigger) Schedule() <-chan *JobRequest
Schedule implements the Schedule method of the Trigger interface.
func (*ClientTrigger) Type ¶
func (c *ClientTrigger) Type() string
Type implements the Type method of the Trigger interface.
func (*ClientTrigger) Unschedule ¶
func (c *ClientTrigger) Unschedule()
Unschedule implements the Unschedule method of the Trigger interface.
type CronTrigger ¶
type CronTrigger struct { *TriggerInfos // contains filtered or unexported fields }
CronTrigger implements the @cron trigger type. It schedules recurring jobs with the weird but very used Cron syntax.
func NewCronTrigger ¶
func NewCronTrigger(infos *TriggerInfos) (*CronTrigger, error)
NewCronTrigger returns a new instance of CronTrigger given the specified options.
func NewDailyTrigger ¶
func NewDailyTrigger(infos *TriggerInfos) (*CronTrigger, error)
NewDailyTrigger returns a new instance of CronTrigger given the specified options as @daily. It will take a random hour in the possible range to spread the triggers from the same app manifest.
func NewEveryTrigger ¶
func NewEveryTrigger(infos *TriggerInfos) (*CronTrigger, error)
NewEveryTrigger returns a new instance of CronTrigger given the specified options as @every.
func NewHourlyTrigger ¶
func NewHourlyTrigger(infos *TriggerInfos) (*CronTrigger, error)
NewHourlyTrigger returns a new instance of CronTrigger given the specified options as @hourly. It will take a random minute in the possible range to spread the triggers from the same app manifest.
func NewMonthlyTrigger ¶
func NewMonthlyTrigger(infos *TriggerInfos) (*CronTrigger, error)
NewMonthlyTrigger returns a new instance of CronTrigger given the specified options as @monthly. It will take a random day/hour in the possible range to spread the triggers from the same app manifest.
func NewWeeklyTrigger ¶
func NewWeeklyTrigger(infos *TriggerInfos) (*CronTrigger, error)
NewWeeklyTrigger returns a new instance of CronTrigger given the specified options as @weekly. It will take a random day/hour in the possible range to spread the triggers from the same app manifest.
func (*CronTrigger) CombineRequest ¶
func (c *CronTrigger) CombineRequest() string
CombineRequest implements the CombineRequest method of the Trigger interface.
func (*CronTrigger) Infos ¶
func (c *CronTrigger) Infos() *TriggerInfos
Infos implements the Infos method of the Trigger interface.
func (*CronTrigger) NextExecution ¶
func (c *CronTrigger) NextExecution(last time.Time) time.Time
NextExecution returns the next time when a job should be fired for this trigger
func (*CronTrigger) Schedule ¶
func (c *CronTrigger) Schedule() <-chan *JobRequest
Schedule implements the Schedule method of the Trigger interface.
func (*CronTrigger) Type ¶
func (c *CronTrigger) Type() string
Type implements the Type method of the Trigger interface.
func (*CronTrigger) Unschedule ¶
func (c *CronTrigger) Unschedule()
Unschedule implements the Unschedule method of the Trigger interface.
type DumpFilePather ¶
type DumpFilePather struct{}
DumpFilePather is a struct made for calling the Path method of a FileDoc and relying on the cached fullpath of this document (not trying to rebuild it)
type Event ¶
type Event json.RawMessage
Event is a json encoded value of a realtime.Event.
type EventTrigger ¶
type EventTrigger struct { *TriggerInfos // contains filtered or unexported fields }
EventTrigger implements Trigger for realtime triggered events
func NewEventTrigger ¶
func NewEventTrigger(infos *TriggerInfos) (*EventTrigger, error)
NewEventTrigger returns a new instance of EventTrigger given the specified options.
func (*EventTrigger) CombineRequest ¶
func (t *EventTrigger) CombineRequest() string
CombineRequest implements the CombineRequest method of the Trigger interface.
func (*EventTrigger) Infos ¶
func (t *EventTrigger) Infos() *TriggerInfos
Infos implements the Infos method of the Trigger interface.
func (*EventTrigger) Schedule ¶
func (t *EventTrigger) Schedule() <-chan *JobRequest
Schedule implements the Schedule method of the Trigger interface.
func (*EventTrigger) Type ¶
func (t *EventTrigger) Type() string
Type implements the Type method of the Trigger interface.
func (*EventTrigger) Unschedule ¶
func (t *EventTrigger) Unschedule()
Unschedule implements the Unschedule method of the Trigger interface.
type FrequencyKind ¶
type FrequencyKind int
FrequencyKind is used to tell if a periodic trigger is monthly, weekly, daily or hourly.
const ( MonthlyKind FrequencyKind = iota WeeklyKind DailyKind HourlyKind )
type Job ¶
type Job struct { JobID string `json:"_id,omitempty"` JobRev string `json:"_rev,omitempty"` Cluster int `json:"couch_cluster,omitempty"` Domain string `json:"domain"` Prefix string `json:"prefix,omitempty"` WorkerType string `json:"worker"` TriggerID string `json:"trigger_id,omitempty"` Message Message `json:"message"` Event Event `json:"event"` Payload Payload `json:"payload,omitempty"` Manual bool `json:"manual_execution,omitempty"` Debounced bool `json:"debounced,omitempty"` Options *JobOptions `json:"options,omitempty"` State State `json:"state"` QueuedAt time.Time `json:"queued_at"` StartedAt time.Time `json:"started_at"` FinishedAt time.Time `json:"finished_at"` Error string `json:"error,omitempty"` ForwardLogs bool `json:"forward_logs,omitempty"` }
Job contains all the metadata information of a Job. It can be marshalled in JSON.
func FilterByWorkerAndState ¶
FilterByWorkerAndState filters a job slice by its workerType and State
func FilterJobsBeforeDate ¶
FilterJobsBeforeDate returns all jobs queued before the specified date
func GetAllJobs ¶
GetAllJobs returns the list of all the jobs on the given instance.
func GetLastsJobs ¶
GetLastsJobs returns the N last jobs of each state for an instance/worker type pair
func GetQueuedJobs ¶
GetQueuedJobs returns the list of jobs whose state is "queued" or "running"
func NewJob ¶
func NewJob(db prefixer.Prefixer, req *JobRequest) *Job
NewJob creates a new Job instance from a job request.
func (*Job) AckConsumed ¶
AckConsumed sets the job infos state to Running and sends the new job infos on the channel.
func (*Job) DomainName ¶
DomainName implements the prefixer.Prefixer interface.
type JobErrorCheckerHook ¶
JobErrorCheckerHook is an optional method called at the beginning of the job execution to prevent a retry according to the previous error (specifically useful in the retries loop)
type JobOptions ¶
type JobOptions struct { MaxExecCount int `json:"max_exec_count"` Timeout time.Duration `json:"timeout"` }
JobOptions struct contains the execution properties of the jobs.
type JobRequest ¶
type JobRequest struct { WorkerType string TriggerID string Trigger Trigger Message Message Event Event Payload Payload Manual bool Debounced bool ForwardLogs bool Options *JobOptions }
JobRequest struct is used to represent a new job request.
func (*JobRequest) DocType ¶
func (jr *JobRequest) DocType() string
DocType implements the permission.Getter interface
func (*JobRequest) Fetch ¶
func (jr *JobRequest) Fetch(field string) []string
Fetch implements the permission.Fetcher interface
func (*JobRequest) ID ¶
func (jr *JobRequest) ID() string
ID implements the permission.Getter interface
type JobSystem ¶
type JobSystem interface { Broker Scheduler utils.Shutdowner }
JobSystem is a pair of broker, scheduler linked together.
type Message ¶
type Message json.RawMessage
Message is a json encoded job message.
func NewMessage ¶
NewMessage returns JSON-encoded data
func (Message) MarshalJSON ¶
MarshalJSON implements json.Marshaler on Message.
func (Message) Unmarshal ¶
Unmarshal can be used to unmarshal the encoded message value in the specified interface's type.
func (*Message) UnmarshalJSON ¶
UnmarshalJSON implements json.Unmarshaler on Message. It should be retro-compatible with the old Message representation { Data, Type }.
type PeriodicParser ¶
type PeriodicParser struct{}
PeriodicParser can be used to parse @weekly and @monthly trigger arguments. It can parse a string like "on monday between 8am and 6pm".
func NewPeriodicParser ¶
func NewPeriodicParser() PeriodicParser
NewPeriodicParser creates a PeriodicParser.
func (*PeriodicParser) Parse ¶
func (p *PeriodicParser) Parse(frequency FrequencyKind, periodic string) (*PeriodicSpec, error)
Parse will transform a string like "on monday" to a PeriodicSpec, or will return an error if the format is not supported.
type PeriodicSpec ¶
type PeriodicSpec struct { Frequency FrequencyKind DaysOfMonth []int // empty for *, or a slice of acceptable days (1 to 31) DaysOfWeek []int // a slice of acceptable days, from 0 for sunday to 6 for saturday AfterHour int // an hour between 0 and 23 BeforeHour int // an hour between 1 and 24 }
PeriodicSpec is the result of a successful parsing
func NewPeriodicSpec ¶
func NewPeriodicSpec() *PeriodicSpec
func (*PeriodicSpec) ToRandomCrontab ¶
func (s *PeriodicSpec) ToRandomCrontab(seed string) string
ToRandomCrontab generates a crontab that verifies the PeriodicSpec. The values are taken randomly, and the random generator uses the given seed to allow stability for a trigger, ie a weekly trigger must always run on the same day at the same hour.
type Scheduler ¶
type Scheduler interface { StartScheduler(broker Broker) error ShutdownScheduler(ctx context.Context) error PollScheduler(now int64) error AddTrigger(trigger Trigger) error GetTrigger(db prefixer.Prefixer, id string) (Trigger, error) UpdateMessage(db prefixer.Prefixer, trigger Trigger, message json.RawMessage) error UpdateCron(db prefixer.Prefixer, trigger Trigger, arguments string) error DeleteTrigger(db prefixer.Prefixer, id string) error GetAllTriggers(db prefixer.Prefixer) ([]Trigger, error) HasTrigger(db prefixer.Prefixer, infos TriggerInfos) bool CleanRedis() error RebuildRedis(db prefixer.Prefixer) error }
Scheduler interface is used to represent a scheduler that is responsible for listening and responding to the triggers' job requests and sending them to the broker.
func NewMemScheduler ¶
func NewMemScheduler() Scheduler
NewMemScheduler creates a new in-memory scheduler that will load all registered triggers and schedule their work.
func NewRedisScheduler ¶
func NewRedisScheduler(client redis.UniversalClient) Scheduler
NewRedisScheduler creates a new scheduler that uses redis to synchronize with other cozy-stack processes to schedule jobs.
type ShareGroupMessage ¶
type ShareGroupMessage struct {}
ShareGroupMessage is used for jobs on the share-group worker.
type ShareGroupTrigger ¶
type ShareGroupTrigger struct {
// contains filtered or unexported fields
}
func NewShareGroupTrigger ¶
func NewShareGroupTrigger(broker Broker) *ShareGroupTrigger
func (*ShareGroupTrigger) Schedule ¶
func (t *ShareGroupTrigger) Schedule()
func (*ShareGroupTrigger) Unschedule ¶
func (t *ShareGroupTrigger) Unschedule()
type TaskContext ¶
type TaskContext struct { context.Context Instance *instance.Instance // contains filtered or unexported fields }
TaskContext is a context.Context passed to the worker for each task execution and contains specific values from the job.
func NewTaskContext ¶
func NewTaskContext(workerID string, job *Job, inst *instance.Instance) (*TaskContext, context.CancelFunc)
NewTaskContext returns a context.Context usable by a worker.
func (*TaskContext) Cookie ¶
func (c *TaskContext) Cookie() interface{}
Cookie returns the cookie associated with the worker context.
func (*TaskContext) ID ¶
func (c *TaskContext) ID() string
ID returns a unique identifier for the worker context.
func (*TaskContext) Logger ¶
func (c *TaskContext) Logger() logger.Logger
Logger returns the logger associated with the worker context.
func (*TaskContext) Manual ¶
func (c *TaskContext) Manual() bool
Manual returns whether the job was started manually
func (*TaskContext) NoRetry ¶
func (c *TaskContext) NoRetry() bool
NoRetry returns the no-retry flag.
func (*TaskContext) SetNoRetry ¶
func (c *TaskContext) SetNoRetry()
SetNoRetry sets the no-retry flag to prevent a retry on the next execution.
func (*TaskContext) TriggerID ¶
func (c *TaskContext) TriggerID() (string, bool)
TriggerID returns the possible trigger identifier responsible for launching the job.
func (*TaskContext) UnmarshalEvent ¶
func (c *TaskContext) UnmarshalEvent(v interface{}) error
UnmarshalEvent unmarshals the event contained in the worker context.
func (*TaskContext) UnmarshalMessage ¶
func (c *TaskContext) UnmarshalMessage(v interface{}) error
UnmarshalMessage unmarshals the message contained in the worker context.
func (*TaskContext) UnmarshalPayload ¶
func (c *TaskContext) UnmarshalPayload() (map[string]interface{}, error)
UnmarshalPayload unmarshals the payload contained in the worker context.
func (*TaskContext) WithCookie ¶
func (c *TaskContext) WithCookie(cookie interface{}) *TaskContext
WithCookie returns a clone of the context with a new cookie value.
func (*TaskContext) WithTimeout ¶
func (c *TaskContext) WithTimeout(timeout time.Duration) (*TaskContext, context.CancelFunc)
WithTimeout returns a clone of the context with a different deadline.
type ThumbnailTrigger ¶
type ThumbnailTrigger struct {
// contains filtered or unexported fields
}
func NewThumbnailTrigger ¶
func NewThumbnailTrigger(broker Broker) *ThumbnailTrigger
func (*ThumbnailTrigger) Schedule ¶
func (t *ThumbnailTrigger) Schedule()
func (*ThumbnailTrigger) Unschedule ¶
func (t *ThumbnailTrigger) Unschedule()
type Trigger ¶
type Trigger interface { prefixer.Prefixer permission.Fetcher Type() string Infos() *TriggerInfos // Schedule should return a channel on which the trigger can send job // requests when it decides to. Schedule() <-chan *JobRequest // Unschedule should be used to clean the trigger states and should close // the returns jobs channel. Unschedule() CombineRequest() string }
Trigger interface is used to represent a trigger.
func NewTrigger ¶
func NewTrigger(db prefixer.Prefixer, infos TriggerInfos, data interface{}) (Trigger, error)
NewTrigger creates the trigger associated with the specified trigger options.
type TriggerInfos ¶
type TriggerInfos struct { TID string `json:"_id,omitempty"` TRev string `json:"_rev,omitempty"` Cluster int `json:"couch_cluster,omitempty"` Domain string `json:"domain"` Prefix string `json:"prefix,omitempty"` Type string `json:"type"` WorkerType string `json:"worker"` Arguments string `json:"arguments"` Debounce string `json:"debounce"` Options *JobOptions `json:"options"` Message Message `json:"message"` CurrentState *TriggerState `json:"current_state,omitempty"` Metadata *metadata.CozyMetadata `json:"cozyMetadata,omitempty"` }
TriggerInfos is a struct containing all the options of a trigger.
func (*TriggerInfos) Clone ¶
func (t *TriggerInfos) Clone() couchdb.Doc
Clone implements the couchdb.Doc interface
func (*TriggerInfos) DBCluster ¶
func (t *TriggerInfos) DBCluster() int
DBCluster implements the prefixer.Prefixer interface.
func (*TriggerInfos) DBPrefix ¶
func (t *TriggerInfos) DBPrefix() string
DBPrefix implements the prefixer.Prefixer interface.
func (*TriggerInfos) DocType ¶
func (t *TriggerInfos) DocType() string
DocType implements the couchdb.Doc interface
func (*TriggerInfos) DomainName ¶
func (t *TriggerInfos) DomainName() string
DomainName implements the prefixer.Prefixer interface.
func (*TriggerInfos) Fetch ¶
func (t *TriggerInfos) Fetch(field string) []string
Fetch implements the permission.Fetcher interface
func (*TriggerInfos) ID ¶
func (t *TriggerInfos) ID() string
ID implements the couchdb.Doc interface
func (*TriggerInfos) IsKonnectorTrigger ¶
func (t *TriggerInfos) IsKonnectorTrigger() bool
func (*TriggerInfos) JobRequest ¶
func (t *TriggerInfos) JobRequest() *JobRequest
JobRequest returns a job request associated with the scheduler information.
func (*TriggerInfos) JobRequestWithEvent ¶
func (t *TriggerInfos) JobRequestWithEvent(event *realtime.Event) (*JobRequest, error)
JobRequestWithEvent returns a job request associated with the scheduler information and with the specified realtime event.
func (*TriggerInfos) Rev ¶
func (t *TriggerInfos) Rev() string
Rev implements the couchdb.Doc interface
func (*TriggerInfos) SetID ¶
func (t *TriggerInfos) SetID(id string)
SetID implements the couchdb.Doc interface
func (*TriggerInfos) SetRev ¶
func (t *TriggerInfos) SetRev(rev string)
SetRev implements the couchdb.Doc interface
type TriggerState ¶
type TriggerState struct { TID string `json:"trigger_id"` Status State `json:"status"` LastSuccess *time.Time `json:"last_success,omitempty"` LastSuccessfulJobID string `json:"last_successful_job_id,omitempty"` LastExecution *time.Time `json:"last_execution,omitempty"` LastExecutedJobID string `json:"last_executed_job_id,omitempty"` LastFailure *time.Time `json:"last_failure,omitempty"` LastFailedJobID string `json:"last_failed_job_id,omitempty"` LastError string `json:"last_error,omitempty"` LastManualExecution *time.Time `json:"last_manual_execution,omitempty"` LastManualJobID string `json:"last_manual_job_id,omitempty"` }
TriggerState represents the current state of the trigger
func GetTriggerState ¶
func GetTriggerState(db prefixer.Prefixer, triggerID string) (*TriggerState, error)
GetTriggerState returns the state of the trigger, calculated from the last launched jobs.
type WebhookTrigger ¶
type WebhookTrigger struct { *TriggerInfos // contains filtered or unexported fields }
WebhookTrigger implements the @webhook triggers. It schedules a job when an HTTP request is made at this webhook.
func NewWebhookTrigger ¶
func NewWebhookTrigger(infos *TriggerInfos) (*WebhookTrigger, error)
NewWebhookTrigger returns a new instance of WebhookTrigger.
func (*WebhookTrigger) CombineRequest ¶
func (w *WebhookTrigger) CombineRequest() string
CombineRequest implements the CombineRequest method of the Trigger interface.
func (*WebhookTrigger) Fire ¶
func (w *WebhookTrigger) Fire(payload Payload, manual bool)
Fire is called with a payload when the webhook has been requested.
func (*WebhookTrigger) Infos ¶
func (w *WebhookTrigger) Infos() *TriggerInfos
Infos implements the Infos method of the Trigger interface.
func (*WebhookTrigger) Schedule ¶
func (w *WebhookTrigger) Schedule() <-chan *JobRequest
Schedule implements the Schedule method of the Trigger interface.
func (*WebhookTrigger) SetCallback ¶
func (w *WebhookTrigger) SetCallback(cb firer)
SetCallback registers a struct to be called when the webhook is fired.
func (*WebhookTrigger) Type ¶
func (w *WebhookTrigger) Type() string
Type implements the Type method of the Trigger interface.
func (*WebhookTrigger) Unschedule ¶
func (w *WebhookTrigger) Unschedule()
Unschedule implements the Unschedule method of the Trigger interface.
type Worker ¶
type Worker struct { Type string Conf *WorkerConfig // contains filtered or unexported fields }
Worker is a unit of work that will consume from a queue and execute the do method for each job it pulls.
func NewWorker ¶
func NewWorker(conf *WorkerConfig) *Worker
NewWorker creates a new instance of Worker with the given configuration.
type WorkerBeforeHook ¶
WorkerBeforeHook is an optional method that is always called before the job is being pushed into the queue. It can be useful to skip the job beforehand.
type WorkerCommit ¶
type WorkerCommit func(ctx *TaskContext, errjob error) error
WorkerCommit is an optional method that is always called once after the execution of the WorkerFunc.
type WorkerConfig ¶
type WorkerConfig struct { WorkerInit WorkerInitFunc WorkerStart WorkerStartFunc WorkerFunc WorkerFunc WorkerCommit WorkerCommit WorkerType string BeforeHook WorkerBeforeHook ErrorHook JobErrorCheckerHook Concurrency int MaxExecCount int Reserved bool // true when the clients must not push jobs for this worker Timeout time.Duration RetryDelay time.Duration }
WorkerConfig is the configuration parameter of a worker defined by the job system. It contains parameters of the worker along with the worker main function that performs the work against a job's message.
func GetWorkersList ¶
func GetWorkersList() ([]*WorkerConfig, error)
GetWorkersList returns a list of all activated workers, configured as defined by the configuration file.
func (*WorkerConfig) Clone ¶
func (w *WorkerConfig) Clone() *WorkerConfig
Clone clones the worker config
type WorkerFunc ¶
type WorkerFunc func(ctx *TaskContext) error
WorkerFunc represents the work function that a worker should implement.
type WorkerInitFunc ¶
type WorkerInitFunc func() error
WorkerInitFunc is called at the start of the worker system, only once. It is not called before every job process. It can be useful to initialize a global variable used by the worker.
type WorkerStartFunc ¶
type WorkerStartFunc func(ctx *TaskContext) (*TaskContext, error)
WorkerStartFunc is optionally called at the beginning of each job process and can produce a context value.
type WorkersList ¶
type WorkersList []*WorkerConfig
WorkersList is a list of worker configurations, each one holding the actual configuration of a worker type.
Source Files ¶
- broker.go
- broker_mock.go
- errors.go
- globals.go
- mem_broker.go
- mem_scheduler.go
- metrics.go
- periodic_parser.go
- rate_limiting.go
- redis_broker.go
- redis_scheduler.go
- scheduler.go
- trigger_at.go
- trigger_client.go
- trigger_cron.go
- trigger_event.go
- trigger_share_group.go
- trigger_thumbnail.go
- trigger_webhook.go
- worker.go
- workers_list.go