commitlog

package
v1.1.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2018 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

* Package commitlog implements the append-only on-disk persistency layey.

CommitLog is the on-disk persistency layer for Slait. It is a simple append-only style format aiming the best performance for the use case. The idea is borrowed from Kafka (and Jocko), but we further simpliy it to remove the index file assuming the data is cached in the main memory.

Physical layout

We assume every entry has timestamp and enforce entries to be ordered by the time in ascending order. Each record is encoded as follows.

- byte 0-7: timestamp of the record in Unix epoch nano seconds - byte 8-11: the size of the payload - byte 12-: payload byte array

The byte 0-7 and 8-11 are encoded in little endian. There is no padding in between records. Since the timestamp is encoded by Unix epoch nanoseconds, the maximum value for the timestamp is somewhere around the year 2262. The timezone info will not be considered in the physical layout, and the restored time is always in UTC timezone.

Segment files

A partition is split into segment files. When the newest segment is full, new record is written into a new segment file. A segment file is named after the base nanosecond from the first record in the file. When a file is trimmed, the deletion happens only at the segment level. The maximum file size of the segment files are configured by the caller.

The module does not have any concurrency protection. The caller should take the appropriate action on use of this.

Index

Constants

View Source
const (
	LogFileSuffix = ".log"
)

Variables

View Source
var (
	ErrSegmentNotFound = errors.New("segment not found")
	Encoding           = binary.LittleEndian
)

Functions

This section is empty.

Types

type ByteSizeCleaner

type ByteSizeCleaner struct {
	// Options["MaxLogBytes"] should be number string accepted by strconv.Atoi()
	Options CleanerOptions
	// -1 to avoid any deletes
	MaxLogBytes int64
}

ByteSizeCleaner deletes leading segment files based on the total byte size of segments

func (*ByteSizeCleaner) Clean

func (cleaner *ByteSizeCleaner) Clean(segments []*Segment) ([]*Segment, error)

Clean deletes segment files so that the sum of the segment files in bytes are less than maxLogBytes. It keeps at least one segment file if there are any.

type Cleaner

type Cleaner interface {
	Clean([]*Segment) ([]*Segment, error)
}

func NewCleaner

func NewCleaner(options CleanerOptions) Cleaner

type CleanerOptions

type CleanerOptions map[string]string

type CommitLog

type CommitLog struct {
	Options
	// contains filtered or unexported fields
}

func New

func New(opts Options) (*CommitLog, error)

func (*CommitLog) Append

func (l *CommitLog) Append(entry *Entry) error

func (*CommitLog) Close

func (l *CommitLog) Close() error

func (*CommitLog) DeleteAll

func (l *CommitLog) DeleteAll() error

func (*CommitLog) Segments

func (l *CommitLog) Segments() []*Segment

func (*CommitLog) Tell

func (l *CommitLog) Tell() *position

Tell tells the current logical position.

func (*CommitLog) Trim

func (l *CommitLog) Trim() (time.Time, error)

Trim deletes segment files according to its retention policy. Returns the base nanosec time of the first segment if some files have been deleted. A zero time is returned if nothing has changed. Maximum nano sec time (= 2262-04-11 23:47:16.854775807 +0000 UTC) is returned if all the segments are deleted.

func (*CommitLog) Truncate

func (l *CommitLog) Truncate(backTo *position) error

type DurationCleaner

type DurationCleaner struct {
	// Options["Duration"] should be one of the format accepted by time.ParseDuration
	Options  CleanerOptions
	Duration time.Duration
}

DurationCleaner deletes leading segments based on -1 * duration from time.Now()

func (*DurationCleaner) Clean

func (cleaner *DurationCleaner) Clean(segments []*Segment) ([]*Segment, error)

type Entry

type Entry struct {
	Timestamp time.Time
	Data      []byte
}

type Options

type Options struct {
	Path            string
	MaxSegmentBytes int64
	CleanerOptions  CleanerOptions
}

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

func NewReader

func NewReader(path string) (*Reader, error)

func (*Reader) Close

func (r *Reader) Close() error

func (*Reader) Read

func (r *Reader) Read() (*Entry, error)

type Record

type Record []byte

func NewRecord

func NewRecord(nanosec int64, payload []byte) Record

type Segment

type Segment struct {
	BaseNano int64
	Size     int64
	// contains filtered or unexported fields
}

func NewSegment

func NewSegment(path string, baseNano int64, maxBytes int64) (*Segment, error)

func (*Segment) AppendEntry

func (s *Segment) AppendEntry(entry *Entry) error

func (*Segment) Close

func (s *Segment) Close() error

func (*Segment) Delete

func (s *Segment) Delete() error

func (*Segment) IsFull

func (s *Segment) IsFull() bool

func (*Segment) ReadEntry

func (s *Segment) ReadEntry() (*Entry, error)

func (*Segment) Truncate

func (s *Segment) Truncate(size int64) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL