events

package
v0.0.0-...-b1aa93f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 4, 2024 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
// Topic names used by this package. Each constant holds the full topic string;
// all of them share the "events." prefix.
//
//   - ReceiverWalletNewInvitationTopic: receiver-wallet "new_invitation" topic.
//   - PaymentCompletedTopic: payment "payment_completed" topic.
//   - PaymentReadyToPayTopic: payment "ready_to_pay" topic.
const (
	ReceiverWalletNewInvitationTopic = "events.receiver-wallets.new_invitation"
	PaymentCompletedTopic            = "events.payment.payment_completed"
	PaymentReadyToPayTopic           = "events.payment.ready_to_pay"
)

Topic Names

Note: when adding a new topic here, please also add it to the `kafka-init` service command in dev/docker-compose-sdp-anchor.yml.

`kafka-topics.sh --create --if-not-exists --topic events.new-topic ...`
View Source
// Message type names.
//
// NOTE(review): these are presumably the values carried in Message.Type, with
// the name prefix (receiver-wallet-sms-invitation, payment-completed,
// payment-ready-to-pay) indicating the topic they belong to — confirm against
// the producers.
const (
	RetryReceiverWalletSMSInvitationType           = "retry-receiver-wallet-sms-invitation"
	BatchReceiverWalletSMSInvitationType           = "batch-receiver-wallet-sms-invitation"
	PaymentCompletedSuccessType                    = "payment-completed-success"
	PaymentCompletedErrorType                      = "payment-completed-error"
	PaymentReadyToPayDisbursementStarted           = "payment-ready-to-pay-disbursement-started"
	PaymentReadyToPayReceiverVerificationCompleted = "payment-ready-to-pay-receiver-verification-completed"
	PaymentReadyToPayRetryFailedPayment            = "payment-ready-to-pay-retry-failed-payment"
)

Type Names

View Source
// DefaultMaxBackoffExponent is the default maximum backoff exponent (8).
// NOTE(review): presumably the default passed as maxBackoff to
// NewBackoffManager — confirm against the callers.
const DefaultMaxBackoffExponent = 8

Variables

Functions

func ProduceEvents

func ProduceEvents(ctx context.Context, producer Producer, messages ...*Message) error

func ShouldHandleMessage

func ShouldHandleMessage(ctx context.Context, handler EventHandler, msg *Message) bool

ShouldHandleMessage returns true if the message should be handled by the given handler. A message should be handled by a handler if the handler can handle the message and has not been executed before.

Types

type Consumer

type Consumer interface {
	// ReadMessage returns the next message, or an error if none could be read.
	ReadMessage(ctx context.Context) (*Message, error)
	// Topic returns the topic of the consumer.
	Topic() string
	// Handlers returns the event handlers of the consumer.
	Handlers() []EventHandler
	// Close closes the consumer.
	Close() error
	// BrokerType returns the type of event broker behind the consumer.
	BrokerType() EventBrokerType
}

Consumer is an interface that defines the methods that a consumer should implement.

type ConsumerBackoffManager

type ConsumerBackoffManager struct {
	// contains filtered or unexported fields
}

func NewBackoffManager

func NewBackoffManager(backoffChan chan<- struct{}, maxBackoff int) *ConsumerBackoffManager

func (*ConsumerBackoffManager) GetBackoffDuration

func (bm *ConsumerBackoffManager) GetBackoffDuration() time.Duration

func (*ConsumerBackoffManager) GetMessage

func (bm *ConsumerBackoffManager) GetMessage() *Message

func (*ConsumerBackoffManager) IsMaxBackoffReached

func (bm *ConsumerBackoffManager) IsMaxBackoffReached() bool

func (*ConsumerBackoffManager) ResetBackoff

func (bm *ConsumerBackoffManager) ResetBackoff()

func (*ConsumerBackoffManager) TriggerBackoff

func (bm *ConsumerBackoffManager) TriggerBackoff()

func (*ConsumerBackoffManager) TriggerBackoffWithMessage

func (bm *ConsumerBackoffManager) TriggerBackoffWithMessage(msg *Message)

type EventBrokerType

// EventBrokerType identifies the kind of event broker in use. It is the value
// reported by the BrokerType methods of producers and consumers.
type EventBrokerType string
const (
	// KafkaEventBrokerType means that Kafka is the event broker.
	KafkaEventBrokerType EventBrokerType = "KAFKA"
	// NoneEventBrokerType means that no event broker was chosen.
	NoneEventBrokerType EventBrokerType = "NONE"
)

func ParseEventBrokerType

func ParseEventBrokerType(ebType string) (EventBrokerType, error)

type EventConsumer

type EventConsumer struct {
	// contains filtered or unexported fields
}

func NewEventConsumer

func NewEventConsumer(consumer Consumer, producer Producer, crashTracker crashtracker.CrashTrackerClient) *EventConsumer

func (*EventConsumer) Consume

func (ec *EventConsumer) Consume(ctx context.Context)

type EventHandler

// EventHandler is implemented by types that process messages read by a consumer.
type EventHandler interface {
	// Name returns the handler's name.
	// NOTE(review): presumably the value stored as HandlerName in
	// HandlerError and HandlerSuccess — confirm against the implementation.
	Name() string
	// CanHandleMessage reports whether this handler can handle the message.
	CanHandleMessage(ctx context.Context, message *Message) bool
	// Handle processes the message and returns an error if handling fails.
	Handle(ctx context.Context, message *Message) error
}

type EventPatchAnchorPlatformTransactionCompletionData

// EventPatchAnchorPlatformTransactionCompletionData is an event data payload
// that carries a payment ID.
// NOTE(review): presumably used as Message.Data for payment-completed events —
// confirm against the producers.
type EventPatchAnchorPlatformTransactionCompletionData struct {
	// PaymentID is serialized under the JSON key "payment_id".
	PaymentID string `json:"payment_id"`
}

type EventReceiverWalletSMSInvitationData

// EventReceiverWalletSMSInvitationData is an event data payload that carries a
// receiver wallet ID.
// NOTE(review): presumably used as Message.Data for receiver-wallet SMS
// invitation events — confirm against the producers.
type EventReceiverWalletSMSInvitationData struct {
	// ReceiverWalletID is serialized under the JSON key "id" (not
	// "receiver_wallet_id").
	ReceiverWalletID string `json:"id"`
}

type HandlerError

// HandlerError describes a failed handling of a message by a handler. It is the
// element type of Message.Errors.
type HandlerError struct {
	// FailedAt timestamp for the time of failure.
	FailedAt time.Time `json:"failed_at"`
	// ErrorMessage detailed error message. Used for displaying.
	ErrorMessage string `json:"error_message"`
	// HandlerName name of the handler where the error occurred.
	HandlerName string `json:"handler_name"`
	// Err full handler error. Excluded from JSON (tag "-").
	Err error `json:"-"`
}

func NewHandlerError

func NewHandlerError(hError error, handlerName string) HandlerError

type HandlerSuccess

type HandlerSuccess struct {
	// ExecutedAt is the timestamp of the successful handling.
	ExecutedAt time.Time `json:"executed_at"`
	// HandlerName is the name of the handler that succeeded.
	HandlerName string `json:"handler_name"`
}

HandlerSuccess represents a successful handling of a message

type KafkaConfig

// KafkaConfig holds the Kafka connection settings. It is the config argument of
// NewKafkaProducer and NewKafkaConsumer, and can be checked with Validate.
//
// SecurityProtocol takes one of the KafkaSecurityProtocol values.
// NOTE(review): Brokers presumably lists broker addresses, and the SASL* and
// SSL* fields are presumably only required for the matching security
// protocols — confirm against Validate.
type KafkaConfig struct {
	Brokers              []string
	SecurityProtocol     KafkaSecurityProtocol
	SASLUsername         string
	SASLPassword         string
	SSLAccessKey         string
	SSLAccessCertificate string
}

func (*KafkaConfig) Validate

func (kc *KafkaConfig) Validate() error

type KafkaConsumer

type KafkaConsumer struct {
	// contains filtered or unexported fields
}

func NewKafkaConsumer

func NewKafkaConsumer(config KafkaConfig, topic string, consumerGroupID string, handlers ...EventHandler) (*KafkaConsumer, error)

func (*KafkaConsumer) BrokerType

func (k *KafkaConsumer) BrokerType() EventBrokerType

BrokerType returns the type of the Kafka broker

func (*KafkaConsumer) Close

func (k *KafkaConsumer) Close() error

Close closes the Kafka consumer

func (*KafkaConsumer) Handlers

func (k *KafkaConsumer) Handlers() []EventHandler

Handlers returns the event handlers of the Kafka consumer

func (*KafkaConsumer) ReadMessage

func (k *KafkaConsumer) ReadMessage(ctx context.Context) (*Message, error)

ReadMessage reads a message from the Kafka topic of the consumer and commits the offset

func (*KafkaConsumer) Topic

func (k *KafkaConsumer) Topic() string

Topic returns the topic of the Kafka consumer

type KafkaProducer

type KafkaProducer struct {
	// contains filtered or unexported fields
}

func NewKafkaProducer

func NewKafkaProducer(config KafkaConfig) (*KafkaProducer, error)

func (*KafkaProducer) BrokerType

func (k *KafkaProducer) BrokerType() EventBrokerType

BrokerType returns the type of the Kafka broker

func (*KafkaProducer) Close

func (k *KafkaProducer) Close(ctx context.Context)

func (*KafkaProducer) Ping

func (k *KafkaProducer) Ping(ctx context.Context) error

Ping pings the Kafka Broker

func (*KafkaProducer) WriteMessages

func (k *KafkaProducer) WriteMessages(ctx context.Context, messages ...Message) error

type KafkaSecurityProtocol

// KafkaSecurityProtocol is the type of KafkaConfig.SecurityProtocol; values can
// be obtained from a string with ParseKafkaSecurityProtocol.
type KafkaSecurityProtocol string
// Declared KafkaSecurityProtocol values: plaintext, SASL over plaintext,
// SASL over SSL, and SSL.
const (
	KafkaProtocolPlaintext     KafkaSecurityProtocol = "PLAINTEXT"
	KafkaProtocolSASLPlaintext KafkaSecurityProtocol = "SASL_PLAINTEXT"
	KafkaProtocolSASLSSL       KafkaSecurityProtocol = "SASL_SSL"
	KafkaProtocolSSL           KafkaSecurityProtocol = "SSL"
)

func ParseKafkaSecurityProtocol

func ParseKafkaSecurityProtocol(protocol string) (KafkaSecurityProtocol, error)

type Message

// Message is the event payload written by producers and read by consumers.
//
// Fields are serialized to JSON under the tag names shown. TenantID is set by
// NewMessage from the tenant found in the context. Errors and
// SuccessfulExecutions are omitted from the JSON when empty.
// NOTE(review): Errors and SuccessfulExecutions are presumably appended to by
// RecordError and RecordSuccess respectively — confirm against the
// implementation.
type Message struct {
	Topic                string           `json:"topic"`
	Key                  string           `json:"key"`
	TenantID             string           `json:"tenant_id"`
	Type                 string           `json:"type"`
	Data                 any              `json:"data"`
	Errors               []HandlerError   `json:"errors,omitempty"`
	SuccessfulExecutions []HandlerSuccess `json:"successful_executions,omitempty"`
}

func NewMessage

func NewMessage(ctx context.Context, topic, key, messageType string, data any) (*Message, error)

NewMessage returns a new message built from the given parameters. It also reads the `TenantID` from the context and injects it into the message. It returns an error if the tenant is not found in the context.

func (*Message) RecordError

func (m *Message) RecordError(handlerName string, handleErr error)

func (*Message) RecordSuccess

func (m *Message) RecordSuccess(handlerName string)

func (Message) String

func (m Message) String() string

func (Message) Validate

func (m Message) Validate() error

type MockConsumer

type MockConsumer struct {
	mock.Mock
}

MockConsumer is a mock implementation of Consumer

func (*MockConsumer) BrokerType

func (c *MockConsumer) BrokerType() EventBrokerType

func (*MockConsumer) Close

func (c *MockConsumer) Close() error

func (*MockConsumer) Handlers

func (c *MockConsumer) Handlers() []EventHandler

func (*MockConsumer) ReadMessage

func (c *MockConsumer) ReadMessage(ctx context.Context) (*Message, error)

func (*MockConsumer) RegisterEventHandler

func (c *MockConsumer) RegisterEventHandler(ctx context.Context, eventHandlers ...EventHandler) error

func (*MockConsumer) Topic

func (c *MockConsumer) Topic() string

type MockEventHandler

type MockEventHandler struct {
	mock.Mock
}

MockEventHandler is a mock implementation of EventHandler

func NewMockEventHandler

func NewMockEventHandler(t testInterface) *MockEventHandler

func (*MockEventHandler) CanHandleMessage

func (h *MockEventHandler) CanHandleMessage(ctx context.Context, msg *Message) bool

func (*MockEventHandler) Handle

func (h *MockEventHandler) Handle(ctx context.Context, msg *Message) error

func (*MockEventHandler) Name

func (h *MockEventHandler) Name() string

type MockProducer

type MockProducer struct {
	mock.Mock
}

MockProducer is a mock implementation of Producer

func NewMockProducer

func NewMockProducer(t testInterface) *MockProducer

NewMockProducer creates a new instance of MockProducer. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations.

func (*MockProducer) BrokerType

func (c *MockProducer) BrokerType() EventBrokerType

func (*MockProducer) Close

func (c *MockProducer) Close(ctx context.Context)

func (*MockProducer) Ping

func (c *MockProducer) Ping(ctx context.Context) error

func (*MockProducer) WriteMessages

func (c *MockProducer) WriteMessages(ctx context.Context, messages ...Message) error

type NoopProducer

type NoopProducer struct{}

NoopProducer is a producer used to log messages instead of sending them to a real producer.

func (NoopProducer) BrokerType

func (p NoopProducer) BrokerType() EventBrokerType

func (NoopProducer) Close

func (p NoopProducer) Close(ctx context.Context)

func (NoopProducer) Ping

func (p NoopProducer) Ping(ctx context.Context) error

func (NoopProducer) WriteMessages

func (p NoopProducer) WriteMessages(ctx context.Context, messages ...Message) error

type Producer

type Producer interface {
	// WriteMessages publishes the given messages. How they are delivered is up
	// to the implementation (NoopProducer, for example, only logs them).
	WriteMessages(ctx context.Context, messages ...Message) error
	// Ping checks the broker and returns an error if the check fails.
	Ping(ctx context.Context) error
	// Close closes the producer.
	Close(ctx context.Context)
	// BrokerType returns the type of event broker behind the producer.
	BrokerType() EventBrokerType
}

Producer is an interface that defines the methods that a producer should implement.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL