mq

package
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2016 License: Apache-2.0 Imports: 17 Imported by: 1

Documentation

Overview

Package mq implements interprocess queues logic. It provides access to system mq mechanisms, such as sysv mq and linux mq. Also, it provides access to multi-platform priority queue, FastMq.

Index

Examples

Constants

View Source
const (
	// DefaultFastMqMaxSize is the default fast mq queue size.
	DefaultFastMqMaxSize = 8
	// DefaultFastMqMessageSize is the fast mq message size.
	DefaultFastMqMessageSize = 8192
)
View Source
const (
	// O_NONBLOCK flag makes mq send/receive operations non-blocking.
	O_NONBLOCK = common.O_NONBLOCK
)

Variables

This section is empty.

Functions

func Destroy added in v0.4.0

func Destroy(name string) error

Destroy permanently removes mq object.

func DestroyFastMq

func DestroyFastMq(name string) error

DestroyFastMq permanently removes a FastMq.

func FastMqAttrs added in v0.3.0

func FastMqAttrs(name string) (int, int, error)

FastMqAttrs returns capacity and max message size of the existing mq.

func IsTemporary added in v0.4.0

func IsTemporary(err error) bool

IsTemporary returns true, if an error is a timeout error.

Types

type Blocker added in v0.4.0

type Blocker interface {
	SetBlocking(bool) error
}

Blocker is an object, which can work in blocking and non-blocking modes.

type Buffered added in v0.4.0

type Buffered interface {
	Cap() int
}

Buffered is an object with internal buffer of the given capacity.

type FastMq

type FastMq struct {
	// contains filtered or unexported fields
}

FastMq is a priority message queue based on shared memory. Currently it is the only implementation for windows.

func CreateFastMq

func CreateFastMq(name string, flag int, perm os.FileMode, maxQueueSize, maxMsgSize int) (*FastMq, error)

CreateFastMq creates new FastMq.

name - mq name. implementation will create a shm object with this name.
flag - flag is a combination of os.O_EXCL, and O_NONBLOCK.
perm - object's permission bits.
maxQueueSize - queue capacity.
maxMsgSize - maximum message size.

func OpenFastMq

func OpenFastMq(name string, flag int) (*FastMq, error)

OpenFastMq opens an existing message queue. It returns an error, if it does not exist.

name - unique mq name.
flag - 0 or O_NONBLOCK.

func (*FastMq) Cap

func (mq *FastMq) Cap() int

Cap returns size of the mq buffer.

func (*FastMq) Close

func (mq *FastMq) Close() error

Close closes a FastMq instance.

func (*FastMq) Destroy

func (mq *FastMq) Destroy() error

Destroy permanently removes a FastMq instance.

func (*FastMq) Empty added in v0.3.0

func (mq *FastMq) Empty() bool

Empty returns true, if there are no messages in the queue.

func (*FastMq) Full added in v0.3.0

func (mq *FastMq) Full() bool

Full returns true, if the capacity liimt has been reached.

func (*FastMq) Receive

func (mq *FastMq) Receive(data []byte) (int, error)

Receive receives a message. It blocks if the queue is empty.

func (*FastMq) ReceivePriority

func (mq *FastMq) ReceivePriority(data []byte) (int, int, error)

ReceivePriority receives a message and returns its priority. It blocks if the queue is empty.

func (*FastMq) ReceivePriorityTimeout

func (mq *FastMq) ReceivePriorityTimeout(data []byte, timeout time.Duration) (int, int, error)

ReceivePriorityTimeout receives a message and returns its priority. It blocks if the queue is empty, waiting for not longer, then the timeout.

func (*FastMq) ReceiveTimeout added in v0.3.0

func (mq *FastMq) ReceiveTimeout(data []byte, timeout time.Duration) (int, error)

ReceiveTimeout receives a message. It blocks if the queue is empty. It blocks if the queue is empty, waiting for not longer, then the timeout.

func (*FastMq) Send

func (mq *FastMq) Send(data []byte) error

Send sends a message. It blocks if the queue is full.

func (*FastMq) SendPriority

func (mq *FastMq) SendPriority(data []byte, prio int) error

SendPriority sends a message with the given priority. It blocks if the queue is full.

func (*FastMq) SendPriorityTimeout

func (mq *FastMq) SendPriorityTimeout(data []byte, prio int, timeout time.Duration) error

SendPriorityTimeout sends a message with the given priority. It blocks if the queue is full, waiting for not longer, then the timeout.

func (*FastMq) SendTimeout added in v0.3.0

func (mq *FastMq) SendTimeout(data []byte, timeout time.Duration) error

SendTimeout sends a message with the default priority 0. It blocks if the queue is full, waiting for not longer, then the timeout.

func (*FastMq) SetBlocking

func (mq *FastMq) SetBlocking(block bool) error

SetBlocking sets whether the send/receive operations on the queue block. This applies to the current instance only.

type Messenger added in v0.4.0

type Messenger interface {
	// Send sends the data. It blocks if there are no readers and the queue is full
	Send(data []byte) error
	// Receive reads data from the queue. It blocks if the queue is empty.
	// Returns message len.
	Receive(data []byte) (int, error)
	io.Closer
}

Messenger is an interface which must be satisfied by any message queue implementation on any platform.

Example
mq, err := New("mq", os.O_CREATE|os.O_EXCL, 0666)
if err != nil {
	panic("new queue")
}
defer mq.Close()
data := []byte{1, 2, 3, 4, 5, 6, 7, 8}
go func() {
	if err := mq.Send(data); err != nil {
		panic("send")
	}
}()
mq2, err := Open("mq", 0)
if err != nil {
	panic("open")
}
defer mq2.Close()
received := make([]byte, len(data))
l, err := mq2.Receive(data)
if err != nil {
	panic("receive")
}
if l != len(data) {
	panic("wrong len")
}
for i, b := range received {
	if b != data[i] {
		panic("wrong data")
	}
}
Output:

func New added in v0.4.0

func New(name string, flag int, perm os.FileMode) (Messenger, error)

New creates a mq with a given name and permissions. It uses the default implementation. If there are several implementations on a platform, you can use explicit create functions.

name - unique queue name.
flag - create flags. You can specify:
	os.O_EXCL if you don't want to open a queue if it exists.
	O_NONBLOCK if you don't want to block on send/receive.
		This flag may not be supported by a particular implementation. To be sure, you can convert Messenger
		to Blocker and call SetBlocking to set/unset non-blocking mode.
perm - permissions for the new queue.

func Open added in v0.4.0

func Open(name string, flags int) (Messenger, error)

Open opens a mq with a given name and flags. It uses the default implementation. If there are several implementations on a platform, you can use explicit create functions.

name  - unique queue name.
flags - 0 or O_NONBLOCK.

type PriorityMessenger added in v0.4.0

type PriorityMessenger interface {
	Messenger
	Buffered
	// SendPriority sends the data. The message will be inserted in the mq according to its priority.
	SendPriority(data []byte, prio int) error
	// ReceivePriority reads a message and returns its len and priority.
	ReceivePriority(data []byte) (int, int, error)
}

PriorityMessenger is a Messenger, which orders messages according to their priority. Semantic is similar to linux native mq: Messages are placed on the queue in decreasing order of priority, with newer messages of the same priority being placed after older messages with the same priority.

Example
Destroy("mq")
mq, err := New("mq", os.O_CREATE|os.O_EXCL, 0666)
if err != nil {
	panic("new queue")
}
defer mq.Close()
// not all implementations support prioritized send/receive.
tmq, ok := mq.(PriorityMessenger)
if !ok {
	panic("not a prio messenger")
}
data := []byte{1, 2, 3, 4, 5, 6, 7, 8}
go func() {
	if err := tmq.SendPriority(data, 0); err != nil {
		panic("send")
	}
	if err := tmq.SendPriority(data, 1); err != nil {
		panic("send")
	}
}()
mq2, err := Open("mq", 0)
if err != nil {
	panic("open")
}
defer mq2.Close()
tmq2, ok := mq2.(PriorityMessenger)
if !ok {
	panic("not a prio messenger")
}
received := make([]byte, len(data))
_, prio, err := tmq2.ReceivePriority(received)
if err != nil || prio != 1 {
	panic("receive")
}
_, prio, err = tmq2.ReceivePriority(received)
if err != nil || prio != 0 {
	panic("receive")
}
Output:

type TimedMessenger added in v0.4.0

type TimedMessenger interface {
	Messenger
	// SendTimeout sends the data. It blocks if there are no readers and the queue if full.
	// It waits for not more, than timeout.
	SendTimeout(data []byte, timeout time.Duration) error
	// ReceiveTimeout reads data from the queue. It blocks if the queue is empty.
	// It waits for not more, than timeout. Returns message len.
	ReceiveTimeout(data []byte, timeout time.Duration) (int, error)
}

TimedMessenger is a Messenger, which supports send/receive timeouts. Passing 0 as a timeout makes a call non-blocking. Passing negative value as a timeout makes the timeout infinite.

Example
Destroy("mq")
mq, err := New("mq", os.O_CREATE|os.O_EXCL, 0666)
if err != nil {
	panic("new queue")
}
defer mq.Close()
data := []byte{1, 2, 3, 4, 5, 6, 7, 8}
go func() {
	// send after [0..500] ms delay.
	time.Sleep(time.Duration((rand.Int() % 6)) * time.Millisecond * 100)
	if err := mq.Send(data); err != nil {
		panic("send")
	}
}()
mq2, err := Open("mq", 0)
if err != nil {
	panic("open")
}
defer mq2.Close()
// not all implementations support timed send/receive.
tmq, ok := mq2.(TimedMessenger)
if !ok {
	panic("not a timed messenger")
}
received := make([]byte, len(data))
// depending on send delay we either get a timeout error, or receive the data.
l, err := tmq.ReceiveTimeout(received, 500*time.Millisecond)
if err != nil {
	if !IsTemporary(err) {
		panic(err)
	} else { // handle timeout.
		return
	}
}
if l != len(data) {
	panic("wrong len")
}
for i, b := range received {
	if b != data[i] {
		panic("wrong data")
	}
}
Output:

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL