pkg

package
v0.0.0-...-717471a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2022 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ApiCaller

type ApiCaller interface {
	Configure(logger Logger, config *EventHandlerConfig) error
	Call(ctx context.Context, httpMethod string, url string, body []byte, headers HttpHeaders) (responseBody []byte, httpStatusCode int, err error)
	Close() error
}

type Cli

type Cli struct {
	Options *Options
}

func NewCli

func NewCli() *Cli

func (*Cli) ReadEventorConfigs

func (c *Cli) ReadEventorConfigs() *[]EventorConfig

type EventHandler

type EventHandler struct {
	// contains filtered or unexported fields
}

func NewEventHandler

func NewEventHandler(logger Logger, config *EventHandlerConfig, apiCaller ApiCaller) (*EventHandler, error)

func (*EventHandler) Close

func (h *EventHandler) Close()

func (*EventHandler) Handle

func (h *EventHandler) Handle(ctx context.Context, msg *Message) ([]byte, int, error)

type EventHandlerConfig

type EventHandlerConfig struct {
	Type    string // 'http' is the only supported type for now
	Method  string
	Url     string
	Headers HttpHeaders
}

EventHandlerConfig defines an HTTP endpoint to which the events' payload is sent to

type EventListener

type EventListener struct {
	// contains filtered or unexported fields
}

func NewEventListener

func NewEventListener(logger Logger, config *EventListenerConfig, consumer MessageConsumer) (*EventListener, error)

func (*EventListener) Close

func (l *EventListener) Close()

func (*EventListener) Listen

func (l *EventListener) Listen(ctx context.Context, handleMessage func(ctx context.Context, msg *Message) error)

type EventListenerConfig

type EventListenerConfig struct {
	Type           string // 'kafka' is the only supported type for now
	Topic          string
	ConsumerConfig KafkaConsumerConfig `yaml:"consumerConfig"`
}

EventListenerConfig defines a Kafka producer

type EventResultProcessor

type EventResultProcessor struct {
	// contains filtered or unexported fields
}

func NewEventResultProcessor

func NewEventResultProcessor(logger Logger, config *EventResultProcessorConfig, producer MessageProducer) (*EventResultProcessor, error)

func (*EventResultProcessor) Close

func (r *EventResultProcessor) Close()

func (*EventResultProcessor) Produce

func (r *EventResultProcessor) Produce(ctx context.Context, message *Message) error

type EventResultProcessorConfig

type EventResultProcessorConfig struct {
	Type           string // 'kafka' is the only supported type for now
	Topic          string
	When           []string
	ProducerConfig KafkaProducerConfig `yaml:"producerConfig"`
}

EventResultProcessorConfig defines a Kafka producer which posts the response from the EventHandlerConfig to a Kafka topic

type Eventor

type Eventor struct {
	// contains filtered or unexported fields
}

func NewEventor

func NewEventor(config EventorConfig, consumer MessageConsumer, apiCaller ApiCaller, producer MessageProducer) *Eventor

func (*Eventor) Run

func (e *Eventor) Run(ctx context.Context, logger Logger) error

type EventorConfig

type EventorConfig struct {
	Name                 string
	LogLevel             int
	EventListener        EventListenerConfig        `yaml:"eventListener"`
	EventHandler         EventHandlerConfig         `yaml:"eventHandler"`
	EventResultProcessor EventResultProcessorConfig `yaml:"eventResultProcessor"`
}

func NewEventorConfigs

func NewEventorConfigs(unmarshaller func(in []byte, out interface{}) error, data []byte) []EventorConfig

type HttpApiCaller

type HttpApiCaller struct {
	// contains filtered or unexported fields
}

func (*HttpApiCaller) Call

func (c *HttpApiCaller) Call(ctx context.Context, httpMethod string, url string, body []byte, headers HttpHeaders) (responseBody []byte, httpStatusCode int, err error)

func (*HttpApiCaller) Close

func (c *HttpApiCaller) Close() error

func (*HttpApiCaller) Configure

func (c *HttpApiCaller) Configure(logger Logger, config *EventHandlerConfig) error

type HttpHeaders

type HttpHeaders = map[string]string

type KafkaConsumerConfig

type KafkaConsumerConfig = map[string]string

type KafkaMessageConsumer

type KafkaMessageConsumer struct {
	Consumer *kafka.Consumer
	// contains filtered or unexported fields
}

func (*KafkaMessageConsumer) Close

func (k *KafkaMessageConsumer) Close() error

func (*KafkaMessageConsumer) CommitLastMessage

func (k *KafkaMessageConsumer) CommitLastMessage(ctx context.Context) error

func (*KafkaMessageConsumer) Configure

func (k *KafkaMessageConsumer) Configure(logger Logger, config *KafkaConsumerConfig, topic string) error

func (*KafkaMessageConsumer) Consume

func (k *KafkaMessageConsumer) Consume(ctx context.Context) (*Message, error)

func (*KafkaMessageConsumer) Subscribe

func (k *KafkaMessageConsumer) Subscribe() error

type KafkaMessageProducer

type KafkaMessageProducer struct {
	Producer *kafka.Producer
	// contains filtered or unexported fields
}

func (*KafkaMessageProducer) Close

func (k *KafkaMessageProducer) Close() error

func (*KafkaMessageProducer) Configure

func (k *KafkaMessageProducer) Configure(logger Logger, config *KafkaProducerConfig, topic string) error

func (*KafkaMessageProducer) Produce

func (k *KafkaMessageProducer) Produce(ctx context.Context, msg *Message) error

type KafkaProducerConfig

type KafkaProducerConfig = map[string]string

type Level

type Level int
const (
	DEBUG Level = iota
	INFO
	WARN
	ERROR
)

type Logger

type Logger interface {
	Debug(string)
	Info(string)
	Warn(string)
	Error(string)
	Debugf(string, ...interface{})
	Infof(string, ...interface{})
	Warnf(string, ...interface{})
	Errorf(string, ...interface{})
}

func DefaultLogger

func DefaultLogger(level Level) Logger

type Message

type Message struct {
	Key   []byte
	Value []byte
}

type MessageConsumer

type MessageConsumer interface {
	Configure(logger Logger, config *KafkaConsumerConfig, topic string) error
	Subscribe() error
	Consume(ctx context.Context) (*Message, error)
	CommitLastMessage(ctx context.Context) error
	Close() error
}

type MessageProducer

type MessageProducer interface {
	Configure(logger Logger, config *KafkaProducerConfig, topic string) error
	Produce(ctx context.Context, msg *Message) error
	Close() error
}

type Options

type Options struct {
	// contains filtered or unexported fields
}

type Unmarshaller

type Unmarshaller interface {
	Unmarshal(in []byte, out interface{}) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL