diff --git a/wal.go b/wal.go index 1dadc8f2c..68c48838c 100644 --- a/wal.go +++ b/wal.go @@ -52,13 +52,16 @@ const ( WALEntryDeletes WALEntryType = 4 ) -// SamplesCB is the callback after reading samples. +// SamplesCB is the callback after reading samples. The passed slice +// is only valid until the call returns. type SamplesCB func([]RefSample) error -// SeriesCB is the callback after reading series. +// SeriesCB is the callback after reading series. The passed slice +// is only valid until the call returns. type SeriesCB func([]RefSeries) error -// DeletesCB is the callback after reading deletes. +// DeletesCB is the callback after reading deletes. The passed slice +// is only valid until the call returns. type DeletesCB func([]Stone) error // WAL is a write ahead log that can log new series labels and samples. @@ -594,6 +597,9 @@ func (w *SegmentWAL) sync() error { if err := w.flush(); err != nil { return err } + if w.head() == nil { + return nil + } return fileutil.Fdatasync(w.head().File) } @@ -769,6 +775,10 @@ type walReader struct { curBuf []byte lastOffset int64 // offset after last successfully read entry + seriesBuf []RefSeries + sampleBuf []RefSample + tombstoneBuf []Stone + err error } @@ -996,7 +1006,8 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { } func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { - series := []RefSeries{} + r.seriesBuf = r.seriesBuf[:0] + dec := decbuf{b: b} for len(dec.b) > 0 && dec.err() == nil { @@ -1010,7 +1021,7 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { } sort.Sort(lset) - series = append(series, RefSeries{ + r.seriesBuf = append(r.seriesBuf, RefSeries{ Ref: ref, Labels: lset, }) @@ -1019,16 +1030,16 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { return nil, dec.err() } if len(dec.b) > 0 { - return series, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + return r.seriesBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) } - return series, nil + return r.seriesBuf, nil } func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { if len(b) == 0 { return nil, nil } - samples := []RefSample{} + r.sampleBuf = r.sampleBuf[:0] dec := decbuf{b: b} var ( @@ -1041,7 +1052,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { dtime := dec.varint64() val := dec.be64() - samples = append(samples, RefSample{ + r.sampleBuf = append(r.sampleBuf, RefSample{ Ref: uint64(int64(baseRef) + dref), T: baseTime + dtime, V: math.Float64frombits(val), @@ -1049,20 +1060,20 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { } if dec.err() != nil { - return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(samples)) + return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(r.sampleBuf)) } if len(dec.b) > 0 { - return samples, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + return r.sampleBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) } - return samples, nil + return r.sampleBuf, nil } func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) { dec := &decbuf{b: b} - var stones []Stone + r.tombstoneBuf = r.tombstoneBuf[:0] for dec.len() > 0 && dec.err() == nil { - stones = append(stones, Stone{ + r.tombstoneBuf = append(r.tombstoneBuf, Stone{ ref: dec.be64(), intervals: Intervals{ {Mint: dec.varint64(), Maxt: dec.varint64()}, @@ -1073,7 +1084,7 @@ func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) { return nil, dec.err() } if len(dec.b) > 0 { - return stones, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + return r.tombstoneBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) } - return stones, nil + return r.tombstoneBuf, nil } diff --git a/wal_test.go b/wal_test.go index 17a71da75..45279b1b5 100644 --- a/wal_test.go +++ b/wal_test.go @@ -197,7 +197,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { const ( numMetrics = 50 iterations = 5 - stepSize = 1 + stepSize = 5 ) // Generate testing data. It does not make semantical sense but // for the purpose of this test. @@ -236,7 +236,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { if len(series) > 0 { clsets := make([]RefSeries, len(series)) copy(clsets, series) - resultSeries = append(resultSeries, series) + resultSeries = append(resultSeries, clsets) } return nil @@ -253,7 +253,9 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { delf := func(stones []Stone) error { if len(stones) > 0 { - resultDeletes = append(resultDeletes, stones) + cst := make([]Stone, len(stones)) + copy(cst, stones) + resultDeletes = append(resultDeletes, cst) } return nil