Documentation ¶
Overview ¶
Package queue contain methods and structures for managing of the message queue
Index ¶
Constants ¶
const ( ProcessedSuccessful = iota ProcessedWithError ProcessedWaitNext ProcessedKillWorker )
Results of the execution of the worker
Variables ¶
var DefaultQueueOptions = Options{ MinimunWorkersCount: 4, MaximumWorkersCount: 32, StorageOptions: nil, MaximumMessagesPerWorker: 2048, InputTimeOut: 5 * time.Second, MaximumMessagesInQueue: 2048, MaximumQueueMessagesSize: 16 * 1024 * 1024, }
DefaultQueueOptions is default options for queue
var DefaultStorageOptions = StorageOptions{ MaxDataFileSize: 0x1FFFFFFF, FlushOperations: 512, PercentFreeForRecalculateOnExit: 5, PercentFreeForRecalculateOnIncrementIndexFile: 10, SkipReturnedRecords: true, SkipDelayPerTry: 500, CheckCRCOnRead: false, MaxOneTimeOpenedFiles: 12, DeleteInvalidIndexFile: true, }
DefaultStorageOptions is default options for filestorage
Functions ¶
This section is empty.
Types ¶
type Logging ¶
type Logging interface { Trace(msg string, a ...interface{}) Info(msg string, a ...interface{}) Warning(msg string, a ...interface{}) Error(msg string, a ...interface{}) }
Logging is the interface that must support logging system for work with queue
type Options ¶
type Options struct { // Options for file storage connected to this queue StorageOptions *StorageOptions // In the during of timeout, message must be processed or saved to disk InputTimeOut time.Duration // Maximum size of the messages what can be processed without storing to disk MaximumQueueMessagesSize int32 // Maximum count of the messages what can be processed without storing to disk MaximumMessagesInQueue uint16 // Minimum count of the workers per queue MinimunWorkersCount uint16 // Minimum count of the workers per queue MaximumWorkersCount uint16 // Maximum count of the messages that thw worker can crocess per one time MaximumMessagesPerWorker uint16 }
Options holds the optional parameters for the managing of the messages.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is a base structure for managing of the messages
func CreateQueue ¶
func CreateQueue(Name, StoragePath string, Log Logging, Factory WorkerFactory, Options *Options) (*Queue, error)
CreateQueue is function than creates and inits internal states :
func (*Queue) Close ¶
func (q *Queue) Close()
Close stops the handler of the messages, saves the messages located in the memory into the disk, closes all opened files.
func (*Queue) Insert ¶
Insert appends the message into the queue. In depends of the timeout's option either is trying to write message to the disk or is trying to process this message in the memory and writing to the disk only if timeout is expired shortly. Returns false if aren't processing / writing of the message in the during of the timeout or has some problems with writing to disk
func (*Queue) InsertFile ¶
InsertFile appends file to queue. After processing content of the file if result of the execution of the worker is successful file will deleted.
type QueueItem ¶
type QueueItem struct { ID StorageIdx Stream io.ReadSeeker // contains filtered or unexported fields }
QueueItem is elementh of the queue
type QueueWorkerFactory ¶
type StorageIdx ¶
type StorageIdx uint64
StorageIdx is unique identifier of the message in the memory or on the disk
const InvalidIdx StorageIdx = 0xFFFFFFFFFFFFFFFF
InvalidIdx id Invalid index description
type StorageOptions ¶
type StorageOptions struct { // maximum size of the storage's data files MaxDataFileSize int64 // Count of the operation with storage when index file will be flushed FlushOperations uint32 // Count of the percents if the free messages before close of the when index file will be reformed PercentFreeForRecalculateOnExit uint8 // Count of the percents if the free messages when index file will be reformed PercentFreeForRecalculateOnIncrementIndexFile uint8 // Depends skip error messages if timeout of the waiting did not finished yet SkipReturnedRecords bool // Duration of the timeout. Time of the next processing calculated by TimeOfError+CountOfTheErrors*SkipDelayPerTry SkipDelayPerTry uint32 // Depends check crc of the message before sent in to worker CheckCRCOnRead bool // Count of the one time opened for reading and for writing files. Open files are counting separately MaxOneTimeOpenedFiles int16 // If queue index file is corrupted then will recreate index file and try to restore ,essages information DeleteInvalidIndexFile bool }
StorageOptions holds the optional parameters for the disk storage of the messages.
type Worker ¶
type Worker interface { // Processes message that is stored in `*Message`. // After it the worker must call function `(*Queue).Process` with his unique identifier // and with result of the processing, also must be pushed himself into chanal `Worker` ProcessMessage(*QueueItem) int // Processing of the event when available messages is absent // After it the worker must call function `(*Queue).Process` with his unique identifier and // with result of the processing, also must send himself into chanal `Worker` ProcessTimeout() int // Returns unique identifier of the worker GetID() WorkerID // Close is called when queue is finishing work with worker. Here you can close connection to database or etc. Close() }
Worker is interface that allow to structure to processing outgoing message
type WorkerFactory ¶
type WorkerFactory interface { // Creates new worker for this factory with unique ID CreateWorker() (Worker, error) // Returns true if possible used some messages in one action (for example, // collect large SQL script from lot of the small messages) NeedTimeoutProcessing() bool CanCreateWorkers() bool Close() }
WorkerFactory is interface for creating new workers