Documentation ¶
Overview ¶
Package mq implements interprocess message queues. It provides access to system mq mechanisms, such as SysV mq and Linux mq, as well as to FastMq, a multi-platform priority queue.
Index ¶
- Constants
- func Destroy(name string) error
- func DestroyFastMq(name string) error
- func FastMqAttrs(name string) (int, int, error)
- func IsTemporary(err error) bool
- type Blocker
- type Buffered
- type FastMq
- func (mq *FastMq) Cap() int
- func (mq *FastMq) Close() error
- func (mq *FastMq) Destroy() error
- func (mq *FastMq) Empty() bool
- func (mq *FastMq) Full() bool
- func (mq *FastMq) Receive(data []byte) (int, error)
- func (mq *FastMq) ReceivePriority(data []byte) (int, int, error)
- func (mq *FastMq) ReceivePriorityTimeout(data []byte, timeout time.Duration) (int, int, error)
- func (mq *FastMq) ReceiveTimeout(data []byte, timeout time.Duration) (int, error)
- func (mq *FastMq) Send(data []byte) error
- func (mq *FastMq) SendPriority(data []byte, prio int) error
- func (mq *FastMq) SendPriorityTimeout(data []byte, prio int, timeout time.Duration) error
- func (mq *FastMq) SendTimeout(data []byte, timeout time.Duration) error
- func (mq *FastMq) SetBlocking(block bool) error
- type Messenger
- type PriorityMessenger
- type TimedMessenger
Examples ¶
Constants ¶
const (
	// DefaultFastMqMaxSize is the default fast mq queue size.
	DefaultFastMqMaxSize = 8
	// DefaultFastMqMessageSize is the fast mq message size.
	DefaultFastMqMessageSize = 8192
)
const (
	// O_NONBLOCK flag makes mq send/receive operations non-blocking.
	O_NONBLOCK = common.O_NONBLOCK
)
Functions ¶
func DestroyFastMq ¶
DestroyFastMq permanently removes a FastMq.
func FastMqAttrs ¶ added in v0.3.0
FastMqAttrs returns capacity and max message size of the existing mq.
func IsTemporary ¶ added in v0.4.0
IsTemporary returns true if the error is a timeout error.
Types ¶
type Blocker ¶ added in v0.4.0
Blocker is an object that can work in blocking and non-blocking modes.
type Buffered ¶ added in v0.4.0
type Buffered interface {
Cap() int
}
Buffered is an object with an internal buffer of the given capacity.
type FastMq ¶
type FastMq struct {
// contains filtered or unexported fields
}
FastMq is a priority message queue based on shared memory. Currently, it is the only implementation available on Windows.
func CreateFastMq ¶
func CreateFastMq(name string, flag int, perm os.FileMode, maxQueueSize, maxMsgSize int) (*FastMq, error)
CreateFastMq creates a new FastMq.
name - mq name. The implementation will create a shm object with this name.
flag - a combination of os.O_EXCL and O_NONBLOCK.
perm - object's permission bits.
maxQueueSize - queue capacity.
maxMsgSize - maximum message size.
func OpenFastMq ¶
OpenFastMq opens an existing message queue. It returns an error if the queue does not exist.
name - unique mq name.
flag - 0 or O_NONBLOCK.
func (*FastMq) ReceivePriority ¶
ReceivePriority receives a message and returns its priority. It blocks if the queue is empty.
func (*FastMq) ReceivePriorityTimeout ¶
ReceivePriorityTimeout receives a message and returns its priority. It blocks if the queue is empty, waiting no longer than the timeout.
func (*FastMq) ReceiveTimeout ¶ added in v0.3.0
ReceiveTimeout receives a message. It blocks if the queue is empty, waiting no longer than the timeout.
func (*FastMq) SendPriority ¶
SendPriority sends a message with the given priority. It blocks if the queue is full.
func (*FastMq) SendPriorityTimeout ¶
SendPriorityTimeout sends a message with the given priority. It blocks if the queue is full, waiting no longer than the timeout.
func (*FastMq) SendTimeout ¶ added in v0.3.0
SendTimeout sends a message with the default priority 0. It blocks if the queue is full, waiting no longer than the timeout.
func (*FastMq) SetBlocking ¶
SetBlocking sets whether the send/receive operations on the queue block. This applies to the current instance only.
type Messenger ¶ added in v0.4.0
type Messenger interface {
	// Send sends the data. It blocks if there are no readers and the queue is full.
	Send(data []byte) error
	// Receive reads data from the queue. It blocks if the queue is empty.
	// Returns message len.
	Receive(data []byte) (int, error)
	io.Closer
}
Messenger is an interface which must be satisfied by any message queue implementation on any platform.
Example ¶
mq, err := New("mq", os.O_CREATE|os.O_EXCL, 0666)
if err != nil {
	panic("new queue")
}
defer mq.Close()
data := []byte{1, 2, 3, 4, 5, 6, 7, 8}
go func() {
	if err := mq.Send(data); err != nil {
		panic("send")
	}
}()
mq2, err := Open("mq", 0)
if err != nil {
	panic("open")
}
defer mq2.Close()
received := make([]byte, len(data))
// Receive into the separate buffer so that the comparison below is meaningful.
l, err := mq2.Receive(received)
if err != nil {
	panic("receive")
}
if l != len(data) {
	panic("wrong len")
}
for i, b := range received {
	if b != data[i] {
		panic("wrong data")
	}
}
Output:
func New ¶ added in v0.4.0
New creates a mq with a given name and permissions. It uses the default implementation. If there are several implementations on a platform, you can use explicit create functions.
name - unique queue name.
flag - create flags. You can specify:
	os.O_EXCL if you don't want to open a queue if it exists.
	O_NONBLOCK if you don't want to block on send/receive. This flag may not be supported by a particular implementation. To be sure, you can convert Messenger to Blocker and call SetBlocking to set/unset non-blocking mode.
perm - permissions for the new queue.
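The Messenger-to-Blocker conversion mentioned for O_NONBLOCK can be sketched as a type assertion. This is a minimal illustration rather than the package's own code: the Blocker method set is assumed to be SetBlocking(block bool) error, matching (*FastMq).SetBlocking, and memQueue and setNonBlocking are hypothetical names used only here.

```go
package main

import (
	"errors"
	"fmt"
)

// Messenger mirrors the documented interface (io.Closer is omitted for brevity).
type Messenger interface {
	Send(data []byte) error
	Receive(data []byte) (int, error)
}

// Blocker is an assumption: its method set is inferred from (*FastMq).SetBlocking.
type Blocker interface {
	SetBlocking(block bool) error
}

// memQueue is a hypothetical in-process stand-in for a real queue.
type memQueue struct {
	blocking bool
	msgs     [][]byte
}

func (q *memQueue) Send(data []byte) error {
	q.msgs = append(q.msgs, append([]byte(nil), data...))
	return nil
}

func (q *memQueue) Receive(data []byte) (int, error) {
	if len(q.msgs) == 0 {
		return 0, errors.New("queue is empty")
	}
	n := copy(data, q.msgs[0])
	q.msgs = q.msgs[1:]
	return n, nil
}

func (q *memQueue) SetBlocking(block bool) error {
	q.blocking = block
	return nil
}

// setNonBlocking switches m to non-blocking mode when m implements Blocker.
// It reports whether the mode was changed.
func setNonBlocking(m Messenger) (bool, error) {
	b, ok := m.(Blocker)
	if !ok {
		return false, nil // this implementation cannot change its mode
	}
	if err := b.SetBlocking(false); err != nil {
		return false, err
	}
	return true, nil
}

func main() {
	q := &memQueue{blocking: true}
	changed, err := setNonBlocking(q)
	fmt.Println(changed, err, q.blocking)

	// A wrapper that exposes only Messenger does not satisfy Blocker.
	changed, err = setNonBlocking(struct{ Messenger }{q})
	fmt.Println(changed, err)
}
```

Checking the assertion result instead of relying on the flag alone keeps the caller correct on implementations that ignore O_NONBLOCK.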
type PriorityMessenger ¶ added in v0.4.0
type PriorityMessenger interface {
	Messenger
	Buffered
	// SendPriority sends the data. The message will be inserted in the mq according to its priority.
	SendPriority(data []byte, prio int) error
	// ReceivePriority reads a message and returns its len and priority.
	ReceivePriority(data []byte) (int, int, error)
}
PriorityMessenger is a Messenger that orders messages according to their priority. The semantics are similar to those of the Linux native mq: messages are placed on the queue in decreasing order of priority, with newer messages of the same priority being placed after older messages with the same priority.
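The ordering rule described here can be illustrated with a small in-memory sketch. It is not how FastMq or the native queues are implemented (they live in shared memory or in the kernel); prioQueue and message are hypothetical types that only model where a new message is inserted.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical in-memory record used only for this sketch.
type message struct {
	data []byte
	prio int
}

// prioQueue keeps its messages sorted by decreasing priority.
type prioQueue struct {
	msgs []message
}

// SendPriority inserts data after every message whose priority is >= prio,
// so messages of equal priority stay in arrival order.
func (q *prioQueue) SendPriority(data []byte, prio int) error {
	i := 0
	for i < len(q.msgs) && q.msgs[i].prio >= prio {
		i++
	}
	m := message{data: append([]byte(nil), data...), prio: prio}
	q.msgs = append(q.msgs, message{})
	copy(q.msgs[i+1:], q.msgs[i:]) // shift the tail one slot to the right
	q.msgs[i] = m
	return nil
}

// ReceivePriority removes the head of the queue and returns its length and priority.
func (q *prioQueue) ReceivePriority(data []byte) (int, int, error) {
	if len(q.msgs) == 0 {
		return 0, 0, errors.New("queue is empty")
	}
	m := q.msgs[0]
	q.msgs = q.msgs[1:]
	return copy(data, m.data), m.prio, nil
}

func main() {
	q := &prioQueue{}
	q.SendPriority([]byte("low-1"), 0)
	q.SendPriority([]byte("high"), 1)
	q.SendPriority([]byte("low-2"), 0)

	buf := make([]byte, 16)
	for i := 0; i < 3; i++ {
		n, prio, _ := q.ReceivePriority(buf)
		fmt.Printf("%s prio=%d\n", buf[:n], prio)
	}
}
```

In this sketch the priority 1 message is received first, followed by the two priority 0 messages in the order they were sent.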
Example ¶
Destroy("mq")
mq, err := New("mq", os.O_CREATE|os.O_EXCL, 0666)
if err != nil {
	panic("new queue")
}
defer mq.Close()
// not all implementations support prioritized send/receive.
tmq, ok := mq.(PriorityMessenger)
if !ok {
	panic("not a prio messenger")
}
data := []byte{1, 2, 3, 4, 5, 6, 7, 8}
go func() {
	if err := tmq.SendPriority(data, 0); err != nil {
		panic("send")
	}
	if err := tmq.SendPriority(data, 1); err != nil {
		panic("send")
	}
}()
mq2, err := Open("mq", 0)
if err != nil {
	panic("open")
}
defer mq2.Close()
tmq2, ok := mq2.(PriorityMessenger)
if !ok {
	panic("not a prio messenger")
}
received := make([]byte, len(data))
_, prio, err := tmq2.ReceivePriority(received)
if err != nil || prio != 1 {
	panic("receive")
}
_, prio, err = tmq2.ReceivePriority(received)
if err != nil || prio != 0 {
	panic("receive")
}
Output:
type TimedMessenger ¶ added in v0.4.0
type TimedMessenger interface {
	Messenger
	// SendTimeout sends the data. It blocks if there are no readers and the queue is full.
	// It waits no longer than the timeout.
	SendTimeout(data []byte, timeout time.Duration) error
	// ReceiveTimeout reads data from the queue. It blocks if the queue is empty.
	// It waits no longer than the timeout. Returns message len.
	ReceiveTimeout(data []byte, timeout time.Duration) (int, error)
}
TimedMessenger is a Messenger that supports send/receive timeouts. Passing 0 as the timeout makes a call non-blocking. Passing a negative value as the timeout makes the timeout infinite.
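The three timeout cases (zero, negative, positive) can be sketched with a channel-backed stand-in. This only models the documented semantics inside one process; chanQueue and errTimeout are hypothetical names, and the real package reports timeouts through errors that IsTemporary recognizes.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is a hypothetical sentinel used only in this sketch.
var errTimeout = errors.New("operation timed out")

// chanQueue is an in-process stand-in backed by a buffered channel.
type chanQueue struct {
	ch chan []byte
}

// ReceiveTimeout follows the documented rules: 0 means do not block,
// a negative value means wait forever, a positive value bounds the wait.
func (q *chanQueue) ReceiveTimeout(data []byte, timeout time.Duration) (int, error) {
	switch {
	case timeout < 0:
		return copy(data, <-q.ch), nil
	case timeout == 0:
		select {
		case m := <-q.ch:
			return copy(data, m), nil
		default:
			return 0, errTimeout
		}
	default:
		t := time.NewTimer(timeout)
		defer t.Stop()
		select {
		case m := <-q.ch:
			return copy(data, m), nil
		case <-t.C:
			return 0, errTimeout
		}
	}
}

func main() {
	q := &chanQueue{ch: make(chan []byte, 1)}
	buf := make([]byte, 8)

	// Zero timeout on an empty queue returns at once with a timeout error.
	_, err := q.ReceiveTimeout(buf, 0)
	fmt.Println(errors.Is(err, errTimeout))

	// Negative timeout waits until a message is available.
	q.ch <- []byte{1, 2, 3}
	n, err := q.ReceiveTimeout(buf, -1)
	fmt.Println(n, err)

	// Positive timeout gives up after 20ms on an empty queue.
	_, err = q.ReceiveTimeout(buf, 20*time.Millisecond)
	fmt.Println(errors.Is(err, errTimeout))
}
```

A caller of the real package would check the returned error with IsTemporary rather than compare against a sentinel.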
Example ¶
Destroy("mq")
mq, err := New("mq", os.O_CREATE|os.O_EXCL, 0666)
if err != nil {
	panic("new queue")
}
defer mq.Close()
data := []byte{1, 2, 3, 4, 5, 6, 7, 8}
go func() {
	// send after [0..500] ms delay.
	time.Sleep(time.Duration((rand.Int() % 6)) * time.Millisecond * 100)
	if err := mq.Send(data); err != nil {
		panic("send")
	}
}()
mq2, err := Open("mq", 0)
if err != nil {
	panic("open")
}
defer mq2.Close()
// not all implementations support timed send/receive.
tmq, ok := mq2.(TimedMessenger)
if !ok {
	panic("not a timed messenger")
}
received := make([]byte, len(data))
// depending on send delay we either get a timeout error, or receive the data.
l, err := tmq.ReceiveTimeout(received, 500*time.Millisecond)
if err != nil {
	if !IsTemporary(err) {
		panic(err)
	} else {
		// handle timeout.
		return
	}
}
if l != len(data) {
	panic("wrong len")
}
for i, b := range received {
	if b != data[i] {
		panic("wrong data")
	}
}
Output: