amqp

package module
v0.0.0-...-9c39747 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 22, 2023 License: Apache-2.0 Imports: 5 Imported by: 0

README

xk6-amqp

A k6 extension for publishing and consuming messages from queues and exchanges. This project utilizes AMQP 0.9.1, the most common AMQP protocol in use today.

⚠ This project is not compatible with AMQP 1.0. A list of AMQP 1.0 brokers and other AMQP 1.0 resources may be found at github.com/xinchen10/awesome-amqp.

Build

To build a k6 binary with this extension, first ensure you have the prerequisites:

Then:

  1. Download xk6:
$ go install go.k6.io/xk6/cmd/xk6@latest
  1. Build the k6 binary:
$ xk6 build --with github.com/grafana/xk6-amqp@latest

Development

To make development a little smoother, use the Makefile in the root folder. The default target will format your code, run tests, and create a k6 binary with your local code rather than from GitHub.

git clone [email protected]:grafana/xk6-amqp.git
cd xk6-amqp
make

Example

import Amqp from 'k6/x/amqp';
import Queue from 'k6/x/amqp/queue';

export default function () {
  console.log("K6 amqp extension enabled, version: " + Amqp.version)
  const url = "amqp://guest:guest@localhost:5672/"
  Amqp.start({
    connection_url: url
  })
  console.log("Connection opened: " + url)
  
  const queueName = 'K6 general'
  
  Queue.declare({
    name: queueName,
    // durable: false,
    // delete_when_unused: false,
    // exclusive: false,
    // no_wait: false,
    // args: null
  })

  console.log(queueName + " queue is ready")

  Amqp.publish({
    queue_name: queueName,
    body: "Ping from k6",
    content_type: "text/plain"
    // exchange: '',
    // mandatory: false,
    // immediate: false,
    // headers: {
    //   'header-1': '',
    // },
  })

  const listener = function(data) { console.log('received data: ' + data) }
  Amqp.listen({
    queue_name: queueName,
    listener: listener,
    // consumer: '',
    // auto_ack: true,
    // exclusive: false,
		// no_local: false,
		// no_wait: false,
    // args: null
  })
}

Result output:

$ ./k6 run script.js

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: ../xk6-amqp/examples/test.js
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
           * default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)

INFO[0000] K6 amqp extension enabled, version: v0.0.1    source=console
INFO[0000] Connection opened: amqp://guest:guest@localhost:5672/  source=console
INFO[0000] K6 general queue is ready                     source=console
INFO[0000] received data: Ping from k6                   source=console

running (00m00.0s), 0/1 VUs, 1 complete and 0 interrupted iterations
default ✓ [======================================] 1 VUs  00m00.0s/10m0s  1/1 iters, 1 per VU

     data_received........: 0 B 0 B/s
     data_sent............: 0 B 0 B/s
     iteration_duration...: avg=31.37ms min=31.37ms med=31.37ms max=31.37ms p(90)=31.37ms p(95)=31.37ms
     iterations...........: 1   30.855627/s

Inspect examples folder for more details.

Documentation

Overview

Package amqp contains AMQP API for a remote server.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AMQP

type AMQP struct {
	Version    string
	Connection *amqpDriver.Connection
	Queue      *Queue
	Exchange   *Exchange
	Channel    *amqpDriver.Channel
}

AMQP type holds connection to a remote AMQP server.

func (*AMQP) CloseChannel

func (amqp *AMQP) CloseChannel()

func (*AMQP) Listen

func (amqp *AMQP) Listen(options ListenOptions) error

Listen binds to an AMQP queue in order to receive message(s) as they are received.

func (*AMQP) Publish

func (amqp *AMQP) Publish(options PublishOptions) error

Publish delivers the payload using options provided.

func (*AMQP) Start

func (amqp *AMQP) Start(options Options) error

Start establishes a session with an AMQP server given the provided options.

type ConsumeOptions

type ConsumeOptions struct {
	Consumer  string
	AutoAck   bool
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Args      amqpDriver.Table
}

ConsumeOptions defines options for use when consuming a message.

type DeclareOptions

type DeclareOptions struct {
	Name             string
	Durable          bool
	DeleteWhenUnused bool
	Exclusive        bool
	NoWait           bool
	Args             amqpDriver.Table
}

DeclareOptions provides queue options when declaring (creating) a queue.

type Exchange

type Exchange struct {
	Version    string
	Connection *amqpDriver.Connection
}

Exchange defines a connection to publish/subscribe destinations.

func (*Exchange) Bind

func (exchange *Exchange) Bind(options ExchangeBindOptions) error

Bind subscribes one exchange to another.

func (*Exchange) Declare

func (exchange *Exchange) Declare(options ExchangeDeclareOptions) error

Declare creates a new exchange given the provided options.

func (*Exchange) Delete

func (exchange *Exchange) Delete(name string) error

Delete removes an exchange from the remote server given the exchange name.

func (*Exchange) Unbind

func (exchange *Exchange) Unbind(options ExchangeUnbindOptions) error

Unbind removes a subscription from one exchange to another.

type ExchangeBindOptions

type ExchangeBindOptions struct {
	DestinationExchangeName string
	SourceExchangeName      string
	RoutingKey              string
	NoWait                  bool
	Args                    amqpDriver.Table
}

ExchangeBindOptions provides options when binding (subscribing) one exchange to another.

type ExchangeDeclareOptions

type ExchangeDeclareOptions struct {
	Name       string
	Kind       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqpDriver.Table
}

ExchangeDeclareOptions provides options when declaring (creating) an exchange.

type ExchangeOptions

type ExchangeOptions struct {
	ConnectionURL string
}

ExchangeOptions defines configuration settings for accessing an exchange.

type ExchangeUnbindOptions

type ExchangeUnbindOptions struct {
	DestinationExchangeName string
	SourceExchangeName      string
	RoutingKey              string
	NoWait                  bool
	Args                    amqpDriver.Table
}

ExchangeUnbindOptions provides options when unbinding (unsubscribing) one exchange from another.

type ListenOptions

type ListenOptions struct {
	Listener  ListenerType
	QueueName string
	Consumer  string
	AutoAck   bool
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Args      amqpDriver.Table
}

ListenOptions defines options for subscribing to message(s) within a queue.

type ListenerType

type ListenerType func(interface{}) error

ListenerType is the message handler implemented within JavaScript.

type Options

type Options struct {
	ConnectionURL string
}

Options defines configuration options for an AMQP session.

type PublishOptions

type PublishOptions struct {
	QueueName   string
	Body        string
	Headers     amqpDriver.Table
	Exchange    string
	ContentType string
	Mandatory   bool
	Immediate   bool
	Persistent  bool
}

PublishOptions defines a message payload with delivery options.

type Queue

type Queue struct {
	Version    string
	Connection *amqpDriver.Connection
}

Queue defines a connection to a point-to-point destination.

func (*Queue) Bind

func (queue *Queue) Bind(options QueueBindOptions) error

Bind subscribes a queue to an exchange in order to receive message(s).

func (*Queue) Declare

func (queue *Queue) Declare(options DeclareOptions) (amqpDriver.Queue, error)

Declare creates a new queue given the provided options.

func (*Queue) Delete

func (queue *Queue) Delete(name string) error

Delete removes a queue from the remote server given the queue name.

func (*Queue) Inspect

func (queue *Queue) Inspect(name string) (amqpDriver.Queue, error)

Inspect provides queue metadata given queue name.

func (*Queue) Purge

func (queue *Queue) Purge(name string, noWait bool) (int, error)

Purge removes all non-consumed message(s) from the specified queue.

func (*Queue) Unbind

func (queue *Queue) Unbind(options QueueUnbindOptions) error

Unbind removes a queue subscription from an exchange to discontinue receiving message(s).

type QueueBindOptions

type QueueBindOptions struct {
	QueueName    string
	ExchangeName string
	RoutingKey   string
	NoWait       bool
	Args         amqpDriver.Table
}

QueueBindOptions provides options when binding a queue to an exchange in order to receive message(s).

type QueueOptions

type QueueOptions struct {
	ConnectionURL string
}

QueueOptions defines configuration settings for accessing a queue.

type QueueUnbindOptions

type QueueUnbindOptions struct {
	QueueName    string
	ExchangeName string
	RoutingKey   string
	Args         amqpDriver.Table
}

QueueUnbindOptions provides options when unbinding a queue from an exchange to stop receiving message(s).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL