|
|
|
@ -27,16 +27,16 @@ import (
|
|
|
|
|
"sync" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"github.com/prometheus/tsdb/fileutil" |
|
|
|
|
"github.com/go-kit/kit/log" |
|
|
|
|
"github.com/go-kit/kit/log/level" |
|
|
|
|
"github.com/pkg/errors" |
|
|
|
|
"github.com/prometheus/tsdb/labels" |
|
|
|
|
"github.com/prometheus/client_golang/prometheus" |
|
|
|
|
"github.com/prometheus/tsdb/fileutil" |
|
|
|
|
"github.com/prometheus/tsdb/labels" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// WALEntryType indicates what data a WAL entry contains.
|
|
|
|
|
type WALEntryType byte |
|
|
|
|
type WALEntryType uint8 |
|
|
|
|
|
|
|
|
|
const ( |
|
|
|
|
// WALMagic is a 4 byte number every WAL segment file starts with.
|
|
|
|
@ -54,18 +54,6 @@ const (
|
|
|
|
|
WALEntryDeletes WALEntryType = 4 |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// SamplesCB is the callback after reading samples. The passed slice
|
|
|
|
|
// is only valid until the call returns.
|
|
|
|
|
type SamplesCB func([]RefSample) error |
|
|
|
|
|
|
|
|
|
// SeriesCB is the callback after reading series. The passed slice
|
|
|
|
|
// is only valid until the call returns.
|
|
|
|
|
type SeriesCB func([]RefSeries) error |
|
|
|
|
|
|
|
|
|
// DeletesCB is the callback after reading deletes. The passed slice
|
|
|
|
|
// is only valid until the call returns.
|
|
|
|
|
type DeletesCB func([]Stone) error |
|
|
|
|
|
|
|
|
|
type walMetrics struct { |
|
|
|
|
fsyncDuration prometheus.Summary |
|
|
|
|
} |
|
|
|
@ -104,17 +92,27 @@ func NopWAL() WAL {
|
|
|
|
|
|
|
|
|
|
type nopWAL struct{} |
|
|
|
|
|
|
|
|
|
func (nopWAL) Read(SeriesCB, SamplesCB, DeletesCB) error { return nil } |
|
|
|
|
func (w nopWAL) Reader() WALReader { return w } |
|
|
|
|
func (nopWAL) LogSeries([]RefSeries) error { return nil } |
|
|
|
|
func (nopWAL) LogSamples([]RefSample) error { return nil } |
|
|
|
|
func (nopWAL) LogDeletes([]Stone) error { return nil } |
|
|
|
|
func (nopWAL) Truncate(int64, func(uint64) bool) error { return nil } |
|
|
|
|
func (nopWAL) Close() error { return nil } |
|
|
|
|
func (nopWAL) Read( |
|
|
|
|
seriesf func([]RefSeries), |
|
|
|
|
samplesf func([]RefSample), |
|
|
|
|
deletesf func([]Stone), |
|
|
|
|
) error { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
func (w nopWAL) Reader() WALReader { return w } |
|
|
|
|
func (nopWAL) LogSeries([]RefSeries) error { return nil } |
|
|
|
|
func (nopWAL) LogSamples([]RefSample) error { return nil } |
|
|
|
|
func (nopWAL) LogDeletes([]Stone) error { return nil } |
|
|
|
|
func (nopWAL) Truncate(int64, func(uint64) bool) error { return nil } |
|
|
|
|
func (nopWAL) Close() error { return nil } |
|
|
|
|
|
|
|
|
|
// WALReader reads entries from a WAL.
|
|
|
|
|
type WALReader interface { |
|
|
|
|
Read(SeriesCB, SamplesCB, DeletesCB) error |
|
|
|
|
Read( |
|
|
|
|
seriesf func([]RefSeries), |
|
|
|
|
samplesf func([]RefSample), |
|
|
|
|
deletesf func([]Stone), |
|
|
|
|
) error |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// RefSeries is the series labels with the series ID.
|
|
|
|
@ -170,7 +168,7 @@ func newCRC32() hash.Hash32 {
|
|
|
|
|
|
|
|
|
|
// SegmentWAL is a write ahead log for series data.
|
|
|
|
|
type SegmentWAL struct { |
|
|
|
|
mtx sync.Mutex |
|
|
|
|
mtx sync.Mutex |
|
|
|
|
metrics *walMetrics |
|
|
|
|
|
|
|
|
|
dirFile *os.File |
|
|
|
@ -238,12 +236,16 @@ type repairingWALReader struct {
|
|
|
|
|
r WALReader |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *repairingWALReader) Read(series SeriesCB, samples SamplesCB, deletes DeletesCB) error { |
|
|
|
|
err := r.r.Read(series, samples, deletes) |
|
|
|
|
func (r *repairingWALReader) Read( |
|
|
|
|
seriesf func([]RefSeries), |
|
|
|
|
samplesf func([]RefSample), |
|
|
|
|
deletesf func([]Stone), |
|
|
|
|
) error { |
|
|
|
|
err := r.r.Read(seriesf, samplesf, deletesf) |
|
|
|
|
if err == nil { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
cerr, ok := err.(walCorruptionErr) |
|
|
|
|
cerr, ok := errors.Cause(err).(walCorruptionErr) |
|
|
|
|
if !ok { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
@ -336,6 +338,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error {
|
|
|
|
|
var ( |
|
|
|
|
csf = newSegmentFile(f) |
|
|
|
|
crc32 = newCRC32() |
|
|
|
|
decSeries = []RefSeries{} |
|
|
|
|
activeSeries = []RefSeries{} |
|
|
|
|
) |
|
|
|
|
|
|
|
|
@ -345,13 +348,14 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error {
|
|
|
|
|
if rt != WALEntrySeries { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
series, err := r.decodeSeries(flag, byt) |
|
|
|
|
decSeries = decSeries[:0] |
|
|
|
|
activeSeries = activeSeries[:0] |
|
|
|
|
|
|
|
|
|
err := r.decodeSeries(flag, byt, &decSeries) |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrap(err, "decode samples while truncating") |
|
|
|
|
} |
|
|
|
|
activeSeries = activeSeries[:0] |
|
|
|
|
|
|
|
|
|
for _, s := range series { |
|
|
|
|
for _, s := range decSeries { |
|
|
|
|
if keep(s.Ref) { |
|
|
|
|
activeSeries = append(activeSeries, s) |
|
|
|
|
} |
|
|
|
@ -807,10 +811,6 @@ type walReader struct {
|
|
|
|
|
curBuf []byte |
|
|
|
|
lastOffset int64 // offset after last successfully read entry
|
|
|
|
|
|
|
|
|
|
seriesBuf []RefSeries |
|
|
|
|
sampleBuf []RefSample |
|
|
|
|
tombstoneBuf []Stone |
|
|
|
|
|
|
|
|
|
err error |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -831,70 +831,118 @@ func (r *walReader) Err() error {
|
|
|
|
|
return r.err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesCB) error { |
|
|
|
|
if seriesf == nil { |
|
|
|
|
seriesf = func([]RefSeries) error { return nil } |
|
|
|
|
} |
|
|
|
|
if samplesf == nil { |
|
|
|
|
samplesf = func([]RefSample) error { return nil } |
|
|
|
|
} |
|
|
|
|
if deletesf == nil { |
|
|
|
|
deletesf = func([]Stone) error { return nil } |
|
|
|
|
} |
|
|
|
|
func (r *walReader) Read( |
|
|
|
|
seriesf func([]RefSeries), |
|
|
|
|
samplesf func([]RefSample), |
|
|
|
|
deletesf func([]Stone), |
|
|
|
|
) error { |
|
|
|
|
// Concurrency for replaying the WAL is very limited. We at least split out decoding and
|
|
|
|
|
// processing into separate threads.
|
|
|
|
|
// Historically, the processing is the bottleneck with reading and decoding using only
|
|
|
|
|
// 15% of the CPU.
|
|
|
|
|
var ( |
|
|
|
|
seriesPool sync.Pool |
|
|
|
|
samplePool sync.Pool |
|
|
|
|
deletePool sync.Pool |
|
|
|
|
) |
|
|
|
|
donec := make(chan struct{}) |
|
|
|
|
datac := make(chan interface{}, 50) |
|
|
|
|
|
|
|
|
|
go func() { |
|
|
|
|
defer close(donec) |
|
|
|
|
|
|
|
|
|
for x := range datac { |
|
|
|
|
switch v := x.(type) { |
|
|
|
|
case []RefSeries: |
|
|
|
|
if seriesf != nil { |
|
|
|
|
seriesf(v) |
|
|
|
|
} |
|
|
|
|
seriesPool.Put(v[:0]) |
|
|
|
|
case []RefSample: |
|
|
|
|
if samplesf != nil { |
|
|
|
|
samplesf(v) |
|
|
|
|
} |
|
|
|
|
samplePool.Put(v[:0]) |
|
|
|
|
case []Stone: |
|
|
|
|
if deletesf != nil { |
|
|
|
|
deletesf(v) |
|
|
|
|
} |
|
|
|
|
deletePool.Put(v[:0]) |
|
|
|
|
default: |
|
|
|
|
level.Error(r.logger).Log("msg", "unexpected data type") |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
var err error |
|
|
|
|
|
|
|
|
|
for r.next() { |
|
|
|
|
et, flag, b := r.at() |
|
|
|
|
|
|
|
|
|
// In decoding below we never return a walCorruptionErr for now.
|
|
|
|
|
// Those should generally be catched by entry decoding before.
|
|
|
|
|
switch et { |
|
|
|
|
case WALEntrySeries: |
|
|
|
|
series, err := r.decodeSeries(flag, b) |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrap(err, "decode series entry") |
|
|
|
|
var series []RefSeries |
|
|
|
|
if v := seriesPool.Get(); v == nil { |
|
|
|
|
series = make([]RefSeries, 0, 512) |
|
|
|
|
} else { |
|
|
|
|
series = v.([]RefSeries) |
|
|
|
|
} |
|
|
|
|
if err := seriesf(series); err != nil { |
|
|
|
|
return err |
|
|
|
|
|
|
|
|
|
err := r.decodeSeries(flag, b, &series) |
|
|
|
|
if err != nil { |
|
|
|
|
err = errors.Wrap(err, "decode series entry") |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
datac <- series |
|
|
|
|
|
|
|
|
|
cf := r.current() |
|
|
|
|
|
|
|
|
|
for _, s := range series { |
|
|
|
|
if cf.minSeries > s.Ref { |
|
|
|
|
cf.minSeries = s.Ref |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
case WALEntrySamples: |
|
|
|
|
samples, err := r.decodeSamples(flag, b) |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrap(err, "decode samples entry") |
|
|
|
|
var samples []RefSample |
|
|
|
|
if v := samplePool.Get(); v == nil { |
|
|
|
|
samples = make([]RefSample, 0, 512) |
|
|
|
|
} else { |
|
|
|
|
samples = v.([]RefSample) |
|
|
|
|
} |
|
|
|
|
if err := samplesf(samples); err != nil { |
|
|
|
|
return err |
|
|
|
|
|
|
|
|
|
err := r.decodeSamples(flag, b, &samples) |
|
|
|
|
if err != nil { |
|
|
|
|
err = errors.Wrap(err, "decode samples entry") |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
datac <- samples |
|
|
|
|
|
|
|
|
|
// Update the times for the WAL segment file.
|
|
|
|
|
cf := r.current() |
|
|
|
|
|
|
|
|
|
for _, s := range samples { |
|
|
|
|
if cf.maxTime < s.T { |
|
|
|
|
cf.maxTime = s.T |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
case WALEntryDeletes: |
|
|
|
|
stones, err := r.decodeDeletes(flag, b) |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrap(err, "decode delete entry") |
|
|
|
|
var deletes []Stone |
|
|
|
|
if v := deletePool.Get(); v == nil { |
|
|
|
|
deletes = make([]Stone, 0, 512) |
|
|
|
|
} else { |
|
|
|
|
deletes = v.([]Stone) |
|
|
|
|
} |
|
|
|
|
if err := deletesf(stones); err != nil { |
|
|
|
|
return err |
|
|
|
|
|
|
|
|
|
err := r.decodeDeletes(flag, b, &deletes) |
|
|
|
|
if err != nil { |
|
|
|
|
err = errors.Wrap(err, "decode delete entry") |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
// Update the times for the WAL segment file.
|
|
|
|
|
datac <- deletes |
|
|
|
|
|
|
|
|
|
// Update the times for the WAL segment file.
|
|
|
|
|
cf := r.current() |
|
|
|
|
|
|
|
|
|
for _, s := range stones { |
|
|
|
|
for _, s := range deletes { |
|
|
|
|
for _, iv := range s.intervals { |
|
|
|
|
if cf.maxTime < iv.Maxt { |
|
|
|
|
cf.maxTime = iv.Maxt |
|
|
|
@ -903,27 +951,16 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
close(datac) |
|
|
|
|
<-donec |
|
|
|
|
|
|
|
|
|
return r.Err() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// nextEntry retrieves the next entry. It is also used as a testing hook.
|
|
|
|
|
func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) { |
|
|
|
|
if r.cur >= len(r.files) { |
|
|
|
|
return 0, 0, nil, io.EOF |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
cf := r.current() |
|
|
|
|
|
|
|
|
|
et, flag, b, err := r.entry(cf) |
|
|
|
|
// If we reached the end of the reader, advance to the next one and close.
|
|
|
|
|
// Do not close on the last one as it will still be appended to.
|
|
|
|
|
if err == io.EOF && r.cur < len(r.files)-1 { |
|
|
|
|
// Current reader completed. Leave the file open for later reads
|
|
|
|
|
// for truncating.
|
|
|
|
|
r.cur++ |
|
|
|
|
return r.nextEntry() |
|
|
|
|
if r.Err() != nil { |
|
|
|
|
return errors.Wrap(r.Err(), "read entry") |
|
|
|
|
} |
|
|
|
|
return et, flag, b, err |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *walReader) at() (WALEntryType, byte, []byte) { |
|
|
|
@ -1043,9 +1080,7 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
|
|
|
|
|
return etype, flag, buf, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { |
|
|
|
|
r.seriesBuf = r.seriesBuf[:0] |
|
|
|
|
|
|
|
|
|
func (r *walReader) decodeSeries(flag byte, b []byte, res *[]RefSeries) error { |
|
|
|
|
dec := decbuf{b: b} |
|
|
|
|
|
|
|
|
|
for len(dec.b) > 0 && dec.err() == nil { |
|
|
|
@ -1059,25 +1094,24 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) {
|
|
|
|
|
} |
|
|
|
|
sort.Sort(lset) |
|
|
|
|
|
|
|
|
|
r.seriesBuf = append(r.seriesBuf, RefSeries{ |
|
|
|
|
*res = append(*res, RefSeries{ |
|
|
|
|
Ref: ref, |
|
|
|
|
Labels: lset, |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
if dec.err() != nil { |
|
|
|
|
return nil, dec.err() |
|
|
|
|
return dec.err() |
|
|
|
|
} |
|
|
|
|
if len(dec.b) > 0 { |
|
|
|
|
return r.seriesBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) |
|
|
|
|
return errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) |
|
|
|
|
} |
|
|
|
|
return r.seriesBuf, nil |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { |
|
|
|
|
func (r *walReader) decodeSamples(flag byte, b []byte, res *[]RefSample) error { |
|
|
|
|
if len(b) == 0 { |
|
|
|
|
return nil, nil |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
r.sampleBuf = r.sampleBuf[:0] |
|
|
|
|
dec := decbuf{b: b} |
|
|
|
|
|
|
|
|
|
var ( |
|
|
|
@ -1090,7 +1124,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
|
|
|
|
|
dtime := dec.varint64() |
|
|
|
|
val := dec.be64() |
|
|
|
|
|
|
|
|
|
r.sampleBuf = append(r.sampleBuf, RefSample{ |
|
|
|
|
*res = append(*res, RefSample{ |
|
|
|
|
Ref: uint64(int64(baseRef) + dref), |
|
|
|
|
T: baseTime + dtime, |
|
|
|
|
V: math.Float64frombits(val), |
|
|
|
@ -1098,20 +1132,19 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if dec.err() != nil { |
|
|
|
|
return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(r.sampleBuf)) |
|
|
|
|
return errors.Wrapf(dec.err(), "decode error after %d samples", len(*res)) |
|
|
|
|
} |
|
|
|
|
if len(dec.b) > 0 { |
|
|
|
|
return r.sampleBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) |
|
|
|
|
return errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) |
|
|
|
|
} |
|
|
|
|
return r.sampleBuf, nil |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) { |
|
|
|
|
func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error { |
|
|
|
|
dec := &decbuf{b: b} |
|
|
|
|
r.tombstoneBuf = r.tombstoneBuf[:0] |
|
|
|
|
|
|
|
|
|
for dec.len() > 0 && dec.err() == nil { |
|
|
|
|
r.tombstoneBuf = append(r.tombstoneBuf, Stone{ |
|
|
|
|
*res = append(*res, Stone{ |
|
|
|
|
ref: dec.be64(), |
|
|
|
|
intervals: Intervals{ |
|
|
|
|
{Mint: dec.varint64(), Maxt: dec.varint64()}, |
|
|
|
@ -1119,10 +1152,10 @@ func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) {
|
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
if dec.err() != nil { |
|
|
|
|
return nil, dec.err() |
|
|
|
|
return dec.err() |
|
|
|
|
} |
|
|
|
|
if len(dec.b) > 0 { |
|
|
|
|
return r.tombstoneBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) |
|
|
|
|
return errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) |
|
|
|
|
} |
|
|
|
|
return r.tombstoneBuf, nil |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|