amqp

package module
Version: v0.9.1
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 17, 2023 License: MIT Imports: 10 Imported by: 2

README

pkg.amqp

Queuing interface package for com projects

Usage example

package main

import (
	"fmt"
	amqp "github.com/taranovegor/pkg.amqp"
	"log"
	"os"
	"time"
)

// RegularMessage is the payload type that cfg routes to the
// "producer_regular" producer; RegularConsumer.Handle decodes incoming
// bodies into it.
type RegularMessage struct {
	Text string
}

// MessageForReply is the payload type that cfg routes to the
// "producer_awaiting_for_reply" producer; main publishes it with
// amqp.MessageToPublishWithReply and WithReplyConsumer.Handle decodes
// incoming bodies into it.
type MessageForReply struct {
	Text string
}

// MessageReply is the value WithReplyConsumer.Handle attaches to its result
// via WithReply; the reply handler in main decodes bodies into it.
type MessageReply struct {
	// OriginText is set to the Text of the MessageForReply being answered.
	OriginText string
}

// LogRegularConsumer logs every body it is handed without decoding it.
// It embeds amqp.Consumer and defines both interface methods itself.
type LogRegularConsumer struct {
	amqp.Consumer
}

// Name returns the consumer name; it matches the "consumer_log_regular" key
// of the consumer map in cfg.
func (h LogRegularConsumer) Name() string {
	return "consumer_log_regular"
}

// Handle prints the raw body with field names and reports the message as
// handled successfully.
func (h LogRegularConsumer) Handle(body amqp.Body) amqp.Handled {
	log.Printf("Logged message: %+v", body)

	outcome := amqp.HandledSuccessfully()
	return outcome
}

// RegularConsumer decodes each body into a RegularMessage and logs its text.
// It embeds amqp.Consumer and defines both interface methods itself.
type RegularConsumer struct {
	amqp.Consumer
}

// Name returns the consumer name; it matches the "consumer_regular" key of
// the consumer map in cfg.
func (h RegularConsumer) Name() string {
	return "consumer_regular"
}

// Handle converts body into a RegularMessage, logs the message text and
// reports the message as handled successfully.
func (h RegularConsumer) Handle(body amqp.Body) amqp.Handled {
	var payload RegularMessage
	body.To(&payload)

	log.Printf("Consumed message: %s", payload.Text)

	outcome := amqp.HandledSuccessfully()
	return outcome
}

// WithReplyConsumer decodes each body into a MessageForReply, logs it and
// answers with a MessageReply. It embeds amqp.Consumer and defines both
// interface methods itself.
type WithReplyConsumer struct {
	amqp.Consumer
}

// Name returns the consumer name; it matches the "consumer_with_reply" key
// of the consumer map in cfg.
func (h WithReplyConsumer) Name() string {
	return "consumer_with_reply"
}

// Handle converts body into a MessageForReply, logs its text and returns a
// successful result carrying a MessageReply that echoes that text.
func (h WithReplyConsumer) Handle(body amqp.Body) amqp.Handled {
	var request MessageForReply
	body.To(&request)

	log.Printf("Consumed message with reply: %s", request.Text)

	reply := MessageReply{OriginText: request.Text}
	return amqp.HandledSuccessfully().WithReply(reply)
}

// cfg is the complete topology used by the example. The arguments follow the
// parameter order of amqp.NewConfig: consumers, exchanges, queues, queue
// bindings, producers and routing.
var cfg = amqp.NewConfig(
	// Consumers, keyed by the value each consumer's Name method returns.
	map[string]amqp.ConsumerConfig{
		"consumer_log_regular": {Queue: "msg.log", Exclusive: false, NoLocal: false, NoWait: false},
		"consumer_regular":     {Queue: "msg.process", Exclusive: false, NoLocal: false, NoWait: false},
		"consumer_with_reply":  {Queue: "msg.with_reply", Exclusive: true, NoLocal: true, NoWait: true},
	},
	// Exchanges, keyed by exchange name.
	map[string]amqp.ExchangeConfig{
		"msg.topic": {Kind: amqp.ExchangeTopic},
	},
	// Queues, keyed by queue name; all use the zero-value QueueConfig.
	map[string]amqp.QueueConfig{
		"msg.log":        {},
		"msg.process":    {},
		"msg.with_reply": {},
	},
	// Bindings, keyed by queue name. "msg.log" binds with the pattern
	// "msg.*", the other two queues bind with a single exact key.
	map[string][]amqp.QueueBindConfig{
		"msg.log":        {{Key: "msg.*", Exchange: "msg.topic"}},
		"msg.process":    {{Key: "msg.regular", Exchange: "msg.topic"}},
		"msg.with_reply": {{Key: "msg.with_reply", Exchange: "msg.topic"}},
	},
	// Producers, keyed by producer name. ProducerConfig only has the fields
	// Exchange, Key, Mandatory and Immediate, so no reply-queue name is
	// configured here; the reply handler is supplied per message through
	// amqp.MessageToPublishWithReply.
	map[string]amqp.ProducerConfig{
		"producer_regular":            {Exchange: "msg.topic", Key: "msg.regular"},
		"producer_awaiting_for_reply": {Exchange: "msg.topic", Key: "msg.with_reply"},
	},
	// Routing, keyed by a zero value of each message type, naming the
	// producer that publishes messages of that type.
	map[interface{}]amqp.RouteConfig{
		RegularMessage{}:  {Producer: "producer_regular"},
		MessageForReply{}: {Producer: "producer_awaiting_for_reply"},
	},
)

// main initializes the controller against the broker addressed by the
// AMQP_URL environment variable, registers the three example consumers,
// calls Consume, publishes one regular message and one message with a reply
// handler, and then blocks forever.
func main() {
	ctrl, err := amqp.Init("pkg.amqp", os.Getenv("AMQP_URL"), cfg, []amqp.Consumer{
		LogRegularConsumer{},
		RegularConsumer{},
		WithReplyConsumer{},
	})
	if err != nil {
		log.Fatalf("initializing amqp controller: %v", err)
	}

	ctrl.Consume()

	// Publish returns (PublishedMessage, error); report a failed publish
	// instead of silently dropping it.
	if _, err := ctrl.Publish(
		amqp.MessageToPublish(
			RegularMessage{Text: fmt.Sprintf("regular message, created at %s", time.Now().String())},
		),
	); err != nil {
		log.Printf("publishing regular message: %v", err)
	}

	if _, err := ctrl.Publish(
		amqp.MessageToPublishWithReply(
			MessageForReply{Text: fmt.Sprintf("message for reply, created at %s", time.Now().String())},
			// Handler for the reply body: decode it and log the echoed text.
			func(body amqp.Body) amqp.Handled {
				msg := MessageReply{}
				body.To(&msg)

				log.Printf("Consumed message reply: %s", msg.OriginText)

				return amqp.HandledSuccessfully()
			},
		),
	); err != nil {
		log.Printf("publishing message awaiting reply: %v", err)
	}

	// Block forever; presumably this keeps the process alive so the
	// consumers and the reply handler can run.
	select {}
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ConfigNotFound = errors.New("config not found")

Functions

This section is empty.

Types

type AwaitForBodyHandler added in v0.5.0

type AwaitForBodyHandler struct {
	Handler BodyHandler
	// contains filtered or unexported fields
}

func WaitForReply added in v0.4.0

func WaitForReply[T interface{}](i T) (*AwaitForBodyHandler, *T)

func (*AwaitForBodyHandler) Done added in v0.5.0

func (h *AwaitForBodyHandler) Done()

func (*AwaitForBodyHandler) Wait added in v0.5.0

func (h *AwaitForBodyHandler) Wait()

type Body

type Body map[string]interface{}

func (Body) To

func (b Body) To(i interface{})

type BodyHandler added in v0.5.0

type BodyHandler func(Body) Handled

type Config

type Config struct {
	// contains filtered or unexported fields
}

func NewConfig

func NewConfig(
	consumers map[string]ConsumerConfig,
	exchanges map[string]ExchangeConfig,
	queues map[string]QueueConfig,
	queueBindings map[string][]QueueBindConfig,
	producers map[string]ProducerConfig,
	routing map[interface{}]RouteConfig,
) Config

func (Config) GetConsumer

func (c Config) GetConsumer(name string) (ConsumerConfig, error)

func (Config) GetExchange

func (c Config) GetExchange(name string) (ExchangeConfig, error)

func (Config) GetProducer

func (c Config) GetProducer(name string) (ProducerConfig, error)

func (Config) GetQueue

func (c Config) GetQueue(name string) (QueueConfig, error)

func (Config) GetQueueBind added in v0.7.0

func (c Config) GetQueueBind(name string) ([]QueueBindConfig, error)

func (Config) GetRoute

func (c Config) GetRoute(i interface{}) (RouteConfig, error)

type Consumer

type Consumer interface {
	Name() string
	Handle(Body) Handled
}

type ConsumerConfig

type ConsumerConfig struct {
	Queue     string
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Args      map[string]interface{}
}

type Controller

type Controller struct {
	*Producer
	// contains filtered or unexported fields
}

func Init

func Init(
	appName string,
	url string,
	config Config,
	consumers []Consumer,
) (*Controller, error)

func (Controller) Consume

func (c Controller) Consume()

type ExchangeConfig

type ExchangeConfig struct {
	Kind       ExchangeKind
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       map[string]interface{}
}

type ExchangeKind added in v0.7.0

type ExchangeKind string
const (
	ExchangeDirect  ExchangeKind = amqp.ExchangeDirect
	ExchangeFanout  ExchangeKind = amqp.ExchangeFanout
	ExchangeTopic   ExchangeKind = amqp.ExchangeTopic
	ExchangeHeaders ExchangeKind = amqp.ExchangeHeaders
)

func (ExchangeKind) String added in v0.7.0

func (e ExchangeKind) String() string

type Handled

type Handled struct {
	// contains filtered or unexported fields
}

func HandledAndRejected

func HandledAndRejected() Handled

func HandledNotSuccessfully

func HandledNotSuccessfully(requeue bool) Handled

func HandledSuccessfully

func HandledSuccessfully() Handled

func (Handled) WithReply

func (h Handled) WithReply(r interface{}) Handled

type MessageType

type MessageType string
const (
	MessageRegular   MessageType = "regular"
	MessageWithReply MessageType = "with_reply"
)

type NoReply

type NoReply struct {
}

type Producer added in v0.3.0

type Producer struct {
	// contains filtered or unexported fields
}

func (Producer) Publish added in v0.3.0

func (p Producer) Publish(msg PublishMessage) (PublishedMessage, error)

type ProducerConfig

type ProducerConfig struct {
	Exchange  string
	Key       string
	Mandatory bool
	Immediate bool
}

type PublishMessage

type PublishMessage struct {
	// contains filtered or unexported fields
}

func MessageToPublish

func MessageToPublish(msg interface{}) PublishMessage

func MessageToPublishWithReply

func MessageToPublishWithReply(msg interface{}, handler BodyHandler) PublishMessage

type PublishedMessage

type PublishedMessage struct {
	ID            uuid.UUID
	CorrelationID uuid.UUID
	SentAt        time.Time
	Message       interface{}
}

type QueueBindConfig added in v0.7.0

type QueueBindConfig struct {
	Key      string
	Exchange string
	NoWait   bool
	Args     map[string]interface{}
}

type QueueConfig

type QueueConfig struct {
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       map[string]interface{}
}

type RouteConfig

type RouteConfig struct {
	Producer string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL