Documentation ¶
Overview ¶
Package occamy ... TODO: Complete package description.
Index ¶
- Constants
- type BasicError
- type DetailedError
- type ErrorMonitor
- type Handler
- type Headers
- type LatencyMonitor
- type Message
- type Monitors
- type NopErrorMonitor
- type NopLatencyMonitor
- type NopResourceMonitor
- type ResourceMonitor
- type Server
- type ServerConfig
- type SlotStatus
- type Task
- type TaskDetails
- type WrappedError
Constants ¶
const ( ProcessHandleRequestMessage = "handle_request_message" ProcessHandleControlMessage = "handle_control_message" ProcessExpansion = "expansion_process" )
const (
TaskGroupNone string = ""
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BasicError ¶
type BasicError string
const ( // ErrInternalHandlerError covers situations where the // handler encounters an internal error. ErrInternalHandlerError BasicError = "internal_handler_error" // ErrTaskInterrupted is exclusively for when a task // (generated by a handler) is interrupted by the occamy // server through the context being cancelled. ErrTaskInterrupted BasicError = "task_interrupted" // ErrInvalidBody covers situations where a message // was received with invalid bodies. ErrInvalidBody BasicError = "invalid_body" // ErrInvalidHeader covers situations where a message // was received with invalid headers. ErrInvalidHeader BasicError = "invalid_headers" // ErrInvalidTask covers situations where the task // requested is invalid. ErrInvalidTask BasicError = "invalid_task" // ErrMessageNotAcked is for when the ack method // fails for a message. ErrMessageNotAcked BasicError = "message_not_acked" // ErrMessageNotNacked is for when the nack method // fails for a message. ErrMessageNotNacked BasicError = "message_not_nacked" // ErrTaskNotAdded is exclusively for when a task // (generated by a handler) could not be added. ErrTaskNotAdded BasicError = "task_not_added" // ErrTaskNotKilled is exclusively for when a task // (generated by a handler) could not be killed. ErrTaskNotKilled BasicError = "task_not_killed" )
func (BasicError) Error ¶
func (e BasicError) Error() string
type DetailedError ¶
type DetailedError struct { BasicErr BasicError Cause string }
func NewDetailedError ¶
func NewDetailedError(err BasicError, cause string) *DetailedError
func (*DetailedError) Error ¶
func (w *DetailedError) Error() string
type ErrorMonitor ¶
type ErrorMonitor interface {
RecordError(err error)
}
type Handler ¶
Handle is a method that takes the header and body of a message and generates a task that must be completed. The header should contain relevant details about the task type and how to decode the message. The header will always contain cid in the field "X-Request-ID". The error returned should be one of the custom occamy errors.
type Headers ¶
type Headers map[string]interface{}
Headers contain the header information of a message.
type LatencyMonitor ¶
type Message ¶
type Message interface { // Body must return the body of the message. Body() []byte // Headers must return the headers of the message. Headers() Headers // Ack must acknowledge the messages as successfully handled and does not // need to be passed to another server. Ack() error // Reject must negative acknowledge the message as uncompleted. The requeue // parameter will determine if the message should be requeued and passed to // another server. // // For example, if a server shuts down the task associated with a message // will not be completed and will be passed to another server. On the other // hand if Reject(requeue bool) error }
Message represents an asynchronous message.
type Monitors ¶
type Monitors struct { Error ErrorMonitor Latency LatencyMonitor Resource ResourceMonitor }
type NopErrorMonitor ¶
type NopErrorMonitor struct{}
func (NopErrorMonitor) RecordError ¶
func (NopErrorMonitor) RecordError(_ error)
type NopLatencyMonitor ¶
type NopLatencyMonitor struct{}
func (NopLatencyMonitor) RecordProcessDuration ¶
func (NopLatencyMonitor) RecordProcessDuration(_ string, _ time.Duration)
func (NopLatencyMonitor) RecordTaskDuration ¶
func (NopLatencyMonitor) RecordTaskDuration(_ string, _ SlotStatus, _ time.Duration)
type NopResourceMonitor ¶
type NopResourceMonitor struct{}
func (NopResourceMonitor) RecordTaskStarting ¶
func (NopResourceMonitor) RecordTaskStarting(_ string, _ SlotStatus)
func (NopResourceMonitor) RecordTaskStopping ¶
func (NopResourceMonitor) RecordTaskStopping(_ string, _ SlotStatus)
type ResourceMonitor ¶
type ResourceMonitor interface { RecordTaskStarting(group string, status SlotStatus) RecordTaskStopping(group string, status SlotStatus) }
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is a server that handles incoming messages and passes them to the handler (only one is allowed). The task associated with the messages are performed and when possible spare resources are used by the handler for additional tasks. The server must always be created using the NewServer method.
func NewServer ¶
func NewServer(config ServerConfig) *Server
NewServer creates a new occamy server based using the configuration provided. The expansion process is automatically started.
func (*Server) ExpandTasks ¶ added in v0.3.0
func (server *Server) ExpandTasks()
ExpandTasks calls expand on running tasks and runs externally added tasks provided there is sufficient space.
It is generally recommended have the server using a periodic expansion. This method has been included so that custom expansion schedules can be performed.
func (*Server) HandleControlMsg ¶
HandleControlMsg handles a control method.
Control messages with a task ID set in the header will be passed on to ALL tasks which have matching IDs. If there is no task ID set then the message will be interpreted as an optional external request and stored to be used in the expansion process.
func (*Server) HandleRequestMsg ¶
HandleRequestMsg handles an incoming request message.
The handler defined in the server is used to generate a task. The task should be started immediately, unless an error is encountered in which case the message will be nacked.
type ServerConfig ¶
type ServerConfig struct { // Slots sets how many slots will be crated i.e. the maximum number of // simultaneous tasks allowed. Slots int // ExpansionSlotBuffer sets how many ExpansionSlotBuffer int // ExpansionPeriod sets how often the expansion process will be run. A value // of zero or less will mean that the expansion process will never be run. ExpansionPeriod time.Duration // KillTimeout sets the maximum duration allowed for task a task to be // killed in. This is important to set for allowing unprotected tasks // to be killed and replaced by protected ones. KillTimeout time.Duration // HeaderKeyTaskID sets the header key used to obtain the task ID for any // control message received. In other words the value in the header // corresponding to this key will be treated as the task ID. HeaderKeyTaskID string Handler Handler Monitors Monitors }
ServerConfig contains the necessary data to create an occamy server.
type SlotStatus ¶
type SlotStatus string
const ( SlotStatusEmpty SlotStatus = "empty" SlotStatusProtected SlotStatus = "protected" SlotStatusUnprotectedInternal SlotStatus = "unprotected_internal" SlotStatusUnprotectedExternal SlotStatus = "unprotected_external" )
type Task ¶
type Task interface { // Do should perform the task and if a failure is encountered // one of the custom occamy errors should be returned. // // If the context provided is canceled the process must be stopped // immediately and ungracefully, and return the error // `ErrTaskInterrupted` which will cause the task to be run on // another server. Do(ctx context.Context) error Details() TaskDetails // Expand creates additional tasks. These tasks will be unprotected // and maybe cancelled at anytime. Expand(n int) []Task // Handle handles a control message. The error return should be // one of the custom occamy errors. Handle(ctx context.Context, headers Headers, body []byte) error }
Task represents a task that must be done.
type TaskDetails ¶
type TaskDetails struct { // Deadline is the deadline of the task, if it is known, // and is only relevant for tasks that that arrived via // the control channel. Deadline time.Time // The ID of the the task. Any control messages // for this task must have this ID included in the header. ID string // Group denotes the general group that task this belongs to. This // is used for monitoring usage. Group string }
type WrappedError ¶
type WrappedError struct { BasicErr BasicError InnerErr error }
func NewWrappedError ¶
func NewWrappedError(basicErr BasicError, innerErr error) *WrappedError
func (*WrappedError) Error ¶
func (w *WrappedError) Error() string