Versions in this module Expand all Collapse all v0 v0.1.1 Apr 30, 2024 Changes in this version type FileChannel + func (fc *FileChannel) DiskUsage() (uint64, error) + func (fc *FileChannel) FlushOffset() uint64 + func (fc *FileChannel) WriteOffset() uint64 type Iterator + func (it *Iterator) Offset() uint64 v0.1.0 Jan 30, 2024 Changes in this version + const IteratorBufferLimit + const MessageHeaderBinarySize + const SegmentHeaderBinarySize + var DefaultFlushInterval = 100 * time.Microsecond + var DefaultRotateThreshold = uint64(512 << 20) + var ErrChannelClosed = errors.New("channel closed") + var ErrChecksumMismatch = errors.New("channel corrupted: checksum mismatch") + var ErrNotEnoughMessages = errors.New("not enough messages") + var ErrNotEnoughReadToAck = errors.New("not enough read to ack") + var ErrUnexpectedEOF = fmt.Errorf("unexpected EOF") + func ReadNext(r io.Reader, w io.Writer, hBuf []byte) error + type CompressedSegmentHeader struct + BeginOffset uint64 + CompressionMethod CompressionMethod + EndOffset uint64 + SegmentID uint32 + func (h *CompressedSegmentHeader) Decode(b []byte) + func (h *CompressedSegmentHeader) Encode(b []byte) + type CompressionMethod byte + const Snappy + type FileChannel struct + func NewFileChannel(dir string, opts ...Option) *FileChannel + func OpenFileChannel(dir string, opts ...Option) (*FileChannel, error) + func (fc *FileChannel) Close() error + func (fc *FileChannel) Flush() error + func (fc *FileChannel) Iterator() *Iterator + func (fc *FileChannel) IteratorAcknowledgable() *Iterator + func (fc *FileChannel) Open() error + func (fc *FileChannel) Write(p []byte) (err error) + type Iterator struct + func NewIterator(manager *SegmentManager, position *Position, autoAck bool) *Iterator + func (it *Iterator) Ack(n int) error + func (it *Iterator) Close() error + func (it *Iterator) Next(ctx context.Context) (b []byte, err error) + func (it *Iterator) TryNext() ([]byte, error) + type MessageHeader struct + Checksum uint32 + Length uint32 + func (h *MessageHeader) Decode(b []byte) + func (h *MessageHeader) Encode(b []byte) + type Option func(*FileChannel) + func FlushInterval(d time.Duration) Option + func RotateThreshold(n uint64) Option + type PlainSegmentHeader struct + BeginOffset uint64 + SegmentID uint32 + func (h *PlainSegmentHeader) Decode(b []byte) + func (h *PlainSegmentHeader) Encode(b []byte) + type Position struct + func NewPosition(offset uint64) *Position + func (p *Position) Close() + func (p *Position) Get() uint64 + func (p *Position) Update(offset uint64) uint64 + func (p *Position) Wait(ctx context.Context, cond func(uint64) bool) (uint64, error) + type SegmentFileState int + const Compressed + const Compressing + const Plain + func ParseSegmentIndexAndState(file string) (uint32, SegmentFileState, error) + type SegmentManager struct + func NewSegmentManager(dir string) *SegmentManager + func (sm *SegmentManager) AdvanceReader(prev uint32, delta uint32) (uint32, uint32) + func (sm *SegmentManager) CloseReader(cur uint32) uint32 + func (sm *SegmentManager) CurrentSegmentIndex() uint32 + func (sm *SegmentManager) CurrentSegmentWatermark() uint32 + func (sm *SegmentManager) GetBeginIndex() uint32 + func (sm *SegmentManager) IncSegmentIndex() uint32 + func (sm *SegmentManager) NewReader() uint32 + func (sm *SegmentManager) Pin(index uint32) bool + func (sm *SegmentManager) SegmentFile(index uint32, state SegmentFileState) string + func (sm *SegmentManager) SetBeginIndex(index uint32) + func (sm *SegmentManager) Unpin(index uint32) + func (sm *SegmentManager) WaitUntilWatermarkAbove(ctx context.Context, index uint32) (uint32, error)