Documentation ¶
Overview ¶
Package journal is an implementation and interface specification for an append-only journal with rotations. It contains a few simple implementations, as well.
Index ¶
- Variables
- func TSFromName(name string) (int64, error)
- type Bytes
- func (j Bytes) Append(rec interface{}) error
- func (j Bytes) Bytes() []byte
- func (j *Bytes) Close() error
- func (j *Bytes) IsOpen() bool
- func (j *Bytes) JournalDecoder() (Decoder, error)
- func (j *Bytes) SnapshotDecoder() (Decoder, error)
- func (j *Bytes) StartSnapshot(records <-chan interface{}, snapresp chan<- error) error
- func (j Bytes) String() string
- type Count
- func (j *Count) Append(_ interface{}) error
- func (j Count) Close() error
- func (j Count) IsOpen() bool
- func (j Count) JournalDecoder() (Decoder, error)
- func (j Count) ShardFinished() bool
- func (j Count) SnapshotDecoder() (Decoder, error)
- func (j Count) StartSnapshot(records <-chan interface{}, snapresp chan<- error) error
- func (j Count) String() string
- type Decoder
- type DiskLog
- func (d *DiskLog) Append(rec interface{}) error
- func (d *DiskLog) Close() error
- func (d *DiskLog) Dir() string
- func (d *DiskLog) IsOpen() bool
- func (d *DiskLog) JournalDecoder() (Decoder, error)
- func (d *DiskLog) JournalName() string
- func (d *DiskLog) Rotate() error
- func (d *DiskLog) SnapshotDecoder() (Decoder, error)
- func (d *DiskLog) StartSnapshot(elems <-chan interface{}, snapresp chan<- error) error
- type EmptyDecoder
- type FS
- type File
- type Interface
- type MemFS
- func (m *MemFS) Create(name string) (File, error)
- func (m *MemFS) FindMatching(glob string) ([]string, error)
- func (m *MemFS) Mkdir(name string, perm os.FileMode) error
- func (m *MemFS) MkdirAll(name string, perm os.FileMode) error
- func (m *MemFS) Open(name string) (File, error)
- func (m *MemFS) Remove(name string) error
- func (m *MemFS) RemoveAll(dirname string) error
- func (m *MemFS) Rename(oldname, newname string) error
- func (m *MemFS) Stat(name string) (os.FileInfo, error)
- func (m *MemFS) String() string
- type OSFS
- func (OSFS) Create(name string) (File, error)
- func (OSFS) FindMatching(glob string) ([]string, error)
- func (OSFS) Getpid() int
- func (OSFS) Mkdir(name string, perm os.FileMode) error
- func (OSFS) MkdirAll(name string, perm os.FileMode) error
- func (OSFS) Open(name string) (File, error)
- func (OSFS) Remove(name string) error
- func (OSFS) RemoveAll(dirname string) error
- func (OSFS) Rename(oldname, newname string) error
- func (OSFS) Stat(name string) (os.FileInfo, error)
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( ErrNotOpen = errors.New("journal is not open") // Logf is a function used to log warnings, etc. It can be overridden for // e.g., testing log output. Logf = func(fstr string, vals ...interface{}) { log.Printf(fstr, vals...) } )
var (
ErrCantClose = errors.New("cannot close this type of journal")
)
Functions ¶
func TSFromName ¶
TSFromName extracts the timestamp from a file name; the timestamp is the prefix of the name.
Types ¶
type Bytes ¶
type Bytes struct {
// contains filtered or unexported fields
}
func (*Bytes) JournalDecoder ¶
func (*Bytes) SnapshotDecoder ¶
func (*Bytes) StartSnapshot ¶
type Count ¶
type Count int64
func (Count) JournalDecoder ¶
func (Count) ShardFinished ¶
func (Count) SnapshotDecoder ¶
func (Count) StartSnapshot ¶
type Decoder ¶
type Decoder interface { // Decode attempts to fill the elements of the underlying value of its // argument with the next item. Decode(interface{}) error }
type DiskLog ¶
type DiskLog struct {
// contains filtered or unexported fields
}
Example ¶
// Open up the log in directory "/tmp/disklog". Will create an error if it does not exist. fs := NewMemFS("/tmp/disklog") journal, err := OpenDiskLogInjectFS("/tmp/disklog", fs) if err != nil { fmt.Println(err) return } // Data type can be anything. Here we're adding integers one at a time. We // could also add the entire list at once, since it just gets gob-encoded. data := []int{2, 3, 5, 7, 11, 13} for _, d := range data { if err := journal.Append(d); err != nil { fmt.Printf("Failed to append %v: %v\n", d, err) } } // We didn't write enough to trigger a rotation, so everything should be in // the current journal. Let's see if we get it back. decoder, err := journal.JournalDecoder() if err != nil { fmt.Printf("error getting decoder: %v\n", err) return } vals := make([]int, 0) val := -1 for { err := decoder.Decode(&val) if err == io.EOF { break } if err != nil { fmt.Println("Error:", vals) fmt.Printf("failed to decode next item in journal: %v\n", err) return } vals = append(vals, val) } fmt.Println("Success", vals)
Output: Success [2 3 5 7 11 13]
func OpenDiskLog ¶
func (*DiskLog) Close ¶
Close gracefully shuts the journal down, finalizing the current journal log.
func (*DiskLog) JournalDecoder ¶
JournalDecoder returns a Decoder whose Decode function can be called to get the next item from the journals that are newer than the most recent snapshot.
func (*DiskLog) JournalName ¶
JournalName returns the current journal name.
func (*DiskLog) SnapshotDecoder ¶
SnapshotDecoder returns a decoder whose Decode function can be called to get the next item from the most recent frozen snapshot.
func (*DiskLog) StartSnapshot ¶
StartSnapshot triggers an immediate rotation, then consumes all of the elements on the channel and serializes them to a snapshot file with the same ID as the recently-closed log.
type EmptyDecoder ¶
type EmptyDecoder struct{}
EmptyDecoder can be returned when there is nothing to decode, but it is safe to proceed.
func (EmptyDecoder) Decode ¶
func (ed EmptyDecoder) Decode(interface{}) error
Decode with no elements - default behavior.
type FS ¶
type FS interface { Create(name string) (File, error) Open(name string) (File, error) Rename(oldname, newname string) error Remove(name string) error RemoveAll(dirname string) error Mkdir(name string, perm os.FileMode) error MkdirAll(name string, perm os.FileMode) error Stat(name string) (os.FileInfo, error) FindMatching(glob string) ([]string, error) }
type Interface ¶
type Interface interface { // Append appends a serialized version of the interface to the // current journal shard. Append(interface{}) error // StartSnapshot is given a data channel from which it is expected to // consume all values until closed. If it terminates early, it sends a // non-nil error back. When complete with no errors, the snapshot has been // successfully processed. Whether the current shard is full or not, this // function immediately triggers a shard rotation so that subsequent calls // to Append go to a new shard. StartSnapshot(records <-chan interface{}, resp chan<- error) error // SnapshotDecoder returns a decode function that can be called to decode // the next element in the most recent snapshot. SnapshotDecoder() (Decoder, error) // JournalDecoder returns a decode function that can be called to decode // the next element in the journal stream. JournalDecoder() (Decoder, error) // Close allows the journal to shut down (e.g., flush data) gracefully. Close() error // IsOpen indicates whether the journal is operational and has not been // closed. IsOpen() bool }