amqprpc

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 22, 2020 License: MIT Imports: 9 Imported by: 0

README

Golang AMQP RPC Client

Features:

  • Protocol agnostic RPC Client over AMQP.
  • Can simultaneity talk to multiple servers.
  • Call cancellation.
  • Buffer multiple replies in a channel.
  • Separate publisher\consumer connections.
  • Multiple consumer workers.
  • Auto reconnect.
  • Client close method wait for calls inflight to finish.

Example:

package main

import (
	"log"

	"time"

	"github.com/makasim/amqpextra"
	"github.com/makasim/amqprpc"
	"github.com/streadway/amqp"
)

func main() {
	consumerConn := amqpextra.Dial([]string{"amqp://guest:guest@rabbitmq:5672/amqprpc"})
	publisherConn := amqpextra.Dial([]string{"amqp://guest:guest@rabbitmq:5672/amqprpc"})

	client, err := amqprpc.New(publisherConn, consumerConn)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	call := client.Go(amqpextra.Publishing{
		Key: "a_queue",
		Message: amqp.Publishing{
			Body: []byte(`Have you heard the news?`),
		},
	}, make(chan *amqprpc.Call, 1))
	defer call.Close()

	select {
	case <-call.Done():
		rpl, err := call.Delivery()
		if err != nil {
			log.Fatal(err)

			return
		}

		log.Print(string(rpl.Body))

	case <-time.NewTimer(time.Second).C:
        // timeout
	}
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrClosed = errors.New("amqprpc: call closed")
View Source
var ErrConsumerUnready = errors.New("amqprpc: consumer unready")
View Source
var ErrNotDone = errors.New("amqprpc: call is not done")
View Source
var ErrPublisherUnready = errors.New("amqprpc: publisher unready")
View Source
var ErrReplyQueueGoneAway = errors.New("amqprpc: reply queue has gone away")
View Source
var ErrShutdown = errors.New("amqprpc: client is shut down")

Functions

This section is empty.

Types

type Call

type Call struct {
	AutoAck bool
	// contains filtered or unexported fields
}

func (*Call) Close

func (call *Call) Close()

func (*Call) Closed added in v0.3.0

func (call *Call) Closed() <-chan struct{}

func (*Call) Delivery

func (call *Call) Delivery() (amqp.Delivery, error)

func (*Call) Done

func (call *Call) Done() <-chan *Call

func (*Call) Publishing

func (call *Call) Publishing() amqpextra.Publishing

type Client

type Client struct {
	// contains filtered or unexported fields
}

func New

func New(
	publisherConn,
	consumerConn *amqpextra.Connection,
	opts ...Option,
) (*Client, error)

func (*Client) Call

func (client *Client) Call(msg amqpextra.Publishing) (amqp.Delivery, error)

func (*Client) Close

func (client *Client) Close() error

func (*Client) Go

func (client *Client) Go(msg amqpextra.Publishing, done chan *Call) *Call

type Consumer

type Consumer struct {
	Tag       string
	AutoAck   bool
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Args      amqp.Table
}

type Option

type Option func(client *Client)

func WithConsumer

func WithConsumer(c Consumer) Option

func WithContext

func WithContext(ctx context.Context) Option

func WithPreFetchCount

func WithPreFetchCount(count int) Option

func WithReplyQueue

func WithReplyQueue(rq ReplyQueue) Option

func WithShutdownPeriod

func WithShutdownPeriod(d time.Duration) Option

func WithWorkerCount

func WithWorkerCount(count int) Option

type ReplyQueue

type ReplyQueue struct {
	Name       string
	Declare    bool
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
}

Directories

Path Synopsis
test

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL