|
|
@ -15,6 +15,7 @@ package agent
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"context"
|
|
|
|
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"fmt"
|
|
|
|
"math"
|
|
|
|
"math"
|
|
|
|
"path/filepath"
|
|
|
|
"path/filepath"
|
|
|
@ -24,7 +25,6 @@ import (
|
|
|
|
|
|
|
|
|
|
|
|
"github.com/go-kit/log"
|
|
|
|
"github.com/go-kit/log"
|
|
|
|
"github.com/go-kit/log/level"
|
|
|
|
"github.com/go-kit/log/level"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
"github.com/prometheus/common/model"
|
|
|
|
"github.com/prometheus/common/model"
|
|
|
|
"go.uber.org/atomic"
|
|
|
|
"go.uber.org/atomic"
|
|
|
@ -263,7 +263,7 @@ func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir strin
|
|
|
|
|
|
|
|
|
|
|
|
w, err := wlog.NewSize(l, reg, dir, opts.WALSegmentSize, opts.WALCompression)
|
|
|
|
w, err := wlog.NewSize(l, reg, dir, opts.WALSegmentSize, opts.WALCompression)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrap(err, "creating WAL")
|
|
|
|
return nil, fmt.Errorf("creating WAL: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
db := &DB{
|
|
|
|
db := &DB{
|
|
|
@ -302,7 +302,7 @@ func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir strin
|
|
|
|
if err := db.replayWAL(); err != nil {
|
|
|
|
if err := db.replayWAL(); err != nil {
|
|
|
|
level.Warn(db.logger).Log("msg", "encountered WAL read error, attempting repair", "err", err)
|
|
|
|
level.Warn(db.logger).Log("msg", "encountered WAL read error, attempting repair", "err", err)
|
|
|
|
if err := w.Repair(err); err != nil {
|
|
|
|
if err := w.Repair(err); err != nil {
|
|
|
|
return nil, errors.Wrap(err, "repair corrupted WAL")
|
|
|
|
return nil, fmt.Errorf("repair corrupted WAL: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
level.Info(db.logger).Log("msg", "successfully repaired WAL")
|
|
|
|
level.Info(db.logger).Log("msg", "successfully repaired WAL")
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -352,7 +352,7 @@ func (db *DB) replayWAL() error {
|
|
|
|
|
|
|
|
|
|
|
|
dir, startFrom, err := wlog.LastCheckpoint(db.wal.Dir())
|
|
|
|
dir, startFrom, err := wlog.LastCheckpoint(db.wal.Dir())
|
|
|
|
if err != nil && err != record.ErrNotFound {
|
|
|
|
if err != nil && err != record.ErrNotFound {
|
|
|
|
return errors.Wrap(err, "find last checkpoint")
|
|
|
|
return fmt.Errorf("find last checkpoint: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
|
|
|
|
multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
|
|
|
@ -360,7 +360,7 @@ func (db *DB) replayWAL() error {
|
|
|
|
if err == nil {
|
|
|
|
if err == nil {
|
|
|
|
sr, err := wlog.NewSegmentsReader(dir)
|
|
|
|
sr, err := wlog.NewSegmentsReader(dir)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrap(err, "open checkpoint")
|
|
|
|
return fmt.Errorf("open checkpoint: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
defer func() {
|
|
|
|
defer func() {
|
|
|
|
if err := sr.Close(); err != nil {
|
|
|
|
if err := sr.Close(); err != nil {
|
|
|
@ -371,7 +371,7 @@ func (db *DB) replayWAL() error {
|
|
|
|
// A corrupted checkpoint is a hard error for now and requires user
|
|
|
|
// A corrupted checkpoint is a hard error for now and requires user
|
|
|
|
// intervention. There's likely little data that can be recovered anyway.
|
|
|
|
// intervention. There's likely little data that can be recovered anyway.
|
|
|
|
if err := db.loadWAL(wlog.NewReader(sr), multiRef); err != nil {
|
|
|
|
if err := db.loadWAL(wlog.NewReader(sr), multiRef); err != nil {
|
|
|
|
return errors.Wrap(err, "backfill checkpoint")
|
|
|
|
return fmt.Errorf("backfill checkpoint: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
startFrom++
|
|
|
|
startFrom++
|
|
|
|
level.Info(db.logger).Log("msg", "WAL checkpoint loaded")
|
|
|
|
level.Info(db.logger).Log("msg", "WAL checkpoint loaded")
|
|
|
@ -380,14 +380,14 @@ func (db *DB) replayWAL() error {
|
|
|
|
// Find the last segment.
|
|
|
|
// Find the last segment.
|
|
|
|
_, last, err := wlog.Segments(db.wal.Dir())
|
|
|
|
_, last, err := wlog.Segments(db.wal.Dir())
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrap(err, "finding WAL segments")
|
|
|
|
return fmt.Errorf("finding WAL segments: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Backfil segments from the most recent checkpoint onwards.
|
|
|
|
// Backfil segments from the most recent checkpoint onwards.
|
|
|
|
for i := startFrom; i <= last; i++ {
|
|
|
|
for i := startFrom; i <= last; i++ {
|
|
|
|
seg, err := wlog.OpenReadSegment(wlog.SegmentName(db.wal.Dir(), i))
|
|
|
|
seg, err := wlog.OpenReadSegment(wlog.SegmentName(db.wal.Dir(), i))
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
|
|
|
|
return fmt.Errorf("open WAL segment: %d: %w", i, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
sr := wlog.NewSegmentBufReader(seg)
|
|
|
|
sr := wlog.NewSegmentBufReader(seg)
|
|
|
@ -432,7 +432,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|
|
|
series, err = dec.Series(rec, series)
|
|
|
|
series, err = dec.Series(rec, series)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
errCh <- &wlog.CorruptionErr{
|
|
|
|
errCh <- &wlog.CorruptionErr{
|
|
|
|
Err: errors.Wrap(err, "decode series"),
|
|
|
|
Err: fmt.Errorf("decode series: %w", err),
|
|
|
|
Segment: r.Segment(),
|
|
|
|
Segment: r.Segment(),
|
|
|
|
Offset: r.Offset(),
|
|
|
|
Offset: r.Offset(),
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -444,7 +444,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|
|
|
samples, err = dec.Samples(rec, samples)
|
|
|
|
samples, err = dec.Samples(rec, samples)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
errCh <- &wlog.CorruptionErr{
|
|
|
|
errCh <- &wlog.CorruptionErr{
|
|
|
|
Err: errors.Wrap(err, "decode samples"),
|
|
|
|
Err: fmt.Errorf("decode samples: %w", err),
|
|
|
|
Segment: r.Segment(),
|
|
|
|
Segment: r.Segment(),
|
|
|
|
Offset: r.Offset(),
|
|
|
|
Offset: r.Offset(),
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -456,7 +456,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|
|
|
histograms, err = dec.HistogramSamples(rec, histograms)
|
|
|
|
histograms, err = dec.HistogramSamples(rec, histograms)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
errCh <- &wlog.CorruptionErr{
|
|
|
|
errCh <- &wlog.CorruptionErr{
|
|
|
|
Err: errors.Wrap(err, "decode histogram samples"),
|
|
|
|
Err: fmt.Errorf("decode histogram samples: %w", err),
|
|
|
|
Segment: r.Segment(),
|
|
|
|
Segment: r.Segment(),
|
|
|
|
Offset: r.Offset(),
|
|
|
|
Offset: r.Offset(),
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -468,7 +468,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|
|
|
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
|
|
|
|
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
errCh <- &wlog.CorruptionErr{
|
|
|
|
errCh <- &wlog.CorruptionErr{
|
|
|
|
Err: errors.Wrap(err, "decode float histogram samples"),
|
|
|
|
Err: fmt.Errorf("decode float histogram samples: %w", err),
|
|
|
|
Segment: r.Segment(),
|
|
|
|
Segment: r.Segment(),
|
|
|
|
Offset: r.Offset(),
|
|
|
|
Offset: r.Offset(),
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -482,7 +482,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
default:
|
|
|
|
default:
|
|
|
|
errCh <- &wlog.CorruptionErr{
|
|
|
|
errCh <- &wlog.CorruptionErr{
|
|
|
|
Err: errors.Errorf("invalid record type %v", dec.Type(rec)),
|
|
|
|
Err: fmt.Errorf("invalid record type %v", dec.Type(rec)),
|
|
|
|
Segment: r.Segment(),
|
|
|
|
Segment: r.Segment(),
|
|
|
|
Offset: r.Offset(),
|
|
|
|
Offset: r.Offset(),
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -568,7 +568,7 @@ func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
|
|
|
|
return err
|
|
|
|
return err
|
|
|
|
default:
|
|
|
|
default:
|
|
|
|
if r.Err() != nil {
|
|
|
|
if r.Err() != nil {
|
|
|
|
return errors.Wrap(r.Err(), "read records")
|
|
|
|
return fmt.Errorf("read records: %w", r.Err())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -622,13 +622,13 @@ func (db *DB) truncate(mint int64) error {
|
|
|
|
|
|
|
|
|
|
|
|
first, last, err := wlog.Segments(db.wal.Dir())
|
|
|
|
first, last, err := wlog.Segments(db.wal.Dir())
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrap(err, "get segment range")
|
|
|
|
return fmt.Errorf("get segment range: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Start a new segment so low ingestion volume instances don't have more WAL
|
|
|
|
// Start a new segment so low ingestion volume instances don't have more WAL
|
|
|
|
// than needed.
|
|
|
|
// than needed.
|
|
|
|
if _, err := db.wal.NextSegment(); err != nil {
|
|
|
|
if _, err := db.wal.NextSegment(); err != nil {
|
|
|
|
return errors.Wrap(err, "next segment")
|
|
|
|
return fmt.Errorf("next segment: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
last-- // Never consider most recent segment for checkpoint
|
|
|
|
last-- // Never consider most recent segment for checkpoint
|
|
|
@ -656,10 +656,11 @@ func (db *DB) truncate(mint int64) error {
|
|
|
|
|
|
|
|
|
|
|
|
if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, keep, mint); err != nil {
|
|
|
|
if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, keep, mint); err != nil {
|
|
|
|
db.metrics.checkpointCreationFail.Inc()
|
|
|
|
db.metrics.checkpointCreationFail.Inc()
|
|
|
|
if _, ok := errors.Cause(err).(*wlog.CorruptionErr); ok {
|
|
|
|
var cerr *wlog.CorruptionErr
|
|
|
|
|
|
|
|
if errors.As(err, &cerr) {
|
|
|
|
db.metrics.walCorruptionsTotal.Inc()
|
|
|
|
db.metrics.walCorruptionsTotal.Inc()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return errors.Wrap(err, "create checkpoint")
|
|
|
|
return fmt.Errorf("create checkpoint: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if err := db.wal.Truncate(last + 1); err != nil {
|
|
|
|
if err := db.wal.Truncate(last + 1); err != nil {
|
|
|
|
// If truncating fails, we'll just try it again at the next checkpoint.
|
|
|
|
// If truncating fails, we'll just try it again at the next checkpoint.
|
|
|
@ -780,11 +781,11 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
|
|
|
|
// equivalent validation code in the TSDB's headAppender.
|
|
|
|
// equivalent validation code in the TSDB's headAppender.
|
|
|
|
l = l.WithoutEmpty()
|
|
|
|
l = l.WithoutEmpty()
|
|
|
|
if l.IsEmpty() {
|
|
|
|
if l.IsEmpty() {
|
|
|
|
return 0, errors.Wrap(tsdb.ErrInvalidSample, "empty labelset")
|
|
|
|
return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if lbl, dup := l.HasDuplicateLabelNames(); dup {
|
|
|
|
if lbl, dup := l.HasDuplicateLabelNames(); dup {
|
|
|
|
return 0, errors.Wrap(tsdb.ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, lbl))
|
|
|
|
return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidSample)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
var created bool
|
|
|
|
var created bool
|
|
|
@ -841,7 +842,7 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, e exem
|
|
|
|
e.Labels = e.Labels.WithoutEmpty()
|
|
|
|
e.Labels = e.Labels.WithoutEmpty()
|
|
|
|
|
|
|
|
|
|
|
|
if lbl, dup := e.Labels.HasDuplicateLabelNames(); dup {
|
|
|
|
if lbl, dup := e.Labels.HasDuplicateLabelNames(); dup {
|
|
|
|
return 0, errors.Wrap(tsdb.ErrInvalidExemplar, fmt.Sprintf(`label name "%s" is not unique`, lbl))
|
|
|
|
return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidExemplar)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Exemplar label length does not include chars involved in text rendering such as quotes
|
|
|
|
// Exemplar label length does not include chars involved in text rendering such as quotes
|
|
|
@ -903,11 +904,11 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
|
|
|
|
// equivalent validation code in the TSDB's headAppender.
|
|
|
|
// equivalent validation code in the TSDB's headAppender.
|
|
|
|
l = l.WithoutEmpty()
|
|
|
|
l = l.WithoutEmpty()
|
|
|
|
if l.IsEmpty() {
|
|
|
|
if l.IsEmpty() {
|
|
|
|
return 0, errors.Wrap(tsdb.ErrInvalidSample, "empty labelset")
|
|
|
|
return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if lbl, dup := l.HasDuplicateLabelNames(); dup {
|
|
|
|
if lbl, dup := l.HasDuplicateLabelNames(); dup {
|
|
|
|
return 0, errors.Wrap(tsdb.ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, lbl))
|
|
|
|
return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidSample)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
var created bool
|
|
|
|
var created bool
|
|
|
|