Re-use slices on deocding the WAL. Fix tests.

pull/5805/head
Fabian Reinartz 7 years ago
parent 0db4c227b7
commit c2916736be

@ -52,13 +52,16 @@ const (
WALEntryDeletes WALEntryType = 4 WALEntryDeletes WALEntryType = 4
) )
// SamplesCB is the callback after reading samples. // SamplesCB is the callback after reading samples. The passed slice
// is only valid until the call returns.
type SamplesCB func([]RefSample) error type SamplesCB func([]RefSample) error
// SeriesCB is the callback after reading series. // SeriesCB is the callback after reading series. The passed slice
// is only valid until the call returns.
type SeriesCB func([]RefSeries) error type SeriesCB func([]RefSeries) error
// DeletesCB is the callback after reading deletes. // DeletesCB is the callback after reading deletes. The passed slice
// is only valid until the call returns.
type DeletesCB func([]Stone) error type DeletesCB func([]Stone) error
// WAL is a write ahead log that can log new series labels and samples. // WAL is a write ahead log that can log new series labels and samples.
@ -594,6 +597,9 @@ func (w *SegmentWAL) sync() error {
if err := w.flush(); err != nil { if err := w.flush(); err != nil {
return err return err
} }
if w.head() == nil {
return nil
}
return fileutil.Fdatasync(w.head().File) return fileutil.Fdatasync(w.head().File)
} }
@ -769,6 +775,10 @@ type walReader struct {
curBuf []byte curBuf []byte
lastOffset int64 // offset after last successfully read entry lastOffset int64 // offset after last successfully read entry
seriesBuf []RefSeries
sampleBuf []RefSample
tombstoneBuf []Stone
err error err error
} }
@ -996,7 +1006,8 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
} }
func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) {
series := []RefSeries{} r.seriesBuf = r.seriesBuf[:0]
dec := decbuf{b: b} dec := decbuf{b: b}
for len(dec.b) > 0 && dec.err() == nil { for len(dec.b) > 0 && dec.err() == nil {
@ -1010,7 +1021,7 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) {
} }
sort.Sort(lset) sort.Sort(lset)
series = append(series, RefSeries{ r.seriesBuf = append(r.seriesBuf, RefSeries{
Ref: ref, Ref: ref,
Labels: lset, Labels: lset,
}) })
@ -1019,16 +1030,16 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) {
return nil, dec.err() return nil, dec.err()
} }
if len(dec.b) > 0 { if len(dec.b) > 0 {
return series, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) return r.seriesBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
} }
return series, nil return r.seriesBuf, nil
} }
func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
if len(b) == 0 { if len(b) == 0 {
return nil, nil return nil, nil
} }
samples := []RefSample{} r.sampleBuf = r.sampleBuf[:0]
dec := decbuf{b: b} dec := decbuf{b: b}
var ( var (
@ -1041,7 +1052,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
dtime := dec.varint64() dtime := dec.varint64()
val := dec.be64() val := dec.be64()
samples = append(samples, RefSample{ r.sampleBuf = append(r.sampleBuf, RefSample{
Ref: uint64(int64(baseRef) + dref), Ref: uint64(int64(baseRef) + dref),
T: baseTime + dtime, T: baseTime + dtime,
V: math.Float64frombits(val), V: math.Float64frombits(val),
@ -1049,20 +1060,20 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
} }
if dec.err() != nil { if dec.err() != nil {
return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(samples)) return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(r.sampleBuf))
} }
if len(dec.b) > 0 { if len(dec.b) > 0 {
return samples, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) return r.sampleBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
} }
return samples, nil return r.sampleBuf, nil
} }
func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) { func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) {
dec := &decbuf{b: b} dec := &decbuf{b: b}
var stones []Stone r.tombstoneBuf = r.tombstoneBuf[:0]
for dec.len() > 0 && dec.err() == nil { for dec.len() > 0 && dec.err() == nil {
stones = append(stones, Stone{ r.tombstoneBuf = append(r.tombstoneBuf, Stone{
ref: dec.be64(), ref: dec.be64(),
intervals: Intervals{ intervals: Intervals{
{Mint: dec.varint64(), Maxt: dec.varint64()}, {Mint: dec.varint64(), Maxt: dec.varint64()},
@ -1073,7 +1084,7 @@ func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) {
return nil, dec.err() return nil, dec.err()
} }
if len(dec.b) > 0 { if len(dec.b) > 0 {
return stones, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) return r.tombstoneBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
} }
return stones, nil return r.tombstoneBuf, nil
} }

@ -197,7 +197,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
const ( const (
numMetrics = 50 numMetrics = 50
iterations = 5 iterations = 5
stepSize = 1 stepSize = 5
) )
// Generate testing data. It does not make semantical sense but // Generate testing data. It does not make semantical sense but
// for the purpose of this test. // for the purpose of this test.
@ -236,7 +236,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
if len(series) > 0 { if len(series) > 0 {
clsets := make([]RefSeries, len(series)) clsets := make([]RefSeries, len(series))
copy(clsets, series) copy(clsets, series)
resultSeries = append(resultSeries, series) resultSeries = append(resultSeries, clsets)
} }
return nil return nil
@ -253,7 +253,9 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
delf := func(stones []Stone) error { delf := func(stones []Stone) error {
if len(stones) > 0 { if len(stones) > 0 {
resultDeletes = append(resultDeletes, stones) cst := make([]Stone, len(stones))
copy(cst, stones)
resultDeletes = append(resultDeletes, cst)
} }
return nil return nil

Loading…
Cancel
Save