Documentation ¶
Overview ¶
Package mq implements interprocess message queues. It provides access to system mq mechanisms, such as SysV mq and Linux mq. It also provides access to a multi-platform priority queue, FastMq.
Index ¶
- Constants
- func Destroy(name string) error
- func DestroyFastMq(name string) error
- func IsTemporary(err error) bool
- type Blocker
- type Buffered
- type FastMq
- func (mq *FastMq) Cap() (int, error)
- func (mq *FastMq) Close() error
- func (mq *FastMq) Destroy() error
- func (mq *FastMq) Receive(data []byte) error
- func (mq *FastMq) ReceivePriority(data []byte) (int, error)
- func (mq *FastMq) ReceivePriorityTimeout(data []byte, timeout time.Duration) (int, error)
- func (mq *FastMq) Send(data []byte) error
- func (mq *FastMq) SendPriority(data []byte, prio int) error
- func (mq *FastMq) SendPriorityTimeout(data []byte, prio int, timeout time.Duration) error
- func (mq *FastMq) SetBlocking(block bool) error
- type Messenger
- type PriorityMessenger
- type TimedMessenger
Constants ¶
const (
	// DefaultFastMqMaxSize is the default fast mq queue size.
	DefaultFastMqMaxSize = 8
	// DefaultFastMqMessageSize is the fast mq message size.
	DefaultFastMqMessageSize = 8192
)
const (
	// O_NONBLOCK flag makes mq read/write operations nonblocking.
	O_NONBLOCK = common.O_NONBLOCK
)
Variables ¶
This section is empty.
Functions ¶
func DestroyFastMq ¶
DestroyFastMq permanently removes a FastMq.
func IsTemporary ¶
IsTemporary returns true if an error is a timeout error.
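A timeout check of this kind is typically used to decide whether an operation is worth retrying. The following is a minimal sketch, not the package's implementation: `isTemporary` is a local stand-in for IsTemporary that assumes timeout errors expose a `Timeout() bool` method, and `timeoutError` and `retry` are hypothetical names introduced only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// timeoutError is a hypothetical error type used only in this sketch.
type timeoutError struct{}

func (timeoutError) Error() string { return "operation timed out" }
func (timeoutError) Timeout() bool { return true }

// isTemporary is a local stand-in for mq.IsTemporary. It assumes that timeout
// errors expose a Timeout() bool method; the package's real check may differ.
func isTemporary(err error) bool {
	var t interface{ Timeout() bool }
	return errors.As(err, &t) && t.Timeout()
}

// retry calls op up to attempts times and retries only after timeout errors.
// Any other error, or success, is returned immediately.
func retry(attempts int, op func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil || !isTemporary(err) {
			return err
		}
	}
	return err
}

func main() {
	calls := 0
	err := retry(3, func() error {
		calls++
		if calls < 3 {
			return timeoutError{} // simulate two timeouts before success
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err)
}
```

With a real queue, `op` would wrap a timed send or receive call, and the stand-in would be replaced by the package's IsTemporary.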
Types ¶
type FastMq ¶
type FastMq struct {
// contains filtered or unexported fields
}
FastMq is a priority message queue based on shared memory. It does not support blocking send/receive yet, and will panic if opened without O_NONBLOCK. Currently it is the only implementation for Windows. It will become the default implementation for all platforms when blocking mode support is added.
func CreateFastMq ¶
func CreateFastMq(name string, flag int, perm os.FileMode, maxQueueSize, maxMsgSize int) (*FastMq, error)
CreateFastMq creates a new FastMq.
name - mq name. The implementation will create a shm object with this name.
flag - a combination of os.O_EXCL and O_NONBLOCK.
perm - the object's permission bits.
maxQueueSize - queue capacity.
maxMsgSize - maximum message size.
func OpenFastMq ¶
OpenFastMq opens an existing message queue. It returns an error if the queue does not exist.
name - unique mq name.
flag - 0 or O_NONBLOCK.
func (*FastMq) ReceivePriority ¶
ReceivePriority receives a message and returns its priority. It blocks if the queue is empty.
func (*FastMq) ReceivePriorityTimeout ¶
ReceivePriorityTimeout receives a message and returns its priority. It blocks if the queue is empty, waiting no longer than the timeout.
func (*FastMq) SendPriority ¶
SendPriority sends a message with the given priority. It blocks if the queue is full.
func (*FastMq) SendPriorityTimeout ¶
SendPriorityTimeout sends a message with the given priority. It blocks if the queue is full, waiting no longer than the timeout.
func (*FastMq) SetBlocking ¶
SetBlocking sets whether the send/receive operations on the queue block. This applies to the current instance only.
type Messenger ¶
type Messenger interface {
	// Send sends the data. It blocks if there are no readers and the queue is full.
	Send(data []byte) error
	// Receive reads data from the queue. It blocks if the queue is empty.
	Receive(data []byte) error
	io.Closer
}
Messenger is an interface which must be satisfied by any message queue implementation on any platform.
func New ¶
New creates a mq with a given name and permissions. It uses the default implementation. If there are several implementations on a platform, you can use explicit create functions.
name - unique queue name.
flag - create flags. You can specify:
	os.O_EXCL if you don't want to open a queue if it exists.
	O_NONBLOCK if you don't want to block on send/receive. This flag may not be supported by a particular implementation. To be sure, you can convert Messenger to Blocker and call SetBlocking to set/unset non-blocking mode.
perm - permissions for the new queue.
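The conversion from Messenger to Blocker mentioned for the O_NONBLOCK flag can be sketched with a type assertion. This is an illustrative sketch only: the Blocker definition is not shown in this document, so it is assumed here to have the same SetBlocking signature as (*FastMq).SetBlocking. `fakeQueue`, `plainQueue`, and `setNonBlocking` are hypothetical names, and the queues are in-memory stand-ins rather than queues returned by New.

```go
package main

import (
	"fmt"
	"io"
)

// Messenger mirrors the interface documented in this package.
type Messenger interface {
	Send(data []byte) error
	Receive(data []byte) error
	io.Closer
}

// Blocker is an assumption: it is taken to expose SetBlocking with the same
// signature as (*FastMq).SetBlocking.
type Blocker interface {
	SetBlocking(block bool) error
}

// plainQueue is a hypothetical Messenger that cannot change its blocking mode.
type plainQueue struct{}

func (plainQueue) Send(data []byte) error    { return nil }
func (plainQueue) Receive(data []byte) error { return nil }
func (plainQueue) Close() error              { return nil }

// fakeQueue is a hypothetical Messenger that also implements Blocker.
type fakeQueue struct {
	plainQueue
	blocking bool
}

func (q *fakeQueue) SetBlocking(block bool) error {
	q.blocking = block
	return nil
}

// setNonBlocking switches m to non-blocking mode if it implements Blocker.
// The first result reports whether the implementation supports the switch.
func setNonBlocking(m Messenger) (bool, error) {
	b, ok := m.(Blocker)
	if !ok {
		return false, nil
	}
	return true, b.SetBlocking(false)
}

func main() {
	for _, m := range []Messenger{&fakeQueue{blocking: true}, plainQueue{}} {
		supported, err := setNonBlocking(m)
		fmt.Printf("%T: supported=%v err=%v\n", m, supported, err)
	}
}
```

Checking the assertion result rather than relying on the flag alone lets the caller fall back to another strategy when an implementation cannot switch modes.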
type PriorityMessenger ¶
type PriorityMessenger interface {
	Messenger
	Buffered
	// SendPriority sends the data. The message will be inserted in the mq according to its priority.
	SendPriority(data []byte, prio int) error
	// ReceivePriority reads a message and returns its priority.
	ReceivePriority(data []byte) (int, error)
}
PriorityMessenger is a Messenger which orders messages according to their priority. Its semantics are similar to the Linux native mq: messages are placed on the queue in decreasing order of priority, with newer messages of the same priority being placed after older messages with the same priority.
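The ordering rule (decreasing priority, first-in first-out among equal priorities) can be illustrated with a small in-memory queue. This is a sketch of the rule only and says nothing about how FastMq or the native queues store messages; `message` and `prioQueue` are hypothetical names.

```go
package main

import "fmt"

// message is a hypothetical queue entry used only in this sketch.
type message struct {
	data []byte
	prio int
}

// prioQueue keeps its items sorted by decreasing priority.
type prioQueue struct{ items []message }

// push inserts msg after every message whose priority is >= msg.prio,
// so messages of equal priority keep their arrival order.
func (q *prioQueue) push(msg message) {
	i := 0
	for i < len(q.items) && q.items[i].prio >= msg.prio {
		i++
	}
	q.items = append(q.items, message{}) // grow by one slot
	copy(q.items[i+1:], q.items[i:])     // shift the tail right
	q.items[i] = msg
}

// pop removes and returns the message at the head of the queue.
func (q *prioQueue) pop() (message, bool) {
	if len(q.items) == 0 {
		return message{}, false
	}
	msg := q.items[0]
	q.items = q.items[1:]
	return msg, true
}

func main() {
	q := &prioQueue{}
	q.push(message{[]byte("low"), 1})
	q.push(message{[]byte("high-1"), 5})
	q.push(message{[]byte("high-2"), 5})
	for {
		msg, ok := q.pop()
		if !ok {
			break
		}
		fmt.Printf("%s (prio %d)\n", msg.data, msg.prio)
	}
}
```

Here the two priority-5 messages come out before the priority-1 message, and `high-1` precedes `high-2` because it was sent first.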
type TimedMessenger ¶
type TimedMessenger interface {
	Messenger
	// SendTimeout sends the data. It blocks if there are no readers and the queue is full.
	// It waits no longer than timeout.
	SendTimeout(data []byte, timeout time.Duration) error
	// ReceiveTimeout reads data from the queue. It blocks if the queue is empty.
	// It waits no longer than timeout.
	ReceiveTimeout(data []byte, timeout time.Duration) error
}
TimedMessenger is a Messenger which supports send/receive timeouts. Passing 0 as a timeout makes a call non-blocking. Passing a negative value as a timeout makes the timeout infinite.
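The three timeout cases (zero, negative, positive) can be sketched with a channel-backed queue. This is an illustration of the documented semantics under stated assumptions, not the package's implementation: `chanQueue` and `errTimeout` are hypothetical, and the real implementations may report timeouts with a different error.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is a hypothetical error returned when the wait expires.
var errTimeout = errors.New("receive timed out")

// chanQueue is an in-memory stand-in for a real queue, backed by a channel.
type chanQueue struct{ ch chan []byte }

// ReceiveTimeout copies the next message into data.
// A zero timeout never blocks; a negative timeout waits indefinitely.
func (q *chanQueue) ReceiveTimeout(data []byte, timeout time.Duration) error {
	if timeout < 0 {
		copy(data, <-q.ch) // infinite wait
		return nil
	}
	if timeout == 0 {
		select {
		case msg := <-q.ch:
			copy(data, msg)
			return nil
		default:
			return errTimeout // nothing queued, return at once
		}
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case msg := <-q.ch:
		copy(data, msg)
		return nil
	case <-timer.C:
		return errTimeout
	}
}

func main() {
	q := &chanQueue{ch: make(chan []byte, 1)}
	buf := make([]byte, 8)

	// Empty queue with a zero timeout: returns immediately with an error.
	fmt.Println(q.ReceiveTimeout(buf, 0))

	// A queued message with a negative timeout: returns as soon as it is read.
	q.ch <- []byte("hi")
	fmt.Println(q.ReceiveTimeout(buf, -1), string(buf[:2]))
}
```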