Versions in this module Expand all Collapse all v0 v0.1.1 Jul 1, 2019 Changes in this version + var ErrEntryNotFound = errors.New("entry not found") + var ErrIndexCorrupt = errors.New("corrupt index file") + var ErrSegmentClosed = errors.New("segment has been closed") + var ErrSegmentExists = errors.New("segment already exists") + var ErrSegmentNotFound = errors.New("segment not found") + var ErrSegmentReplaced = errors.New("segment was replaced") + func NewMessageSetFromProto(baseOffset, basePos int64, msgs []*proto.Message) (MessageSet, []*Entry, error) + type CommitLog struct + func New(opts Options) (*CommitLog, error) + func (l *CommitLog) Append(msgs []*proto.Message) ([]int64, error) + func (l *CommitLog) AppendMessageSet(ms []byte) ([]int64, error) + func (l *CommitLog) Clean() error + func (l *CommitLog) Close() error + func (l *CommitLog) Delete() error + func (l *CommitLog) HighWatermark() int64 + func (l *CommitLog) NewReader(offset int64, uncommitted bool) (*Reader, error) + func (l *CommitLog) NewestOffset() int64 + func (l *CommitLog) OffsetForTimestamp(timestamp int64) (int64, error) + func (l *CommitLog) OldestOffset() int64 + func (l *CommitLog) Segments() []*Segment + func (l *CommitLog) SetHighWatermark(hw int64) + func (l *CommitLog) Truncate(offset int64) error + type CompactCleaner struct + func NewCompactCleaner(opts CompactCleanerOptions) *CompactCleaner + func (c *CompactCleaner) Compact(hw int64, segments []*Segment) ([]*Segment, error) + type CompactCleanerOptions struct + Logger logger.Logger + MaxGoroutines int + Name string + type DeleteCleaner struct + func NewDeleteCleaner(opts DeleteCleanerOptions) *DeleteCleaner + func (c *DeleteCleaner) Clean(segments []*Segment) ([]*Segment, error) + type DeleteCleanerOptions struct + Logger logger.Logger + Name string + Retention struct{ ... } + type Entry struct + Offset int64 + Position int64 + Size int32 + Timestamp int64 + func EntriesForMessageSet(basePos int64, ms []byte) []*Entry + type Index struct + func NewIndex(opts options) (idx *Index, err error) + func (idx *Index) Close() error + func (idx *Index) CountEntries() int64 + func (idx *Index) InitializePosition() (*Entry, error) + func (idx *Index) Name() string + func (idx *Index) Position() int64 + func (idx *Index) ReadAt(p []byte, offset int64) (n int, err error) + func (idx *Index) ReadEntryAtFileOffset(e *Entry, fileOffset int64) (err error) + func (idx *Index) ReadEntryAtLogOffset(e *Entry, logOffset int64) error + func (idx *Index) Shrink() error + func (idx *Index) Sync() error + func (idx *Index) TruncateEntries(number int) error + type IndexScanner struct + func NewIndexScanner(idx *Index) *IndexScanner + func (s *IndexScanner) Scan() (*Entry, error) + type Message []byte + func (m Message) Attributes() int8 + func (m Message) Crc() uint32 + func (m Message) Headers() map[string][]byte + func (m Message) Key() []byte + func (m Message) MagicByte() int8 + func (m Message) Value() []byte + type MessageSet []byte + func (ms MessageSet) Message() Message + func (ms MessageSet) Offset() int64 + func (ms MessageSet) Size() int32 + func (ms MessageSet) Timestamp() int64 + type Options struct + CleanerInterval time.Duration + Compact bool + CompactMaxGoroutines int + HWCheckpointInterval time.Duration + LogRollTime time.Duration + Logger logger.Logger + MaxLogAge time.Duration + MaxLogBytes int64 + MaxLogMessages int64 + MaxSegmentBytes int64 + Path string + type Reader struct + func (r *Reader) ReadMessage(ctx context.Context, headersBuf []byte) (Message, int64, int64, error) + type Segment struct + BaseOffset int64 + Index *Index + func NewSegment(path string, baseOffset, maxBytes int64, isNew bool, suffix string) (*Segment, error) + func (s *Segment) CheckSplit(logRollTime time.Duration) bool + func (s *Segment) Cleaned() (*Segment, error) + func (s *Segment) Close() error + func (s *Segment) Delete() error + func (s *Segment) FirstOffset() int64 + func (s *Segment) IsEmpty() bool + func (s *Segment) LastOffset() int64 + func (s *Segment) MessageCount() int64 + func (s *Segment) NextOffset() int64 + func (s *Segment) Position() int64 + func (s *Segment) ReadAt(p []byte, off int64) (n int, err error) + func (s *Segment) Replace(old *Segment) error + func (s *Segment) Seal() + func (s *Segment) Truncated() (*Segment, error) + func (s *Segment) WriteMessageSet(ms []byte, entries []*Entry) error + type SegmentScanner struct + func NewSegmentScanner(segment *Segment) *SegmentScanner + func (s *SegmentScanner) Scan() (MessageSet, *Entry, error)