binding

package
v2.15.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2024 License: Apache-2.0 Imports: 10 Imported by: 132

Documentation

Overview

Package binding defines interfaces for protocol bindings.

NOTE: Most applications that emit or consume events should use the ../client package, which provides a simpler API to the underlying binding.

The interfaces in this package provide extra encoding and protocol information to allow efficient forwarding and end-to-end reliable delivery between a Receiver and a Sender belonging to different bindings. This is useful for intermediary applications that route or forward events, but not necessary for most "endpoint" applications that emit or consume events.

Protocol Bindings

A protocol binding usually implements a Message, a Sender and Receiver, a StructuredWriter and a BinaryWriter (depending on the supported encodings of the protocol) and an Write[ProtocolMessage] method.

Read and write events

The core of this package is the binding.Message interface. Through binding.MessageReader It defines how to read a protocol specific message for an encoded event in structured mode or binary mode. The entity who receives a protocol specific data structure representing a message (e.g. an HttpRequest) encapsulates it in a binding.Message implementation using a NewMessage method (e.g. http.NewMessage). Then the entity that wants to send the binding.Message back on the wire, translates it back to the protocol specific data structure (e.g. a Kafka ConsumerMessage), using the writers BinaryWriter and StructuredWriter specific to that protocol. Binding implementations exposes their writers through a specific Write[ProtocolMessage] function (e.g. kafka.EncodeProducerMessage), in order to simplify the encoding process.

The encoding process can be customized in order to mutate the final result with binding.TransformerFactory. A bunch of these are provided directly by the binding/transformer module.

Usually binding.Message implementations can be encoded only one time, because the encoding process drain the message itself. In order to consume a message several times, the binding/buffering package provides several APIs to buffer the Message.

A message can be converted to an event.Event using binding.ToEvent() method. An event.Event can be used as Message casting it to binding.EventMessage.

In order to simplify the encoding process for each protocol, this package provide several utility methods like binding.Write and binding.DirectWrite. The binding.Write method tries to preserve the structured/binary encoding, in order to be as much efficient as possible.

Messages can be eventually wrapped to change their behaviours and binding their lifecycle, like the binding.FinishMessage. Every Message wrapper implements the MessageWrapper interface

Sender and Receiver

A Receiver receives protocol specific messages and wraps them to into binding.Message implementations.

A Sender converts arbitrary Message implementations to a protocol-specific form using the protocol specific Write method and sends them.

Message and ExactlyOnceMessage provide methods to allow acknowledgments to propagate when a reliable messages is forwarded from a Receiver to a Sender. QoS 0 (unreliable), 1 (at-least-once) and 2 (exactly-once) are supported.

Transport

A binding implementation providing Sender and Receiver implementations can be used as a Transport through the BindingTransport adapter.

Index

Constants

This section is empty.

Variables

View Source
var ErrCannotConvertToEvent = errors.New("cannot convert message to event")

ErrCannotConvertToEvent is a generic error when a conversion of a Message to an Event fails

View Source
var ErrCannotConvertToEvents = errors.New("cannot convert message to batched events")

ErrCannotConvertToEvents is a generic error when a conversion of a Message to a Batched Event fails

View Source
var ErrNotBinary = errors.New("message is not in binary mode")

ErrNotBinary returned by Message.Binary for non-binary messages.

View Source
var ErrNotStructured = errors.New("message is not in structured mode")

ErrNotStructured returned by Message.Structured for non-structured messages.

View Source
var ErrUnknownEncoding = errors.New("unknown Message encoding")

ErrUnknownEncoding specifies that the Message is not an event or it is encoded with an unknown encoding

Functions

func GetOrDefaultFromCtx

func GetOrDefaultFromCtx(ctx context.Context, key interface{}, def interface{}) interface{}

GetOrDefaultFromCtx gets a configuration value from the provided context

func ToEvent

func ToEvent(ctx context.Context, message MessageReader, transformers ...Transformer) (*event.Event, error)

ToEvent translates a Message with a valid Structured or Binary representation to an Event. This function returns the Event generated from the Message and the original encoding of the message or an error that points the conversion error. transformers can be nil and this function guarantees that they are invoked only once during the encoding process.

func ToEvents added in v2.14.0

func ToEvents(ctx context.Context, message MessageReader, body io.Reader) ([]event.Event, error)

ToEvents translates a Batch Message and corresponding Reader data to a slice of Events. This function returns the Events generated from the body data, or an error that points to the conversion issue.

func UseFormatForEvent

func UseFormatForEvent(ctx context.Context, f format.Format) context.Context

UseFormatForEvent configures which format to use when marshalling the event to structured mode

func WithForceBinary

func WithForceBinary(ctx context.Context) context.Context

WithForceBinary forces binary encoding during the encoding process

func WithForceStructured

func WithForceStructured(ctx context.Context) context.Context

WithForceStructured forces structured encoding during the encoding process

func WithPreferredEventEncoding

func WithPreferredEventEncoding(ctx context.Context, enc Encoding) context.Context

WithPreferredEventEncoding defines the preferred encoding from event to message during the encoding process

func WithSkipDirectBinaryEncoding

func WithSkipDirectBinaryEncoding(ctx context.Context, skip bool) context.Context

WithSkipDirectBinaryEncoding skips direct binary to binary encoding during the encoding process

func WithSkipDirectStructuredEncoding

func WithSkipDirectStructuredEncoding(ctx context.Context, skip bool) context.Context

WithSkipDirectStructuredEncoding skips direct structured to structured encoding during the encoding process

Types

type BinaryWriter

type BinaryWriter interface {
	MessageMetadataWriter

	// Method invoked at the beginning of the visit. Useful to perform initial memory allocations
	Start(ctx context.Context) error

	// SetData receives an io.Reader for the data attribute.
	// io.Reader is not invoked when the data attribute is empty
	SetData(data io.Reader) error

	// End method is invoked only after the whole encoding process ends successfully.
	// If it fails, it's never invoked. It can be used to finalize the message.
	End(ctx context.Context) error
}

BinaryWriter is used to visit a binary Message and generate a new representation.

Protocols that supports binary encoding should implement this interface to implement direct binary to binary encoding and event to binary encoding.

Start() and End() methods must be invoked by the caller of Message.ReadBinary() every time the BinaryWriter implementation is used to visit a Message.

type Encoding

type Encoding int

Encoding enum specifies the type of encodings supported by binding interfaces

const (
	// Binary encoding as specified in https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#message
	EncodingBinary Encoding = iota
	// Structured encoding as specified in https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#message
	EncodingStructured
	// Message is an instance of EventMessage or it contains EventMessage nested (through MessageWrapper)
	EncodingEvent
	// When the encoding is unknown (which means that the message is a non-event)
	EncodingUnknown

	// EncodingBatch is an instance of JSON Batched Events
	EncodingBatch
)

func DirectWrite

func DirectWrite(
	ctx context.Context,
	message MessageReader,
	structuredWriter StructuredWriter,
	binaryWriter BinaryWriter,
	transformers ...Transformer,
) (Encoding, error)

DirectWrite invokes the encoders. structuredWriter and binaryWriter could be nil if the protocol doesn't support it. transformers can be nil and this function guarantees that they are invoked only once during the encoding process. This function MUST be invoked only if message.ReadEncoding() == EncodingBinary or message.ReadEncoding() == EncodingStructured

Returns: * EncodingStructured, nil if message is correctly encoded in structured encoding * EncodingBinary, nil if message is correctly encoded in binary encoding * EncodingStructured, err if message was structured but error happened during the encoding * EncodingBinary, err if message was binary but error happened during the encoding * EncodingUnknown, ErrUnknownEncoding if message is not a structured or a binary Message

func Write

func Write(
	ctx context.Context,
	message MessageReader,
	structuredWriter StructuredWriter,
	binaryWriter BinaryWriter,
	transformers ...Transformer,
) (Encoding, error)

Write executes the full algorithm to encode a Message using transformers: 1. It first tries direct encoding using DirectWrite 2. If no direct encoding is possible, it uses ToEvent to generate an Event representation 3. From the Event, the message is encoded back to the provided structured or binary encoders You can tweak the encoding process using the context decorators WithForceStructured, WithForceStructured, etc. transformers can be nil and this function guarantees that they are invoked only once during the encoding process. Returns: * EncodingStructured, nil if message is correctly encoded in structured encoding * EncodingBinary, nil if message is correctly encoded in binary encoding * EncodingUnknown, ErrUnknownEncoding if message.ReadEncoding() == EncodingUnknown * _, err if error happened during the encoding

func (Encoding) String

func (e Encoding) String() string

type EventMessage

type EventMessage event.Event

EventMessage type-converts a event.Event object to implement Message. This allows local event.Event objects to be sent directly via Sender.Send()

s.Send(ctx, binding.EventMessage(e))

When an event is wrapped into a EventMessage, the original event could be potentially mutated. If you need to use the Event again, after wrapping it into an Event message, you should copy it before

func (*EventMessage) Finish

func (*EventMessage) Finish(error) error

func (*EventMessage) GetAttribute

func (m *EventMessage) GetAttribute(k spec.Kind) (spec.Attribute, interface{})

func (*EventMessage) GetExtension

func (m *EventMessage) GetExtension(name string) interface{}

func (*EventMessage) ReadBinary

func (m *EventMessage) ReadBinary(ctx context.Context, b BinaryWriter) (err error)

func (*EventMessage) ReadEncoding

func (m *EventMessage) ReadEncoding() Encoding

func (*EventMessage) ReadStructured

func (m *EventMessage) ReadStructured(ctx context.Context, builder StructuredWriter) error

type ExactlyOnceMessage

type ExactlyOnceMessage interface {
	Message

	// Received is called by a forwarding QoS2 Sender when it gets
	// acknowledgment of receipt (e.g. AMQP 'accept' or MQTT PUBREC)
	//
	// The receiver must call settle(nil) when it get's the ack-of-ack
	// (e.g. AMQP 'settle' or MQTT PUBCOMP) or settle(err) if the
	// transfer fails.
	//
	// Finally the Sender calls Finish() to indicate the message can be
	// discarded.
	//
	// If sending fails, or if the sender does not support QoS 2, then
	// Finish() may be called without any call to Received()
	Received(settle func(error))
}

ExactlyOnceMessage is implemented by received Messages that support QoS 2. Only transports that support QoS 2 need to implement or use this interface.

type Message

type Message interface {
	MessageReader

	// Finish *must* be called when message from a Receiver can be forgotten by
	// the receiver. A QoS 1 sender should not call Finish() until it gets an acknowledgment of
	// receipt on the underlying transport.  For QoS 2 see ExactlyOnceMessage.
	//
	// Note that, depending on the Message implementation, forgetting to Finish the message
	// could produce memory/resources leaks!
	//
	// Passing a non-nil err indicates sending or processing failed.
	// A non-nil return indicates that the message was not accepted
	// by the receivers peer.
	Finish(error) error
}

Message is the interface to a binding-specific message containing an event.

Reliable Delivery

There are 3 reliable qualities of service for messages:

0/at-most-once/unreliable: messages can be dropped silently.

1/at-least-once: messages are not dropped without signaling an error to the sender, but they may be duplicated in the event of a re-send.

2/exactly-once: messages are never dropped (without error) or duplicated, as long as both sending and receiving ends maintain some binding-specific delivery state. Whether this is persisted depends on the configuration of the binding implementations.

The Message interface supports QoS 0 and 1, the ExactlyOnceMessage interface supports QoS 2

Message includes the MessageReader interface to read messages. Every binding.Message implementation *must* specify if the message can be accessed one or more times.

When a Message can be forgotten by the entity who produced the message, Message.Finish() *must* be invoked.

func ToMessage

func ToMessage(e *event.Event) Message

func UnwrapMessage

func UnwrapMessage(message Message) Message

func WithFinish

func WithFinish(m Message, finish func(error)) Message

WithFinish returns a wrapper for m that calls finish() and m.Finish() in its Finish(). Allows code to be notified when a message is Finished.

type MessageContext added in v2.4.0

type MessageContext interface {
	// Get the context associated with this message
	Context() context.Context
}

MessageContext interface exposes the internal context that a message might contain Only some Message implementations implement this interface.

type MessageMetadataReader

type MessageMetadataReader interface {
	// GetAttribute returns:
	//
	// * attribute, value: if the message contains an attribute of that attribute kind
	// * attribute, nil: if the message spec version supports the attribute kind, but doesn't have any value
	// * nil, nil: if the message spec version doesn't support the attribute kind
	GetAttribute(attributeKind spec.Kind) (spec.Attribute, interface{})
	// GetExtension returns the value of that extension, if any.
	GetExtension(name string) interface{}
}

MessageMetadataReader defines how to read metadata from a binary/event message

If a message implementing MessageReader is encoded as binary (MessageReader.ReadEncoding() == EncodingBinary) or it's an EventMessage, then it's safe to assume that it also implements this interface

type MessageMetadataWriter

type MessageMetadataWriter interface {
	// Set a standard attribute.
	//
	// The value can either be the correct golang type for the attribute, or a canonical
	// string encoding, or nil. If value is nil, then the attribute should be deleted.
	// See package types to perform the needed conversions.
	SetAttribute(attribute spec.Attribute, value interface{}) error

	// Set an extension attribute.
	//
	// The value can either be the correct golang type for the attribute, or a canonical
	// string encoding, or nil. If value is nil, then the extension should be deleted.
	// See package types to perform the needed conversions.
	SetExtension(name string, value interface{}) error
}

MessageMetadataWriter is used to set metadata when a binary Message is visited.

type MessageReader

type MessageReader interface {
	// Return the type of the message Encoding.
	// The encoding should be preferably computed when the message is constructed.
	ReadEncoding() Encoding

	// ReadStructured transfers a structured-mode event to a StructuredWriter.
	// It must return ErrNotStructured if message is not in structured mode.
	//
	// Returns a different err if something wrong happened while trying to read the structured event.
	// In this case, the caller must Finish the message with appropriate error.
	//
	// This allows Senders to avoid re-encoding messages that are
	// already in suitable structured form.
	ReadStructured(context.Context, StructuredWriter) error

	// ReadBinary transfers a binary-mode event to an BinaryWriter.
	// It must return ErrNotBinary if message is not in binary mode.
	//
	// The implementation of ReadBinary must not control the lifecycle with BinaryWriter.Start() and BinaryWriter.End(),
	// because the caller must control the lifecycle.
	//
	// Returns a different err if something wrong happened while trying to read the binary event
	// In this case, the caller must Finish the message with appropriate error
	//
	// This allows Senders to avoid re-encoding messages that are
	// already in suitable binary form.
	ReadBinary(context.Context, BinaryWriter) error
}

MessageReader defines the read-related portion of the Message interface.

The ReadStructured and ReadBinary methods allows to perform an optimized encoding of a Message to a specific data structure.

If MessageReader.ReadEncoding() can be equal to EncodingBinary, then the implementation of MessageReader MUST also implement MessageMetadataReader.

A Sender should try each method of interest and fall back to binding.ToEvent() if none are supported. An out of the box algorithm is provided for writing a message: binding.Write().

type MessageWrapper

type MessageWrapper interface {
	Message
	MessageMetadataReader

	// Method to get the wrapped message
	GetWrappedMessage() Message
}

MessageWrapper interface is used to walk through a decorated Message and unwrap it.

type StructuredWriter

type StructuredWriter interface {
	// Event receives an io.Reader for the whole event.
	SetStructuredEvent(ctx context.Context, format format.Format, event io.Reader) error
}

StructuredWriter is used to visit a structured Message and generate a new representation.

Protocols that supports structured encoding should implement this interface to implement direct structured to structured encoding and event to structured encoding.

type Transformer

type Transformer interface {
	Transform(MessageMetadataReader, MessageMetadataWriter) error
}

Transformer is an interface that implements a transformation process while transferring the event from the Message implementation to the provided encoder

When a write function (binding.Write, binding.ToEvent, buffering.CopyMessage, etc.) takes Transformer(s) as parameter, it eventually converts the message to a form which correctly implements MessageMetadataReader, in order to guarantee that transformation is applied

type TransformerFunc

type TransformerFunc func(MessageMetadataReader, MessageMetadataWriter) error

TransformerFunc is a type alias to implement a Transformer through a function pointer

func (TransformerFunc) Transform

type Transformers

type Transformers []Transformer

Transformers is a utility alias to run several Transformer

func (Transformers) Transform

Directories

Path Synopsis
Package buffering provides APIs for buffered messages.
Package buffering provides APIs for buffered messages.
Package format formats structured events.
Package format formats structured events.
Package spec provides spec-version metadata.
Package spec provides spec-version metadata.
Package test provides utilities to test binding implementations and transformers.
Package test provides utilities to test binding implementations and transformers.
Package transformer provides methods for creating event message transformers.
Package transformer provides methods for creating event message transformers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL