|
|
|
@ -3,6 +3,7 @@ package tsdb
|
|
|
|
|
import ( |
|
|
|
|
"bufio" |
|
|
|
|
"encoding/binary" |
|
|
|
|
"hash" |
|
|
|
|
"hash/crc32" |
|
|
|
|
"io" |
|
|
|
|
"math" |
|
|
|
@ -20,11 +21,19 @@ import (
|
|
|
|
|
// WALEntryType indicates what data a WAL entry contains.
|
|
|
|
|
type WALEntryType byte |
|
|
|
|
|
|
|
|
|
// The valid WAL entry types.
|
|
|
|
|
const ( |
|
|
|
|
WALEntrySymbols = 1 |
|
|
|
|
WALEntrySeries = 2 |
|
|
|
|
WALEntrySamples = 3 |
|
|
|
|
// WALMagic is a 4 byte number every WAL segment file starts with.
|
|
|
|
|
WALMagic = uint32(0x43AF00EF) |
|
|
|
|
|
|
|
|
|
// WALFormatDefault is the version flag for the default outer segment file format.
|
|
|
|
|
WALFormatDefault = byte(1) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// Entry types in a segment file.
|
|
|
|
|
const ( |
|
|
|
|
WALEntrySymbols WALEntryType = 1 |
|
|
|
|
WALEntrySeries WALEntryType = 2 |
|
|
|
|
WALEntrySamples WALEntryType = 3 |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// WAL is a write ahead log for series data. It can only be written to.
|
|
|
|
@ -32,102 +41,201 @@ const (
|
|
|
|
|
type WAL struct { |
|
|
|
|
mtx sync.Mutex |
|
|
|
|
|
|
|
|
|
f *fileutil.LockedFile |
|
|
|
|
enc *walEncoder |
|
|
|
|
dirFile *os.File |
|
|
|
|
files []*os.File |
|
|
|
|
|
|
|
|
|
logger log.Logger |
|
|
|
|
flushInterval time.Duration |
|
|
|
|
segmentSize int64 |
|
|
|
|
|
|
|
|
|
crc32 hash.Hash32 |
|
|
|
|
cur *bufio.Writer |
|
|
|
|
curN int64 |
|
|
|
|
|
|
|
|
|
stopc chan struct{} |
|
|
|
|
donec chan struct{} |
|
|
|
|
|
|
|
|
|
symbols map[string]uint32 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const walFileName = "wal-000" |
|
|
|
|
const ( |
|
|
|
|
walDirName = "wal" |
|
|
|
|
walSegmentSizeBytes = 64 * 1000 * 1000 // 64 MB
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// OpenWAL opens or creates a write ahead log in the given directory.
|
|
|
|
|
// The WAL must be read completely before new data is written.
|
|
|
|
|
func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error) { |
|
|
|
|
dir = filepath.Join(dir, walDirName) |
|
|
|
|
|
|
|
|
|
if err := os.MkdirAll(dir, 0777); err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
p := filepath.Join(dir, walFileName) |
|
|
|
|
|
|
|
|
|
f, err := fileutil.TryLockFile(p, os.O_RDWR, 0666) |
|
|
|
|
if err != nil { |
|
|
|
|
if !os.IsNotExist(err) { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
f, err = fileutil.LockFile(p, os.O_RDWR|os.O_CREATE, 0666) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
if _, err = f.Seek(0, os.SEEK_END); err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
enc, err := newWALEncoder(f.File) |
|
|
|
|
df, err := fileutil.OpenDir(dir) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
w := &WAL{ |
|
|
|
|
f: f, |
|
|
|
|
dirFile: df, |
|
|
|
|
logger: l, |
|
|
|
|
enc: enc, |
|
|
|
|
flushInterval: flushInterval, |
|
|
|
|
symbols: map[string]uint32{}, |
|
|
|
|
donec: make(chan struct{}), |
|
|
|
|
stopc: make(chan struct{}), |
|
|
|
|
segmentSize: walSegmentSizeBytes, |
|
|
|
|
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), |
|
|
|
|
} |
|
|
|
|
if err := w.initSegments(); err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
go w.run(flushInterval) |
|
|
|
|
|
|
|
|
|
return w, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type walHandler struct { |
|
|
|
|
sample func(refdSample) error |
|
|
|
|
series func(labels.Labels) error |
|
|
|
|
// Reader returns a new reader over the the write ahead log data.
|
|
|
|
|
// It must be completely consumed before writing to the WAL.
|
|
|
|
|
func (w *WAL) Reader() *WALReader { |
|
|
|
|
var rs []io.ReadCloser |
|
|
|
|
for _, f := range w.files { |
|
|
|
|
rs = append(rs, f) |
|
|
|
|
} |
|
|
|
|
return NewWALReader(rs...) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ReadAll consumes all entries in the WAL and triggers the registered handlers.
|
|
|
|
|
func (w *WAL) ReadAll(h *walHandler) error { |
|
|
|
|
dec := &walDecoder{ |
|
|
|
|
r: w.f, |
|
|
|
|
handler: h, |
|
|
|
|
// Log writes a batch of new series labels and samples to the log.
|
|
|
|
|
func (w *WAL) Log(series []labels.Labels, samples []refdSample) error { |
|
|
|
|
if err := w.encodeSeries(series); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if err := w.encodeSamples(samples); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if w.flushInterval <= 0 { |
|
|
|
|
return w.Sync() |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for { |
|
|
|
|
if err := dec.entry(); err != nil { |
|
|
|
|
if err == io.EOF { |
|
|
|
|
return nil |
|
|
|
|
// initSegments finds all existing segment files and opens them in the
|
|
|
|
|
// appropriate file modes.
|
|
|
|
|
func (w *WAL) initSegments() error { |
|
|
|
|
fns, err := sequenceFiles(w.dirFile.Name(), "") |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if len(fns) == 0 { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
if len(fns) > 1 { |
|
|
|
|
for _, fn := range fns[:len(fns)-1] { |
|
|
|
|
f, err := os.Open(fn) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
return err |
|
|
|
|
w.files = append(w.files, f) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// The most recent WAL file is the one we have to keep appending to.
|
|
|
|
|
f, err := os.OpenFile(fns[len(fns)-1], os.O_RDWR, 0666) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
w.files = append(w.files, f) |
|
|
|
|
|
|
|
|
|
// Consume and validate meta headers.
|
|
|
|
|
for _, f := range w.files { |
|
|
|
|
metab := make([]byte, 8) |
|
|
|
|
|
|
|
|
|
if n, err := f.Read(metab); err != nil { |
|
|
|
|
return errors.Wrapf(err, "validate meta %q", f.Name()) |
|
|
|
|
} else if n != 8 { |
|
|
|
|
return errors.Errorf("invalid header size %d in %q", n, f.Name()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if m := binary.BigEndian.Uint32(metab[:4]); m != WALMagic { |
|
|
|
|
return errors.Errorf("invalid magic header %x in %q", m, f.Name()) |
|
|
|
|
} |
|
|
|
|
if metab[4] != WALFormatDefault { |
|
|
|
|
return errors.Errorf("unknown WAL segment format %d in %q", metab[4], f.Name()) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Log writes a batch of new series labels and samples to the log.
|
|
|
|
|
func (w *WAL) Log(series []labels.Labels, samples []refdSample) error { |
|
|
|
|
if err := w.enc.encodeSeries(series); err != nil { |
|
|
|
|
// cut finishes the currently active segments and open the next one.
|
|
|
|
|
// The encoder is reset to point to the new segment.
|
|
|
|
|
func (w *WAL) cut() error { |
|
|
|
|
// Sync current tail to disc and close.
|
|
|
|
|
if tf := w.tail(); tf != nil { |
|
|
|
|
if err := w.sync(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
off, err := tf.Seek(0, os.SEEK_CUR) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if err := tf.Truncate(off); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if err := tf.Close(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
p, _, err := nextSequenceFile(w.dirFile.Name(), "") |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if err := w.enc.encodeSamples(samples); err != nil { |
|
|
|
|
f, err := os.Create(p) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if w.flushInterval <= 0 { |
|
|
|
|
return w.sync() |
|
|
|
|
if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if err = w.dirFile.Sync(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Write header metadata for new file.
|
|
|
|
|
metab := make([]byte, 8) |
|
|
|
|
binary.BigEndian.PutUint32(metab[:4], WALMagic) |
|
|
|
|
metab[4] = WALFormatDefault |
|
|
|
|
|
|
|
|
|
if _, err := f.Write(metab); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
w.files = append(w.files, f) |
|
|
|
|
w.cur = bufio.NewWriterSize(f, 4*1024*1024) |
|
|
|
|
w.curN = 8 |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *WAL) tail() *os.File { |
|
|
|
|
if len(w.files) == 0 { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
return w.files[len(w.files)-1] |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *WAL) Sync() error { |
|
|
|
|
w.mtx.Lock() |
|
|
|
|
defer w.mtx.Unlock() |
|
|
|
|
|
|
|
|
|
return w.sync() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *WAL) sync() error { |
|
|
|
|
if err := w.enc.flush(); err != nil { |
|
|
|
|
if w.cur == nil { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
if err := w.cur.Flush(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
return fileutil.Fdatasync(w.f.File) |
|
|
|
|
return fileutil.Fdatasync(w.tail()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *WAL) run(interval time.Duration) { |
|
|
|
@ -145,7 +253,7 @@ func (w *WAL) run(interval time.Duration) {
|
|
|
|
|
case <-w.stopc: |
|
|
|
|
return |
|
|
|
|
case <-tick: |
|
|
|
|
if err := w.sync(); err != nil { |
|
|
|
|
if err := w.Sync(); err != nil { |
|
|
|
|
w.logger.Log("msg", "sync failed", "err", err) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -157,16 +265,18 @@ func (w *WAL) Close() error {
|
|
|
|
|
close(w.stopc) |
|
|
|
|
<-w.donec |
|
|
|
|
|
|
|
|
|
w.mtx.Lock() |
|
|
|
|
defer w.mtx.Unlock() |
|
|
|
|
|
|
|
|
|
if err := w.sync(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
return w.f.Close() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type walEncoder struct { |
|
|
|
|
mtx sync.Mutex |
|
|
|
|
// w *ioutil.PageWriter
|
|
|
|
|
w *bufio.Writer |
|
|
|
|
// On opening, a WAL must be fully consumed once. Afterwards
|
|
|
|
|
// only the current segment will still be open.
|
|
|
|
|
if tf := w.tail(); tf != nil { |
|
|
|
|
return tf.Close() |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const ( |
|
|
|
@ -178,31 +288,24 @@ const (
|
|
|
|
|
walPageBytes = 16 * minSectorSize |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
func newWALEncoder(f *os.File) (*walEncoder, error) { |
|
|
|
|
// offset, err := f.Seek(0, os.SEEK_CUR)
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// return nil, err
|
|
|
|
|
// }
|
|
|
|
|
enc := &walEncoder{ |
|
|
|
|
// w: ioutil.NewPageWriter(f, walPageBytes, int(offset)),
|
|
|
|
|
w: bufio.NewWriterSize(f, 4*1024*1024), |
|
|
|
|
} |
|
|
|
|
return enc, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *walEncoder) flush() error { |
|
|
|
|
e.mtx.Lock() |
|
|
|
|
defer e.mtx.Unlock() |
|
|
|
|
func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error { |
|
|
|
|
w.mtx.Lock() |
|
|
|
|
defer w.mtx.Unlock() |
|
|
|
|
|
|
|
|
|
return e.w.Flush() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *walEncoder) entry(et WALEntryType, flag byte, buf []byte) error { |
|
|
|
|
e.mtx.Lock() |
|
|
|
|
defer e.mtx.Unlock() |
|
|
|
|
// Cut to the next segment if exceeds the file size unless it would also
|
|
|
|
|
// exceed the size of a new segment.
|
|
|
|
|
var ( |
|
|
|
|
sz = int64(6 + 4 + len(buf)) |
|
|
|
|
newsz = w.curN + sz |
|
|
|
|
) |
|
|
|
|
if w.cur == nil || w.curN > w.segmentSize || newsz > w.segmentSize && sz <= w.segmentSize { |
|
|
|
|
if err := w.cut(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
h := crc32.NewIEEE() |
|
|
|
|
w := io.MultiWriter(h, e.w) |
|
|
|
|
w.crc32.Reset() |
|
|
|
|
wr := io.MultiWriter(w.crc32, w.cur) |
|
|
|
|
|
|
|
|
|
b := make([]byte, 6) |
|
|
|
|
b[0] = byte(et) |
|
|
|
@ -210,16 +313,18 @@ func (e *walEncoder) entry(et WALEntryType, flag byte, buf []byte) error {
|
|
|
|
|
|
|
|
|
|
binary.BigEndian.PutUint32(b[2:], uint32(len(buf))) |
|
|
|
|
|
|
|
|
|
if _, err := w.Write(b); err != nil { |
|
|
|
|
if _, err := wr.Write(b); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if _, err := w.Write(buf); err != nil { |
|
|
|
|
if _, err := wr.Write(buf); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if _, err := e.w.Write(h.Sum(nil)); err != nil { |
|
|
|
|
if _, err := w.cur.Write(w.crc32.Sum(nil)); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
w.curN += sz |
|
|
|
|
|
|
|
|
|
putWALBuffer(buf) |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
@ -244,7 +349,7 @@ func putWALBuffer(b []byte) {
|
|
|
|
|
walBuffers.Put(b) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *walEncoder) encodeSeries(series []labels.Labels) error { |
|
|
|
|
func (w *WAL) encodeSeries(series []labels.Labels) error { |
|
|
|
|
if len(series) == 0 { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
@ -267,10 +372,10 @@ func (e *walEncoder) encodeSeries(series []labels.Labels) error {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return e.entry(WALEntrySeries, walSeriesSimple, buf) |
|
|
|
|
return w.entry(WALEntrySeries, walSeriesSimple, buf) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *walEncoder) encodeSamples(samples []refdSample) error { |
|
|
|
|
func (w *WAL) encodeSamples(samples []refdSample) error { |
|
|
|
|
if len(samples) == 0 { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
@ -300,25 +405,127 @@ func (e *walEncoder) encodeSamples(samples []refdSample) error {
|
|
|
|
|
buf = append(buf, b[:8]...) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return e.entry(WALEntrySamples, walSamplesSimple, buf) |
|
|
|
|
return w.entry(WALEntrySamples, walSamplesSimple, buf) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type walDecoder struct { |
|
|
|
|
r io.Reader |
|
|
|
|
handler *walHandler |
|
|
|
|
// WALReader decodes and emits write ahead log entries.
|
|
|
|
|
type WALReader struct { |
|
|
|
|
rs []io.ReadCloser |
|
|
|
|
cur int |
|
|
|
|
buf []byte |
|
|
|
|
crc32 hash.Hash32 |
|
|
|
|
|
|
|
|
|
buf []byte |
|
|
|
|
err error |
|
|
|
|
labels []labels.Labels |
|
|
|
|
samples []refdSample |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newWALDecoer(r io.Reader, h *walHandler) *walDecoder { |
|
|
|
|
return &walDecoder{ |
|
|
|
|
r: r, |
|
|
|
|
handler: h, |
|
|
|
|
buf: make([]byte, 0, 1024*1024), |
|
|
|
|
// NewWALReader returns a new WALReader over the sequence of the given ReadClosers.
|
|
|
|
|
func NewWALReader(rs ...io.ReadCloser) *WALReader { |
|
|
|
|
return &WALReader{ |
|
|
|
|
rs: rs, |
|
|
|
|
buf: make([]byte, 0, 128*4096), |
|
|
|
|
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (d *walDecoder) decodeSeries(flag byte, b []byte) error { |
|
|
|
|
// At returns the last decoded entry of labels or samples.
|
|
|
|
|
func (r *WALReader) At() ([]labels.Labels, []refdSample) { |
|
|
|
|
return r.labels, r.samples |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Err returns the last error the reader encountered.
|
|
|
|
|
func (r *WALReader) Err() error { |
|
|
|
|
return r.err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// nextEntry retrieves the next entry. It is also used as a testing hook.
|
|
|
|
|
func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) { |
|
|
|
|
if r.cur >= len(r.rs) { |
|
|
|
|
return 0, 0, nil, io.EOF |
|
|
|
|
} |
|
|
|
|
cr := r.rs[r.cur] |
|
|
|
|
|
|
|
|
|
et, flag, b, err := r.entry(cr) |
|
|
|
|
if err == io.EOF { |
|
|
|
|
// Current reader completed, close and move to the next one.
|
|
|
|
|
if err := cr.Close(); err != nil { |
|
|
|
|
return 0, 0, nil, err |
|
|
|
|
} |
|
|
|
|
r.cur++ |
|
|
|
|
return r.nextEntry() |
|
|
|
|
} |
|
|
|
|
return et, flag, b, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Next returns decodes the next entry pair and returns true
|
|
|
|
|
// if it was succesful.
|
|
|
|
|
func (r *WALReader) Next() bool { |
|
|
|
|
r.labels = r.labels[:0] |
|
|
|
|
r.samples = r.samples[:0] |
|
|
|
|
|
|
|
|
|
et, flag, b, err := r.nextEntry() |
|
|
|
|
if err != nil { |
|
|
|
|
if err != io.EOF { |
|
|
|
|
r.err = err |
|
|
|
|
} |
|
|
|
|
return false |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
switch et { |
|
|
|
|
case WALEntrySamples: |
|
|
|
|
if err := r.decodeSamples(flag, b); err != nil { |
|
|
|
|
r.err = err |
|
|
|
|
} |
|
|
|
|
case WALEntrySeries: |
|
|
|
|
if err := r.decodeSeries(flag, b); err != nil { |
|
|
|
|
r.err = err |
|
|
|
|
} |
|
|
|
|
default: |
|
|
|
|
r.err = errors.Errorf("unknown WAL entry type %d", et) |
|
|
|
|
} |
|
|
|
|
return r.err == nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { |
|
|
|
|
r.crc32.Reset() |
|
|
|
|
tr := io.TeeReader(cr, r.crc32) |
|
|
|
|
|
|
|
|
|
b := make([]byte, 6) |
|
|
|
|
if _, err := tr.Read(b); err != nil { |
|
|
|
|
return 0, 0, nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var ( |
|
|
|
|
etype = WALEntryType(b[0]) |
|
|
|
|
flag = b[1] |
|
|
|
|
length = int(binary.BigEndian.Uint32(b[2:])) |
|
|
|
|
) |
|
|
|
|
// Exit if we reached pre-allocated space.
|
|
|
|
|
if etype == 0 { |
|
|
|
|
return 0, 0, nil, io.EOF |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if length > len(r.buf) { |
|
|
|
|
r.buf = make([]byte, length) |
|
|
|
|
} |
|
|
|
|
buf := r.buf[:length] |
|
|
|
|
|
|
|
|
|
if _, err := tr.Read(buf); err != nil { |
|
|
|
|
return 0, 0, nil, err |
|
|
|
|
} |
|
|
|
|
_, err := cr.Read(b[:4]) |
|
|
|
|
if err != nil { |
|
|
|
|
return 0, 0, nil, err |
|
|
|
|
} |
|
|
|
|
if exp, has := binary.BigEndian.Uint32(b[:4]), r.crc32.Sum32(); has != exp { |
|
|
|
|
return 0, 0, nil, errors.Errorf("unexpected CRC32 checksum %x, want %x", has, exp) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return etype, flag, buf, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *WALReader) decodeSeries(flag byte, b []byte) error { |
|
|
|
|
for len(b) > 0 { |
|
|
|
|
l, n := binary.Uvarint(b) |
|
|
|
|
if n < 1 { |
|
|
|
@ -343,14 +550,12 @@ func (d *walDecoder) decodeSeries(flag byte, b []byte) error {
|
|
|
|
|
b = b[n+int(vl):] |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if err := d.handler.series(lset); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
r.labels = append(r.labels, lset) |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (d *walDecoder) decodeSamples(flag byte, b []byte) error { |
|
|
|
|
func (r *WALReader) decodeSamples(flag byte, b []byte) error { |
|
|
|
|
if len(b) < 16 { |
|
|
|
|
return errors.Wrap(errInvalidSize, "header length") |
|
|
|
|
} |
|
|
|
@ -384,45 +589,7 @@ func (d *walDecoder) decodeSamples(flag byte, b []byte) error {
|
|
|
|
|
smpl.v = float64(math.Float64frombits(binary.BigEndian.Uint64(b))) |
|
|
|
|
b = b[8:] |
|
|
|
|
|
|
|
|
|
if err := d.handler.sample(smpl); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
r.samples = append(r.samples, smpl) |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (d *walDecoder) entry() error { |
|
|
|
|
b := make([]byte, 6) |
|
|
|
|
if _, err := d.r.Read(b); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var ( |
|
|
|
|
etype = WALEntryType(b[0]) |
|
|
|
|
flag = b[1] |
|
|
|
|
length = int(binary.BigEndian.Uint32(b[2:])) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if length > len(d.buf) { |
|
|
|
|
d.buf = make([]byte, length) |
|
|
|
|
} |
|
|
|
|
buf := d.buf[:length] |
|
|
|
|
|
|
|
|
|
if _, err := d.r.Read(buf); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
// Read away checksum.
|
|
|
|
|
// TODO(fabxc): verify it
|
|
|
|
|
if _, err := d.r.Read(b[:4]); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
switch etype { |
|
|
|
|
case WALEntrySeries: |
|
|
|
|
return d.decodeSeries(flag, buf) |
|
|
|
|
case WALEntrySamples: |
|
|
|
|
return d.decodeSamples(flag, buf) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return errors.Errorf("unknown WAL entry type %q", etype) |
|
|
|
|
} |
|
|
|
|