buffer

package
v0.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2018 License: MIT Imports: 11 Imported by: 8

Documentation

Overview

Package buffer links an input and an output together and buffers messages in between.

Index

Constants

This section is empty.

Variables

View Source
var Constructors = map[string]TypeSpec{}

Constructors is a map of all buffer types with their specs.

Functions

func Descriptions

func Descriptions() string

Descriptions returns a formatted string of collated descriptions of each type.

func SanitiseConfig added in v0.8.4

func SanitiseConfig(conf Config) (interface{}, error)

SanitiseConfig returns a sanitised version of the Config, meaning sections that aren't relevant to behaviour are removed.

Types

type Config

type Config struct {
	Type   string                      `json:"type" yaml:"type"`
	Memory sequential.MemoryConfig     `json:"memory" yaml:"memory"`
	Mmap   sequential.MmapBufferConfig `json:"mmap_file" yaml:"mmap_file"`
	None   struct{}                    `json:"none" yaml:"none"`
}

Config is the all encompassing configuration struct for all input types.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

type Empty

type Empty struct {
	// contains filtered or unexported fields
}

Empty is an empty buffer, simply forwards messages on directly.

func (*Empty) CloseAsync

func (e *Empty) CloseAsync()

CloseAsync shuts down the StackBuffer output and stops processing messages.

func (*Empty) ErrorsChan

func (e *Empty) ErrorsChan() <-chan []error

ErrorsChan returns the errors channel.

func (*Empty) StartReceiving

func (e *Empty) StartReceiving(msgs <-chan types.Transaction) error

StartReceiving assigns a messages channel for the output to read.

func (*Empty) TransactionChan added in v0.9.0

func (e *Empty) TransactionChan() <-chan types.Transaction

TransactionChan returns the channel used for consuming messages from this input.

func (*Empty) WaitForClose

func (e *Empty) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the StackBuffer output has closed down.

type Sequential added in v0.10.0

type Sequential interface {
	// ShiftMessage removes the oldest message from the stack. Returns the
	// backlog in bytes.
	ShiftMessage() (int, error)

	// NextMessage reads the oldest message, the message is preserved until
	// ShiftMessage is called.
	NextMessage() (types.Message, error)

	// PushMessage adds a new message to the stack. Returns the backlog in
	// bytes.
	PushMessage(types.Message) (int, error)

	// CloseOnceEmpty closes the Buffer once the buffer has been emptied, this
	// is a way for a writer to signal to a reader that it is finished writing
	// messages, and therefore the reader can close once it is caught up. This
	// call blocks until the close is completed.
	CloseOnceEmpty()

	// Close closes the Buffer so that blocked readers or writers become
	// unblocked.
	Close()
}

Sequential represents a method of buffering sequential messages, supporting only a single consumer.

type SequentialWrapper added in v0.10.0

type SequentialWrapper struct {
	// contains filtered or unexported fields
}

SequentialWrapper wraps a buffer with a Producer/Consumer interface.

func (*SequentialWrapper) CloseAsync added in v0.10.0

func (m *SequentialWrapper) CloseAsync()

CloseAsync shuts down the SequentialWrapper and stops processing messages.

func (*SequentialWrapper) StartReceiving added in v0.10.0

func (m *SequentialWrapper) StartReceiving(msgs <-chan types.Transaction) error

StartReceiving assigns a messages channel for the output to read.

func (*SequentialWrapper) TransactionChan added in v0.10.0

func (m *SequentialWrapper) TransactionChan() <-chan types.Transaction

TransactionChan returns the channel used for consuming messages from this buffer.

func (*SequentialWrapper) WaitForClose added in v0.10.0

func (m *SequentialWrapper) WaitForClose(timeout time.Duration) error

WaitForClose blocks until the SequentialWrapper output has closed down.

type Type

type Type interface {
	types.Producer
	types.Consumer
	types.Closable
}

Type is the standard interface of an agent type.

func New added in v0.0.2

func New(conf Config, log log.Modular, stats metrics.Type) (Type, error)

New creates an input type based on an input configuration.

func NewEmpty

func NewEmpty(config Config, log log.Modular, stats metrics.Type) (Type, error)

NewEmpty creates a new buffer interface but doesn't buffer messages.

func NewMemory

func NewMemory(config Config, log log.Modular, stats metrics.Type) (Type, error)

NewMemory - Create a buffer held in memory.

func NewMmapFile

func NewMmapFile(config Config, log log.Modular, stats metrics.Type) (Type, error)

NewMmapFile creates a buffer held in memory and persisted to file through memory map.

func NewSequentialWrapper added in v0.10.0

func NewSequentialWrapper(
	conf Config,
	buffer Sequential,
	log log.Modular,
	stats metrics.Type,
) Type

NewSequentialWrapper creates a new Producer/Consumer around a buffer.

type TypeSpec added in v0.9.1

type TypeSpec struct {
	// contains filtered or unexported fields
}

TypeSpec is a constructor and usage description for each buffer type.

Directories

Path Synopsis
Package sequential contains implementations of various buffer types where the buffer can only be consumed once in a set sequence.
Package sequential contains implementations of various buffer types where the buffer can only be consumed once in a set sequence.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL