circular

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 16, 2024 License: MPL-2.0 Imports: 17 Imported by: 1

README

go-circular

Package circular provides a buffer with circular semantics.

Design

The buffer is split into chunks, the last chunk is not compressed and being written to, and previous chunks are compressed and immutable.

The buffer keeps a pointer to the current offset being written which is always incremented, while the index of compressed chunks keeps the initial offset of each compressed chunk and its decompressed size.

If the compressed chunk is being read, it is decompressed on the fly by the Reader.

Documentation

Overview

Package circular provides a buffer with circular semantics.

Index

Constants

This section is empty.

Variables

View Source
var ErrClosed = errors.New("reader is closed")

ErrClosed is raised on read from closed Reader.

View Source
var ErrOutOfSync = errors.New("buffer overrun, read position overwritten")

ErrOutOfSync is raised when reader got too much out of sync with the writer.

View Source
var ErrSeekBeforeStart = errors.New("seek before start")

ErrSeekBeforeStart is raised when seek goes beyond start of the file.

Functions

This section is empty.

Types

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

Buffer implements circular buffer with a thread-safe writer, that supports multiple readers each with its own offset.

func NewBuffer

func NewBuffer(opts ...OptionFunc) (*Buffer, error)

NewBuffer creates new Buffer with specified options.

func (*Buffer) Capacity

func (buf *Buffer) Capacity() int

Capacity returns number of bytes allocated for the buffer.

func (*Buffer) Close added in v0.2.0

func (buf *Buffer) Close() error

Close closes the buffer and waits for persistence goroutine to finish.

func (*Buffer) GetReader

func (buf *Buffer) GetReader() *Reader

GetReader returns Reader object which implements io.ReadCloser, io.Seeker.

Reader starts at the most distant position in the past available and goes to the current write position.

func (*Buffer) GetStreamingReader

func (buf *Buffer) GetStreamingReader() *Reader

GetStreamingReader returns Reader object which implements io.ReadCloser, io.Seeker.

StreamingReader starts at the most distant position in the past available.

func (*Buffer) MaxCapacity added in v0.2.0

func (buf *Buffer) MaxCapacity() int

MaxCapacity returns maximum number of (decompressed) bytes (including compressed chunks) that can be stored in the buffer.

func (*Buffer) NumCompressedChunks added in v0.2.0

func (buf *Buffer) NumCompressedChunks() int

NumCompressedChunks returns number of compressed chunks.

func (*Buffer) Offset

func (buf *Buffer) Offset() int64

Offset returns current write offset (number of bytes written).

func (*Buffer) TotalCompressedSize added in v0.2.0

func (buf *Buffer) TotalCompressedSize() int64

TotalCompressedSize reports the overall memory used by the circular buffer including compressed chunks.

func (*Buffer) TotalSize added in v0.2.0

func (buf *Buffer) TotalSize() int64

TotalSize reports overall number of bytes available for reading in the buffer.

TotalSize might be higher than TotalCompressedSize, because compressed chunks take less memory than uncompressed data.

func (*Buffer) Write

func (buf *Buffer) Write(p []byte) (int, error)

Write implements io.Writer interface.

type Compressor added in v0.2.0

type Compressor interface {
	Compress(src, dest []byte) ([]byte, error)
	Decompress(src, dest []byte) ([]byte, error)
	DecompressedSize(src []byte) (int64, error)
}

Compressor implements an optional interface for chunk compression.

Compress and Decompress append to the dest slice and return the result.

Compressor should be safe for concurrent use by multiple goroutines. Compressor should verify checksums of the compressed data.

type OptionFunc

type OptionFunc func(*Options) error

OptionFunc allows setting Buffer options.

func WithInitialCapacity

func WithInitialCapacity(capacity int) OptionFunc

WithInitialCapacity sets initial buffer capacity.

func WithLogger added in v0.2.0

func WithLogger(logger *zap.Logger) OptionFunc

WithLogger sets logger for Buffer.

func WithMaxCapacity

func WithMaxCapacity(capacity int) OptionFunc

WithMaxCapacity sets maximum buffer capacity.

func WithNumCompressedChunks added in v0.2.0

func WithNumCompressedChunks(num int, c Compressor) OptionFunc

WithNumCompressedChunks sets number of compressed chunks to keep in the buffer.

Default is to keep no compressed chunks, only uncompressed circular buffer is used.

func WithPersistence added in v0.2.0

func WithPersistence(options PersistenceOptions) OptionFunc

WithPersistence enables buffer persistence to disk.

func WithSafetyGap

func WithSafetyGap(gap int) OptionFunc

WithSafetyGap sets safety gap between readers and writers to avoid buffer overrun for the reader.

Reader initial position is set to be as far as possible in the buffer history, but next concurrent write might overwrite read position, and safety gap helps to prevent it. With safety gap, maximum available bytes to read are: MaxCapacity-SafetyGap.

type Options

type Options struct {
	Compressor Compressor

	Logger *zap.Logger

	PersistenceOptions PersistenceOptions

	InitialCapacity int
	MaxCapacity     int
	SafetyGap       int

	NumCompressedChunks int
}

Options defines settings for Buffer.

type PersistenceOptions added in v0.2.0

type PersistenceOptions struct {
	// ChunkPath is the base path to the store chunk files.
	//
	// Example: /var/log/machine/my-machine.log, chunks will be stored
	// by appending a chunk ID to this path, e.g. /var/log/machine/my-machine.log.3.
	//
	// If ChunkPath is empty, persistence is disabled.
	ChunkPath string

	// FlushInterval flushes buffer content to disk every FlushInterval (if there were any changes).
	FlushInterval time.Duration

	// FlushJitter adds random jitter to FlushInterval to avoid thundering herd problem (a ratio of FlushInterval).
	FlushJitter float64
}

PersistenceOptions defines settings for Buffer persistence.

func (PersistenceOptions) NextInterval added in v0.2.0

func (p PersistenceOptions) NextInterval() time.Duration

NextInterval calculates next flush interval with jitter.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader implements seekable reader with local position in the Buffer which reads from the fixed part of the buffer, or performs streaming reads.

Reader is not safe to be used with concurrent Read/Seek operations.

func (*Reader) Close

func (r *Reader) Close() error

Close implements io.Closer.

func (*Reader) Read

func (r *Reader) Read(p []byte) (n int, err error)

Read implements io.Reader.

func (*Reader) Seek

func (r *Reader) Seek(offset int64, whence int) (int64, error)

Seek implements io.Seeker.

type StreamingReader

type StreamingReader = Reader

StreamingReader is a backwards compatible type alias.

Directories

Path Synopsis
Package zstd compression and decompression functions.
Package zstd compression and decompression functions.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL