From d3682d701c66a3ccb3d193d1f66490bb86044b46 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 6 Oct 2017 14:06:39 +0200 Subject: [PATCH] wal: decode and process in separate threads. --- head.go | 9 +- head_test.go | 2 +- wal.go | 243 +++++++++++++++++++++++++++++---------------------- wal_test.go | 27 ++---- 4 files changed, 150 insertions(+), 131 deletions(-) diff --git a/head.go b/head.go index 4f3c60c39..1347b75d9 100644 --- a/head.go +++ b/head.go @@ -197,7 +197,7 @@ func (h *Head) ReadWAL() error { // for error reporting. var unknownRefs int - seriesFunc := func(series []RefSeries) error { + seriesFunc := func(series []RefSeries) { for _, s := range series { h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) @@ -205,9 +205,8 @@ func (h *Head) ReadWAL() error { h.lastSeriesID = s.Ref } } - return nil } - samplesFunc := func(samples []RefSample) error { + samplesFunc := func(samples []RefSample) { for _, s := range samples { if s.T < mint { continue @@ -223,9 +222,8 @@ func (h *Head) ReadWAL() error { h.metrics.chunks.Inc() } } - return nil } - deletesFunc := func(stones []Stone) error { + deletesFunc := func(stones []Stone) { for _, s := range stones { for _, itv := range s.intervals { if itv.Maxt < mint { @@ -234,7 +232,6 @@ func (h *Head) ReadWAL() error { h.tombstones.add(s.ref, itv) } } - return nil } if unknownRefs > 0 { diff --git a/head_test.go b/head_test.go index 31713b9c1..308ae3e2b 100644 --- a/head_test.go +++ b/head_test.go @@ -93,7 +93,7 @@ func (w *memoryWAL) Reader() WALReader { return w } -func (w *memoryWAL) Read(series SeriesCB, samples SamplesCB, deletes DeletesCB) error { +func (w *memoryWAL) Read(series func([]RefSeries), samples func([]RefSample), deletes func([]Stone)) error { for _, e := range w.entries { switch v := e.(type) { case []RefSeries: diff --git a/wal.go b/wal.go index 467c4e09b..9dc45c608 100644 --- a/wal.go +++ b/wal.go @@ -27,16 +27,16 @@ import ( "sync" "time" - "github.com/prometheus/tsdb/fileutil" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" - "github.com/prometheus/tsdb/labels" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb/fileutil" + "github.com/prometheus/tsdb/labels" ) // WALEntryType indicates what data a WAL entry contains. -type WALEntryType byte +type WALEntryType uint8 const ( // WALMagic is a 4 byte number every WAL segment file starts with. @@ -54,18 +54,6 @@ const ( WALEntryDeletes WALEntryType = 4 ) -// SamplesCB is the callback after reading samples. The passed slice -// is only valid until the call returns. -type SamplesCB func([]RefSample) error - -// SeriesCB is the callback after reading series. The passed slice -// is only valid until the call returns. -type SeriesCB func([]RefSeries) error - -// DeletesCB is the callback after reading deletes. The passed slice -// is only valid until the call returns. -type DeletesCB func([]Stone) error - type walMetrics struct { fsyncDuration prometheus.Summary } @@ -104,17 +92,27 @@ func NopWAL() WAL { type nopWAL struct{} -func (nopWAL) Read(SeriesCB, SamplesCB, DeletesCB) error { return nil } -func (w nopWAL) Reader() WALReader { return w } -func (nopWAL) LogSeries([]RefSeries) error { return nil } -func (nopWAL) LogSamples([]RefSample) error { return nil } -func (nopWAL) LogDeletes([]Stone) error { return nil } -func (nopWAL) Truncate(int64, func(uint64) bool) error { return nil } -func (nopWAL) Close() error { return nil } +func (nopWAL) Read( + seriesf func([]RefSeries), + samplesf func([]RefSample), + deletesf func([]Stone), +) error { + return nil +} +func (w nopWAL) Reader() WALReader { return w } +func (nopWAL) LogSeries([]RefSeries) error { return nil } +func (nopWAL) LogSamples([]RefSample) error { return nil } +func (nopWAL) LogDeletes([]Stone) error { return nil } +func (nopWAL) Truncate(int64, func(uint64) bool) error { return nil } +func (nopWAL) Close() error { return nil } // WALReader reads entries from a WAL. type WALReader interface { - Read(SeriesCB, SamplesCB, DeletesCB) error + Read( + seriesf func([]RefSeries), + samplesf func([]RefSample), + deletesf func([]Stone), + ) error } // RefSeries is the series labels with the series ID. @@ -170,7 +168,7 @@ func newCRC32() hash.Hash32 { // SegmentWAL is a write ahead log for series data. type SegmentWAL struct { - mtx sync.Mutex + mtx sync.Mutex metrics *walMetrics dirFile *os.File @@ -238,12 +236,16 @@ type repairingWALReader struct { r WALReader } -func (r *repairingWALReader) Read(series SeriesCB, samples SamplesCB, deletes DeletesCB) error { - err := r.r.Read(series, samples, deletes) +func (r *repairingWALReader) Read( + seriesf func([]RefSeries), + samplesf func([]RefSample), + deletesf func([]Stone), +) error { + err := r.r.Read(seriesf, samplesf, deletesf) if err == nil { return nil } - cerr, ok := err.(walCorruptionErr) + cerr, ok := errors.Cause(err).(walCorruptionErr) if !ok { return err } @@ -336,6 +338,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { var ( csf = newSegmentFile(f) crc32 = newCRC32() + decSeries = []RefSeries{} activeSeries = []RefSeries{} ) @@ -345,13 +348,14 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { if rt != WALEntrySeries { continue } - series, err := r.decodeSeries(flag, byt) + decSeries = decSeries[:0] + activeSeries = activeSeries[:0] + + err := r.decodeSeries(flag, byt, &decSeries) if err != nil { return errors.Wrap(err, "decode samples while truncating") } - activeSeries = activeSeries[:0] - - for _, s := range series { + for _, s := range decSeries { if keep(s.Ref) { activeSeries = append(activeSeries, s) } @@ -807,10 +811,6 @@ type walReader struct { curBuf []byte lastOffset int64 // offset after last successfully read entry - seriesBuf []RefSeries - sampleBuf []RefSample - tombstoneBuf []Stone - err error } @@ -831,70 +831,118 @@ func (r *walReader) Err() error { return r.err } -func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesCB) error { - if seriesf == nil { - seriesf = func([]RefSeries) error { return nil } - } - if samplesf == nil { - samplesf = func([]RefSample) error { return nil } - } - if deletesf == nil { - deletesf = func([]Stone) error { return nil } - } +func (r *walReader) Read( + seriesf func([]RefSeries), + samplesf func([]RefSample), + deletesf func([]Stone), +) error { + // Concurrency for replaying the WAL is very limited. We at least split out decoding and + // processing into separate threads. + // Historically, the processing is the bottleneck with reading and decoding using only + // 15% of the CPU. + var ( + seriesPool sync.Pool + samplePool sync.Pool + deletePool sync.Pool + ) + donec := make(chan struct{}) + datac := make(chan interface{}, 50) + + go func() { + defer close(donec) + + for x := range datac { + switch v := x.(type) { + case []RefSeries: + if seriesf != nil { + seriesf(v) + } + seriesPool.Put(v[:0]) + case []RefSample: + if samplesf != nil { + samplesf(v) + } + samplePool.Put(v[:0]) + case []Stone: + if deletesf != nil { + deletesf(v) + } + deletePool.Put(v[:0]) + default: + level.Error(r.logger).Log("msg", "unexpected data type") + } + } + }() + + var err error for r.next() { et, flag, b := r.at() + // In decoding below we never return a walCorruptionErr for now. // Those should generally be catched by entry decoding before. switch et { case WALEntrySeries: - series, err := r.decodeSeries(flag, b) - if err != nil { - return errors.Wrap(err, "decode series entry") + var series []RefSeries + if v := seriesPool.Get(); v == nil { + series = make([]RefSeries, 0, 512) + } else { + series = v.([]RefSeries) } - if err := seriesf(series); err != nil { - return err + + err := r.decodeSeries(flag, b, &series) + if err != nil { + err = errors.Wrap(err, "decode series entry") + break } + datac <- series cf := r.current() - for _, s := range series { if cf.minSeries > s.Ref { cf.minSeries = s.Ref } } - case WALEntrySamples: - samples, err := r.decodeSamples(flag, b) - if err != nil { - return errors.Wrap(err, "decode samples entry") + var samples []RefSample + if v := samplePool.Get(); v == nil { + samples = make([]RefSample, 0, 512) + } else { + samples = v.([]RefSample) } - if err := samplesf(samples); err != nil { - return err + + err := r.decodeSamples(flag, b, &samples) + if err != nil { + err = errors.Wrap(err, "decode samples entry") + break } + datac <- samples // Update the times for the WAL segment file. cf := r.current() - for _, s := range samples { if cf.maxTime < s.T { cf.maxTime = s.T } } - case WALEntryDeletes: - stones, err := r.decodeDeletes(flag, b) - if err != nil { - return errors.Wrap(err, "decode delete entry") + var deletes []Stone + if v := deletePool.Get(); v == nil { + deletes = make([]Stone, 0, 512) + } else { + deletes = v.([]Stone) } - if err := deletesf(stones); err != nil { - return err + + err := r.decodeDeletes(flag, b, &deletes) + if err != nil { + err = errors.Wrap(err, "decode delete entry") + break } - // Update the times for the WAL segment file. + datac <- deletes + // Update the times for the WAL segment file. cf := r.current() - - for _, s := range stones { + for _, s := range deletes { for _, iv := range s.intervals { if cf.maxTime < iv.Maxt { cf.maxTime = iv.Maxt @@ -903,27 +951,16 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC } } } + close(datac) + <-donec - return r.Err() -} - -// nextEntry retrieves the next entry. It is also used as a testing hook. -func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) { - if r.cur >= len(r.files) { - return 0, 0, nil, io.EOF + if err != nil { + return err } - cf := r.current() - - et, flag, b, err := r.entry(cf) - // If we reached the end of the reader, advance to the next one and close. - // Do not close on the last one as it will still be appended to. - if err == io.EOF && r.cur < len(r.files)-1 { - // Current reader completed. Leave the file open for later reads - // for truncating. - r.cur++ - return r.nextEntry() + if r.Err() != nil { + return errors.Wrap(r.Err(), "read entry") } - return et, flag, b, err + return nil } func (r *walReader) at() (WALEntryType, byte, []byte) { @@ -1043,9 +1080,7 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { return etype, flag, buf, nil } -func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { - r.seriesBuf = r.seriesBuf[:0] - +func (r *walReader) decodeSeries(flag byte, b []byte, res *[]RefSeries) error { dec := decbuf{b: b} for len(dec.b) > 0 && dec.err() == nil { @@ -1059,25 +1094,24 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { } sort.Sort(lset) - r.seriesBuf = append(r.seriesBuf, RefSeries{ + *res = append(*res, RefSeries{ Ref: ref, Labels: lset, }) } if dec.err() != nil { - return nil, dec.err() + return dec.err() } if len(dec.b) > 0 { - return r.seriesBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + return errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) } - return r.seriesBuf, nil + return nil } -func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { +func (r *walReader) decodeSamples(flag byte, b []byte, res *[]RefSample) error { if len(b) == 0 { - return nil, nil + return nil } - r.sampleBuf = r.sampleBuf[:0] dec := decbuf{b: b} var ( @@ -1090,7 +1124,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { dtime := dec.varint64() val := dec.be64() - r.sampleBuf = append(r.sampleBuf, RefSample{ + *res = append(*res, RefSample{ Ref: uint64(int64(baseRef) + dref), T: baseTime + dtime, V: math.Float64frombits(val), @@ -1098,20 +1132,19 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { } if dec.err() != nil { - return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(r.sampleBuf)) + return errors.Wrapf(dec.err(), "decode error after %d samples", len(*res)) } if len(dec.b) > 0 { - return r.sampleBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + return errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) } - return r.sampleBuf, nil + return nil } -func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) { +func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error { dec := &decbuf{b: b} - r.tombstoneBuf = r.tombstoneBuf[:0] for dec.len() > 0 && dec.err() == nil { - r.tombstoneBuf = append(r.tombstoneBuf, Stone{ + *res = append(*res, Stone{ ref: dec.be64(), intervals: Intervals{ {Mint: dec.varint64(), Maxt: dec.varint64()}, @@ -1119,10 +1152,10 @@ func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) { }) } if dec.err() != nil { - return nil, dec.err() + return dec.err() } if len(dec.b) > 0 { - return r.tombstoneBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + return errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) } - return r.tombstoneBuf, nil + return nil } diff --git a/wal_test.go b/wal_test.go index d469a888c..aadce89d7 100644 --- a/wal_test.go +++ b/wal_test.go @@ -187,9 +187,8 @@ func TestSegmentWAL_Truncate(t *testing.T) { var readSeries []RefSeries r := w.Reader() - r.Read(func(s []RefSeries) error { + r.Read(func(s []RefSeries) { readSeries = append(readSeries, s...) - return nil }, nil, nil) require.Equal(t, expected, readSeries) @@ -235,33 +234,27 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { resultDeletes [][]Stone ) - serf := func(series []RefSeries) error { + serf := func(series []RefSeries) { if len(series) > 0 { clsets := make([]RefSeries, len(series)) copy(clsets, series) resultSeries = append(resultSeries, clsets) } - - return nil } - smplf := func(smpls []RefSample) error { + smplf := func(smpls []RefSample) { if len(smpls) > 0 { csmpls := make([]RefSample, len(smpls)) copy(csmpls, smpls) resultSamples = append(resultSamples, csmpls) } - - return nil } - delf := func(stones []Stone) error { + delf := func(stones []Stone) { if len(stones) > 0 { cst := make([]Stone, len(stones)) copy(cst, stones) resultDeletes = append(resultDeletes, cst) } - - return nil } require.NoError(t, r.Read(serf, smplf, delf)) @@ -420,26 +413,22 @@ func TestWALRestoreCorrupted(t *testing.T) { r := w2.Reader() - serf := func(l []RefSeries) error { + serf := func(l []RefSeries) { require.Equal(t, 0, len(l)) - return nil } - delf := func([]Stone) error { return nil } // Weird hack to check order of reads. i := 0 - samplf := func(s []RefSample) error { + samplf := func(s []RefSample) { if i == 0 { require.Equal(t, []RefSample{{T: 1, V: 2}}, s) i++ } else { require.Equal(t, []RefSample{{T: 99, V: 100}}, s) } - - return nil } - require.NoError(t, r.Read(serf, samplf, delf)) + require.NoError(t, r.Read(serf, samplf, nil)) require.NoError(t, w2.LogSamples([]RefSample{{T: 99, V: 100}})) require.NoError(t, w2.Close()) @@ -452,7 +441,7 @@ func TestWALRestoreCorrupted(t *testing.T) { r = w3.Reader() i = 0 - require.NoError(t, r.Read(serf, samplf, delf)) + require.NoError(t, r.Read(serf, samplf, nil)) }) } }