Browse Source

tsdb: use Go standard errors

Signed-off-by: Matthieu MOREL <matthieu.morel35@gmail.com>
pull/13155/head
Matthieu MOREL 1 year ago
parent
commit
8f6cf3aabb
  1. 6
      .golangci.yml
  2. 2
      go.mod
  3. 2
      tsdb/agent/db.go
  4. 29
      tsdb/block.go
  5. 11
      tsdb/blockwriter.go
  6. 5
      tsdb/chunkenc/chunk_test.go
  7. 9
      tsdb/chunks/head_chunks.go
  8. 50
      tsdb/compact.go
  9. 2
      tsdb/compact_test.go
  10. 154
      tsdb/db.go
  11. 3
      tsdb/db_test.go
  12. 3
      tsdb/exemplar.go
  13. 48
      tsdb/head.go
  14. 50
      tsdb/head_append.go
  15. 2
      tsdb/head_bench_test.go
  16. 15
      tsdb/head_read.go
  17. 15
      tsdb/head_test.go
  18. 103
      tsdb/head_wal.go
  19. 6
      tsdb/index/index.go
  20. 4
      tsdb/mocks_test.go
  21. 40
      tsdb/querier.go
  22. 4
      tsdb/querier_test.go
  23. 43
      tsdb/repair.go
  24. 101
      tsdb/wal.go
  25. 4
      tsdb/wlog/wlog.go

6
.golangci.yml

@ -36,13 +36,9 @@ issues:
- path: _test.go
linters:
- errcheck
- path: tsdb/
- path: "tsdb/head_wal.go"
linters:
- errorlint
- path: tsdb/
text: "import 'github.com/pkg/errors' is not allowed"
linters:
- depguard
- linters:
- godot
source: "^// ==="

2
go.mod

@ -43,7 +43,6 @@ require (
github.com/oklog/run v1.1.0
github.com/oklog/ulid v1.3.1
github.com/ovh/go-ovh v1.4.3
github.com/pkg/errors v0.9.1
github.com/prometheus/alertmanager v0.26.0
github.com/prometheus/client_golang v1.17.0
github.com/prometheus/client_model v0.5.0
@ -167,6 +166,7 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect

2
tsdb/agent/db.go

@ -359,7 +359,7 @@ func (db *DB) replayWAL() error {
start := time.Now()
dir, startFrom, err := wlog.LastCheckpoint(db.wal.Dir())
if err != nil && err != record.ErrNotFound {
if err != nil && !errors.Is(err, record.ErrNotFound) {
return fmt.Errorf("find last checkpoint: %w", err)
}

29
tsdb/block.go

@ -17,6 +17,7 @@ package tsdb
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
@ -26,7 +27,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"golang.org/x/exp/slices"
"github.com/prometheus/prometheus/model/labels"
@ -479,14 +479,19 @@ func (r blockIndexReader) SortedLabelValues(ctx context.Context, name string, ma
slices.Sort(st)
}
}
return st, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
if err != nil {
return st, fmt.Errorf("block: %s: %w", r.b.Meta().ULID, err)
}
return st, nil
}
func (r blockIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
if len(matchers) == 0 {
st, err := r.ir.LabelValues(ctx, name)
return st, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
if err != nil {
return st, fmt.Errorf("block: %s: %w", r.b.Meta().ULID, err)
}
return st, nil
}
return labelValuesWithMatchers(ctx, r.ir, name, matchers...)
@ -503,7 +508,7 @@ func (r blockIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Ma
func (r blockIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
p, err := r.ir.Postings(ctx, name, values...)
if err != nil {
return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
return p, fmt.Errorf("block: %s: %w", r.b.Meta().ULID, err)
}
return p, nil
}
@ -514,7 +519,7 @@ func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
func (r blockIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
if err := r.ir.Series(ref, builder, chks); err != nil {
return errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
return fmt.Errorf("block: %s: %w", r.b.Meta().ULID, err)
}
return nil
}
@ -566,7 +571,7 @@ func (pb *Block) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Mat
p, err := PostingsForMatchers(ctx, pb.indexr, ms...)
if err != nil {
return errors.Wrap(err, "select series")
return fmt.Errorf("select series: %w", err)
}
ir := pb.indexr
@ -654,12 +659,12 @@ func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, bool, er
func (pb *Block) Snapshot(dir string) error {
blockDir := filepath.Join(dir, pb.meta.ULID.String())
if err := os.MkdirAll(blockDir, 0o777); err != nil {
return errors.Wrap(err, "create snapshot block dir")
return fmt.Errorf("create snapshot block dir: %w", err)
}
chunksDir := chunkDir(blockDir)
if err := os.MkdirAll(chunksDir, 0o777); err != nil {
return errors.Wrap(err, "create snapshot chunk dir")
return fmt.Errorf("create snapshot chunk dir: %w", err)
}
// Hardlink meta, index and tombstones
@ -669,7 +674,7 @@ func (pb *Block) Snapshot(dir string) error {
tombstones.TombstonesFilename,
} {
if err := os.Link(filepath.Join(pb.dir, fname), filepath.Join(blockDir, fname)); err != nil {
return errors.Wrapf(err, "create snapshot %s", fname)
return fmt.Errorf("create snapshot %s: %w", fname, err)
}
}
@ -677,13 +682,13 @@ func (pb *Block) Snapshot(dir string) error {
curChunkDir := chunkDir(pb.dir)
files, err := os.ReadDir(curChunkDir)
if err != nil {
return errors.Wrap(err, "ReadDir the current chunk dir")
return fmt.Errorf("ReadDir the current chunk dir: %w", err)
}
for _, f := range files {
err := os.Link(filepath.Join(curChunkDir, f.Name()), filepath.Join(chunksDir, f.Name()))
if err != nil {
return errors.Wrap(err, "hardlink a chunk")
return fmt.Errorf("hardlink a chunk: %w", err)
}
}

11
tsdb/blockwriter.go

@ -15,13 +15,14 @@ package tsdb
import (
"context"
"errors"
"fmt"
"math"
"os"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/storage"
@ -65,7 +66,7 @@ func NewBlockWriter(logger log.Logger, dir string, blockSize int64) (*BlockWrite
func (w *BlockWriter) initHead() error {
chunkDir, err := os.MkdirTemp(os.TempDir(), "head")
if err != nil {
return errors.Wrap(err, "create temp dir")
return fmt.Errorf("create temp dir: %w", err)
}
w.chunkDir = chunkDir
opts := DefaultHeadOptions()
@ -74,7 +75,7 @@ func (w *BlockWriter) initHead() error {
opts.EnableNativeHistograms.Store(true)
h, err := NewHead(nil, w.logger, nil, nil, opts, NewHeadStats())
if err != nil {
return errors.Wrap(err, "tsdb.NewHead")
return fmt.Errorf("tsdb.NewHead: %w", err)
}
w.head = h
@ -102,11 +103,11 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
[]int64{w.blockSize},
chunkenc.NewPool(), nil)
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "create leveled compactor")
return ulid.ULID{}, fmt.Errorf("create leveled compactor: %w", err)
}
id, err := compactor.Write(w.destinationDir, w.head, mint, maxt, nil)
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "compactor write")
return ulid.ULID{}, fmt.Errorf("compactor write: %w", err)
}
return id, nil

5
tsdb/chunkenc/chunk_test.go

@ -14,6 +14,7 @@
package chunkenc
import (
"errors"
"fmt"
"io"
"math/rand"
@ -153,8 +154,8 @@ func benchmarkIterator(b *testing.B, newChunk func() Chunk) {
res = v
i++
}
if it.Err() != io.EOF {
require.NoError(b, it.Err())
if err := it.Err(); err != nil && !errors.Is(err, io.EOF) {
require.NoError(b, err)
}
_ = res
}

9
tsdb/chunks/head_chunks.go

@ -111,6 +111,10 @@ func (e *CorruptionErr) Error() string {
return fmt.Errorf("corruption in head chunk file %s: %w", segmentFile(e.Dir, e.FileIndex), e.Err).Error()
}
func (e *CorruptionErr) Unwrap() error {
return e.Err
}
// chunkPos keeps track of the position in the head chunk files.
// chunkPos is not thread-safe, a lock must be used to protect it.
type chunkPos struct {
@ -400,7 +404,7 @@ func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr erro
buf := make([]byte, MagicChunksSize)
size, err := f.Read(buf)
if err != nil && err != io.EOF {
if err != nil && !errors.Is(err, io.EOF) {
return files, fmt.Errorf("failed to read magic number during last head chunk file repair: %w", err)
}
if err := f.Close(); err != nil {
@ -892,7 +896,8 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
// Extract the encoding from the byte. ChunkDiskMapper uses only the last 7 bits for the encoding.
chkEnc = cdm.RemoveMasks(chkEnc)
if err := f(seriesRef, chunkRef, mint, maxt, numSamples, chkEnc, isOOO); err != nil {
if cerr, ok := err.(*CorruptionErr); ok {
var cerr *CorruptionErr
if errors.As(err, &cerr) {
cerr.Dir = cdm.dir.Name()
cerr.FileIndex = segID
return cerr

50
tsdb/compact.go

@ -16,6 +16,7 @@ package tsdb
import (
"context"
"crypto/rand"
"errors"
"fmt"
"io"
"os"
@ -25,7 +26,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/exp/slices"
@ -485,7 +485,7 @@ func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string,
if !errors.Is(err, context.Canceled) {
for _, b := range bs {
if err := b.setCompactionFailed(); err != nil {
errs.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
errs.Add(fmt.Errorf("setting compaction failed for block: %s: %w", b.Dir(), err))
}
}
}
@ -586,7 +586,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
chunkw, err = chunks.NewWriterWithSegSize(chunkDir(tmp), c.maxBlockChunkSegmentSize)
if err != nil {
return errors.Wrap(err, "open chunk writer")
return fmt.Errorf("open chunk writer: %w", err)
}
closers = append(closers, chunkw)
// Record written chunk sizes on level 1 compactions.
@ -601,12 +601,12 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
if err != nil {
return errors.Wrap(err, "open index writer")
return fmt.Errorf("open index writer: %w", err)
}
closers = append(closers, indexw)
if err := blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, indexw, chunkw); err != nil {
return errors.Wrap(err, "populate block")
return fmt.Errorf("populate block: %w", err)
}
select {
@ -634,17 +634,17 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
}
if _, err = writeMetaFile(c.logger, tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta")
return fmt.Errorf("write merged meta: %w", err)
}
// Create an empty tombstones file.
if _, err := tombstones.WriteFile(c.logger, tmp, tombstones.NewMemTombstones()); err != nil {
return errors.Wrap(err, "write new tombstones file")
return fmt.Errorf("write new tombstones file: %w", err)
}
df, err := fileutil.OpenDir(tmp)
if err != nil {
return errors.Wrap(err, "open temporary block dir")
return fmt.Errorf("open temporary block dir: %w", err)
}
defer func() {
if df != nil {
@ -653,18 +653,18 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
}()
if err := df.Sync(); err != nil {
return errors.Wrap(err, "sync temporary dir file")
return fmt.Errorf("sync temporary dir file: %w", err)
}
// Close temp dir before rename block dir (for windows platform).
if err = df.Close(); err != nil {
return errors.Wrap(err, "close temporary dir")
return fmt.Errorf("close temporary dir: %w", err)
}
df = nil
// Block successfully written, make it visible in destination dir by moving it from tmp one.
if err := fileutil.Replace(tmp, dir); err != nil {
return errors.Wrap(err, "rename block dir")
return fmt.Errorf("rename block dir: %w", err)
}
return nil
@ -693,7 +693,7 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
defer func() {
errs := tsdb_errors.NewMulti(err)
if cerr := tsdb_errors.CloseAll(closers); cerr != nil {
errs.Add(errors.Wrap(cerr, "close"))
errs.Add(fmt.Errorf("close: %w", cerr))
}
err = errs.Err()
metrics.PopulatingBlocks.Set(0)
@ -721,19 +721,19 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
indexr, err := b.Index()
if err != nil {
return errors.Wrapf(err, "open index reader for block %+v", b.Meta())
return fmt.Errorf("open index reader for block %+v: %w", b.Meta(), err)
}
closers = append(closers, indexr)
chunkr, err := b.Chunks()
if err != nil {
return errors.Wrapf(err, "open chunk reader for block %+v", b.Meta())
return fmt.Errorf("open chunk reader for block %+v: %w", b.Meta(), err)
}
closers = append(closers, chunkr)
tombsr, err := b.Tombstones()
if err != nil {
return errors.Wrapf(err, "open tombstone reader for block %+v", b.Meta())
return fmt.Errorf("open tombstone reader for block %+v: %w", b.Meta(), err)
}
closers = append(closers, tombsr)
@ -755,11 +755,11 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
for symbols.Next() {
if err := indexw.AddSymbol(symbols.At()); err != nil {
return errors.Wrap(err, "add symbol")
return fmt.Errorf("add symbol: %w", err)
}
}
if symbols.Err() != nil {
return errors.Wrap(symbols.Err(), "next symbol")
if err := symbols.Err(); err != nil {
return fmt.Errorf("next symbol: %w", err)
}
var (
@ -791,8 +791,8 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
// chunk file purposes.
chks = append(chks, chksIter.At())
}
if chksIter.Err() != nil {
return errors.Wrap(chksIter.Err(), "chunk iter")
if err := chksIter.Err(); err != nil {
return fmt.Errorf("chunk iter: %w", err)
}
// Skip the series with all deleted chunks.
@ -801,10 +801,10 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
}
if err := chunkw.WriteChunks(chks...); err != nil {
return errors.Wrap(err, "write chunks")
return fmt.Errorf("write chunks: %w", err)
}
if err := indexw.AddSeries(ref, s.Labels(), chks...); err != nil {
return errors.Wrap(err, "add series")
return fmt.Errorf("add series: %w", err)
}
meta.Stats.NumChunks += uint64(len(chks))
@ -815,13 +815,13 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
for _, chk := range chks {
if err := chunkPool.Put(chk.Chunk); err != nil {
return errors.Wrap(err, "put chunk")
return fmt.Errorf("put chunk: %w", err)
}
}
ref++
}
if set.Err() != nil {
return errors.Wrap(set.Err(), "iterate compaction set")
if err := set.Err(); err != nil {
return fmt.Errorf("iterate compaction set: %w", err)
}
return nil

2
tsdb/compact_test.go

@ -15,6 +15,7 @@ package tsdb
import (
"context"
"errors"
"fmt"
"math"
"math/rand"
@ -27,7 +28,6 @@ import (
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"

154
tsdb/db.go

@ -16,6 +16,7 @@ package tsdb
import (
"context"
"errors"
"fmt"
"io"
"io/fs"
@ -30,7 +31,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
@ -386,7 +386,7 @@ type DBReadOnly struct {
// OpenDBReadOnly opens DB in the given directory for read only operations.
func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) {
if _, err := os.Stat(dir); err != nil {
return nil, errors.Wrap(err, "opening the db dir")
return nil, fmt.Errorf("opening the db dir: %w", err)
}
if l == nil {
@ -407,7 +407,7 @@ func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) {
func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
blockReaders, err := db.Blocks()
if err != nil {
return errors.Wrap(err, "read blocks")
return fmt.Errorf("read blocks: %w", err)
}
maxBlockTime := int64(math.MinInt64)
if len(blockReaders) > 0 {
@ -432,15 +432,16 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
return err
}
defer func() {
returnErr = tsdb_errors.NewMulti(
returnErr,
errors.Wrap(head.Close(), "closing Head"),
).Err()
errs := tsdb_errors.NewMulti(returnErr)
if err := head.Close(); err != nil {
errs.Add(fmt.Errorf("closing Head: %w", err))
}
returnErr = errs.Err()
}()
// Set the min valid time for the ingested wal samples
// to be no lower than the maxt of the last block.
if err := head.Init(maxBlockTime); err != nil {
return errors.Wrap(err, "read WAL")
return fmt.Errorf("read WAL: %w", err)
}
mint := head.MinTime()
maxt := head.MaxTime()
@ -454,12 +455,15 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
nil,
)
if err != nil {
return errors.Wrap(err, "create leveled compactor")
return fmt.Errorf("create leveled compactor: %w", err)
}
// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
// Because of this block intervals are always +1 than the total samples it includes.
_, err = compactor.Write(dir, rh, mint, maxt+1, nil)
return errors.Wrap(err, "writing WAL")
if err != nil {
return fmt.Errorf("writing WAL: %w", err)
}
return nil
}
func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQueryable, error) {
@ -518,7 +522,7 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
// Set the min valid time for the ingested wal samples
// to be no lower than the maxt of the last block.
if err := head.Init(maxBlockTime); err != nil {
return nil, errors.Wrap(err, "read WAL")
return nil, fmt.Errorf("read WAL: %w", err)
}
// Set the wal to nil to disable all wal operations.
// This is mainly to avoid blocking when closing the head.
@ -580,7 +584,9 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
}
errs := tsdb_errors.NewMulti()
for ulid, err := range corrupted {
errs.Add(errors.Wrapf(err, "corrupted block %s", ulid.String()))
if err != nil {
errs.Add(fmt.Errorf("corrupted block %s: %w", ulid.String(), err))
}
}
return nil, errs.Err()
}
@ -761,7 +767,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
// Fixup bad format written by Prometheus 2.1.
if err := repairBadIndexVersion(l, dir); err != nil {
return nil, errors.Wrap(err, "repair bad index version")
return nil, fmt.Errorf("repair bad index version: %w", err)
}
walDir := filepath.Join(dir, "wal")
@ -769,12 +775,12 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
// Migrate old WAL if one exists.
if err := MigrateWAL(l, walDir); err != nil {
return nil, errors.Wrap(err, "migrate WAL")
return nil, fmt.Errorf("migrate WAL: %w", err)
}
for _, tmpDir := range []string{walDir, dir} {
// Remove tmp dirs.
if err := removeBestEffortTmpDirs(l, tmpDir); err != nil {
return nil, errors.Wrap(err, "remove tmp dirs")
return nil, fmt.Errorf("remove tmp dirs: %w", err)
}
}
@ -797,11 +803,11 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
}
close(db.donec) // DB is never run if it was an error, so close this channel here.
returnedErr = tsdb_errors.NewMulti(
returnedErr,
errors.Wrap(db.Close(), "close DB after failed startup"),
).Err()
errs := tsdb_errors.NewMulti(returnedErr)
if err := db.Close(); err != nil {
errs.Add(fmt.Errorf("close DB after failed startup: %w", err))
}
returnedErr = errs.Err()
}()
if db.blocksToDelete == nil {
@ -823,7 +829,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil)
if err != nil {
cancel()
return nil, errors.Wrap(err, "create leveled compactor")
return nil, fmt.Errorf("create leveled compactor: %w", err)
}
db.compactCancel = cancel
@ -905,17 +911,17 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
if initErr := db.head.Init(minValidTime); initErr != nil {
db.head.metrics.walCorruptionsTotal.Inc()
e, ok := initErr.(*errLoadWbl)
if ok {
var e *errLoadWbl
if errors.As(initErr, &e) {
level.Warn(db.logger).Log("msg", "Encountered WBL read error, attempting repair", "err", initErr)
if err := wbl.Repair(e.err); err != nil {
return nil, errors.Wrap(err, "repair corrupted WBL")
return nil, fmt.Errorf("repair corrupted WBL: %w", err)
}
level.Info(db.logger).Log("msg", "Successfully repaired WBL")
} else {
level.Warn(db.logger).Log("msg", "Encountered WAL read error, attempting repair", "err", initErr)
if err := wal.Repair(initErr); err != nil {
return nil, errors.Wrap(err, "repair corrupted WAL")
return nil, fmt.Errorf("repair corrupted WAL: %w", err)
}
level.Info(db.logger).Log("msg", "Successfully repaired WAL")
}
@ -1131,10 +1137,11 @@ func (db *DB) Compact(ctx context.Context) (returnErr error) {
lastBlockMaxt := int64(math.MinInt64)
defer func() {
returnErr = tsdb_errors.NewMulti(
returnErr,
errors.Wrap(db.head.truncateWAL(lastBlockMaxt), "WAL truncation in Compact defer"),
).Err()
errs := tsdb_errors.NewMulti(returnErr)
if err := db.head.truncateWAL(lastBlockMaxt); err != nil {
errs.Add(fmt.Errorf("WAL truncation in Compact defer: %w", err))
}
returnErr = errs.Err()
}()
start := time.Now()
@ -1168,7 +1175,7 @@ func (db *DB) Compact(ctx context.Context) (returnErr error) {
db.head.WaitForAppendersOverlapping(rh.MaxTime())
if err := db.compactHead(rh); err != nil {
return errors.Wrap(err, "compact head")
return fmt.Errorf("compact head: %w", err)
}
// Consider only successful compactions for WAL truncation.
lastBlockMaxt = maxt
@ -1177,7 +1184,7 @@ func (db *DB) Compact(ctx context.Context) (returnErr error) {
// Clear some disk space before compacting blocks, especially important
// when Head compaction happened over a long time range.
if err := db.head.truncateWAL(lastBlockMaxt); err != nil {
return errors.Wrap(err, "WAL truncation in Compact")
return fmt.Errorf("WAL truncation in Compact: %w", err)
}
compactionDuration := time.Since(start)
@ -1192,7 +1199,7 @@ func (db *DB) Compact(ctx context.Context) (returnErr error) {
if lastBlockMaxt != math.MinInt64 {
// The head was compacted, so we compact OOO head as well.
if err := db.compactOOOHead(ctx); err != nil {
return errors.Wrap(err, "compact ooo head")
return fmt.Errorf("compact ooo head: %w", err)
}
}
@ -1205,11 +1212,11 @@ func (db *DB) CompactHead(head *RangeHead) error {
defer db.cmtx.Unlock()
if err := db.compactHead(head); err != nil {
return errors.Wrap(err, "compact head")
return fmt.Errorf("compact head: %w", err)
}
if err := db.head.truncateWAL(head.BlockMaxTime()); err != nil {
return errors.Wrap(err, "WAL truncation")
return fmt.Errorf("WAL truncation: %w", err)
}
return nil
}
@ -1228,12 +1235,12 @@ func (db *DB) compactOOOHead(ctx context.Context) error {
}
oooHead, err := NewOOOCompactionHead(ctx, db.head)
if err != nil {
return errors.Wrap(err, "get ooo compaction head")
return fmt.Errorf("get ooo compaction head: %w", err)
}
ulids, err := db.compactOOO(db.dir, oooHead)
if err != nil {
return errors.Wrap(err, "compact ooo head")
return fmt.Errorf("compact ooo head: %w", err)
}
if err := db.reloadBlocks(); err != nil {
errs := tsdb_errors.NewMulti(err)
@ -1242,7 +1249,7 @@ func (db *DB) compactOOOHead(ctx context.Context) error {
errs.Add(errRemoveAll)
}
}
return errors.Wrap(errs.Err(), "reloadBlocks blocks after failed compact ooo head")
return fmt.Errorf("reloadBlocks blocks after failed compact ooo head: %w", errs.Err())
}
lastWBLFile, minOOOMmapRef := oooHead.LastWBLFile(), oooHead.LastMmapRef()
@ -1262,7 +1269,7 @@ func (db *DB) compactOOOHead(ctx context.Context) error {
}
if err := db.head.truncateOOO(lastWBLFile, minOOOMmapRef); err != nil {
return errors.Wrap(err, "truncate ooo wbl")
return fmt.Errorf("truncate ooo wbl: %w", err)
}
}
@ -1298,12 +1305,12 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID
blockDir := filepath.Join(dest, uid.String())
meta, _, err := readMetaFile(blockDir)
if err != nil {
return ulids, errors.Wrap(err, "read meta")
return ulids, fmt.Errorf("read meta: %w", err)
}
meta.Compaction.SetOutOfOrder()
_, err = writeMetaFile(db.logger, blockDir, meta)
if err != nil {
return ulids, errors.Wrap(err, "write meta")
return ulids, fmt.Errorf("write meta: %w", err)
}
}
}
@ -1329,20 +1336,20 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID
func (db *DB) compactHead(head *RangeHead) error {
uid, err := db.compactor.Write(db.dir, head, head.MinTime(), head.BlockMaxTime(), nil)
if err != nil {
return errors.Wrap(err, "persist head block")
return fmt.Errorf("persist head block: %w", err)
}
if err := db.reloadBlocks(); err != nil {
if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil {
return tsdb_errors.NewMulti(
errors.Wrap(err, "reloadBlocks blocks"),
errors.Wrapf(errRemoveAll, "delete persisted head block after failed db reloadBlocks:%s", uid),
fmt.Errorf("reloadBlocks blocks: %w", err),
fmt.Errorf("delete persisted head block after failed db reloadBlocks:%s: %w", uid, errRemoveAll),
).Err()
}
return errors.Wrap(err, "reloadBlocks blocks")
return fmt.Errorf("reloadBlocks blocks: %w", err)
}
if err = db.head.truncateMemory(head.BlockMaxTime()); err != nil {
return errors.Wrap(err, "head memory truncate")
return fmt.Errorf("head memory truncate: %w", err)
}
return nil
}
@ -1354,7 +1361,7 @@ func (db *DB) compactBlocks() (err error) {
for {
plan, err := db.compactor.Plan(db.dir)
if err != nil {
return errors.Wrap(err, "plan compaction")
return fmt.Errorf("plan compaction: %w", err)
}
if len(plan) == 0 {
break
@ -1368,14 +1375,14 @@ func (db *DB) compactBlocks() (err error) {
uid, err := db.compactor.Compact(db.dir, plan, db.blocks)
if err != nil {
return errors.Wrapf(err, "compact %s", plan)
return fmt.Errorf("compact %s: %w", plan, err)
}
if err := db.reloadBlocks(); err != nil {
if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil {
return errors.Wrapf(err, "delete compacted block after failed db reloadBlocks:%s", uid)
return fmt.Errorf("delete compacted block after failed db reloadBlocks:%s: %w", uid, err)
}
return errors.Wrap(err, "reloadBlocks blocks")
return fmt.Errorf("reloadBlocks blocks: %w", err)
}
}
@ -1396,14 +1403,14 @@ func getBlock(allBlocks []*Block, id ulid.ULID) (*Block, bool) {
// reload reloads blocks and truncates the head and its WAL.
func (db *DB) reload() error {
if err := db.reloadBlocks(); err != nil {
return errors.Wrap(err, "reloadBlocks")
return fmt.Errorf("reloadBlocks: %w", err)
}
maxt, ok := db.inOrderBlocksMaxTime()
if !ok {
return nil
}
if err := db.head.Truncate(maxt); err != nil {
return errors.Wrap(err, "head truncate")
return fmt.Errorf("head truncate: %w", err)
}
return nil
}
@ -1457,7 +1464,9 @@ func (db *DB) reloadBlocks() (err error) {
}
errs := tsdb_errors.NewMulti()
for ulid, err := range corrupted {
errs.Add(errors.Wrapf(err, "corrupted block %s", ulid.String()))
if err != nil {
errs.Add(fmt.Errorf("corrupted block %s: %w", ulid.String(), err))
}
}
return errs.Err()
}
@ -1509,7 +1518,7 @@ func (db *DB) reloadBlocks() (err error) {
}
}
if err := db.deleteBlocks(deletable); err != nil {
return errors.Wrapf(err, "delete %v blocks", len(deletable))
return fmt.Errorf("delete %v blocks: %w", len(deletable), err)
}
return nil
}
@ -1517,7 +1526,7 @@ func (db *DB) reloadBlocks() (err error) {
func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
bDirs, err := blockDirs(dir)
if err != nil {
return nil, nil, errors.Wrap(err, "find blocks")
return nil, nil, fmt.Errorf("find blocks: %w", err)
}
corrupted = make(map[ulid.ULID]error)
@ -1651,16 +1660,16 @@ func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error {
// Noop.
continue
case err != nil:
return errors.Wrapf(err, "stat dir %v", toDelete)
return fmt.Errorf("stat dir %v: %w", toDelete, err)
}
// Replace atomically to avoid partial block when process would crash during deletion.
tmpToDelete := filepath.Join(db.dir, fmt.Sprintf("%s%s", ulid, tmpForDeletionBlockDirSuffix))
if err := fileutil.Replace(toDelete, tmpToDelete); err != nil {
return errors.Wrapf(err, "replace of obsolete block for deletion %s", ulid)
return fmt.Errorf("replace of obsolete block for deletion %s: %w", ulid, err)
}
if err := os.RemoveAll(tmpToDelete); err != nil {
return errors.Wrapf(err, "delete obsolete block %s", ulid)
return fmt.Errorf("delete obsolete block %s: %w", ulid, err)
}
level.Info(db.logger).Log("msg", "Deleting obsolete block", "block", ulid)
}
@ -1868,7 +1877,7 @@ func (db *DB) Snapshot(dir string, withHead bool) error {
level.Info(db.logger).Log("msg", "Snapshotting block", "block", b)
if err := b.Snapshot(dir); err != nil {
return errors.Wrapf(err, "error snapshotting block: %s", b.Dir())
return fmt.Errorf("error snapshotting block: %s: %w", b.Dir(), err)
}
}
if !withHead {
@ -1881,7 +1890,7 @@ func (db *DB) Snapshot(dir string, withHead bool) error {
// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
// Because of this block intervals are always +1 than the total samples it includes.
if _, err := db.compactor.Write(dir, head, mint, maxt+1, nil); err != nil {
return errors.Wrap(err, "snapshot head block")
return fmt.Errorf("snapshot head block: %w", err)
}
return nil
}
@ -1916,7 +1925,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
var err error
inOrderHeadQuerier, err := NewBlockQuerier(rh, mint, maxt)
if err != nil {
return nil, errors.Wrapf(err, "open block querier for head %s", rh)
return nil, fmt.Errorf("open block querier for head %s: %w", rh, err)
}
// Getting the querier above registers itself in the queue that the truncation waits on.
@ -1925,7 +1934,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt)
if shouldClose {
if err := inOrderHeadQuerier.Close(); err != nil {
return nil, errors.Wrapf(err, "closing head block querier %s", rh)
return nil, fmt.Errorf("closing head block querier %s: %w", rh, err)
}
inOrderHeadQuerier = nil
}
@ -1933,7 +1942,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
rh := NewRangeHead(db.head, newMint, maxt)
inOrderHeadQuerier, err = NewBlockQuerier(rh, newMint, maxt)
if err != nil {
return nil, errors.Wrapf(err, "open block querier for head while getting new querier %s", rh)
return nil, fmt.Errorf("open block querier for head while getting new querier %s: %w", rh, err)
}
}
@ -1950,7 +1959,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
// If NewBlockQuerier() failed, make sure to clean up the pending read created by NewOOORangeHead.
rh.isoState.Close()
return nil, errors.Wrapf(err, "open block querier for ooo head %s", rh)
return nil, fmt.Errorf("open block querier for ooo head %s: %w", rh, err)
}
blockQueriers = append(blockQueriers, outOfOrderHeadQuerier)
@ -1959,7 +1968,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
for _, b := range blocks {
q, err := NewBlockQuerier(b, mint, maxt)
if err != nil {
return nil, errors.Wrapf(err, "open querier for block %s", b)
return nil, fmt.Errorf("open querier for block %s: %w", b, err)
}
blockQueriers = append(blockQueriers, q)
}
@ -1997,7 +2006,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
rh := NewRangeHead(db.head, mint, maxt)
inOrderHeadQuerier, err := NewBlockChunkQuerier(rh, mint, maxt)
if err != nil {
return nil, errors.Wrapf(err, "open querier for head %s", rh)
return nil, fmt.Errorf("open querier for head %s: %w", rh, err)
}
// Getting the querier above registers itself in the queue that the truncation waits on.
@ -2006,7 +2015,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt)
if shouldClose {
if err := inOrderHeadQuerier.Close(); err != nil {
return nil, errors.Wrapf(err, "closing head querier %s", rh)
return nil, fmt.Errorf("closing head querier %s: %w", rh, err)
}
inOrderHeadQuerier = nil
}
@ -2014,7 +2023,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
rh := NewRangeHead(db.head, newMint, maxt)
inOrderHeadQuerier, err = NewBlockChunkQuerier(rh, newMint, maxt)
if err != nil {
return nil, errors.Wrapf(err, "open querier for head while getting new querier %s", rh)
return nil, fmt.Errorf("open querier for head while getting new querier %s: %w", rh, err)
}
}
@ -2027,7 +2036,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef)
outOfOrderHeadQuerier, err := NewBlockChunkQuerier(rh, mint, maxt)
if err != nil {
return nil, errors.Wrapf(err, "open block chunk querier for ooo head %s", rh)
return nil, fmt.Errorf("open block chunk querier for ooo head %s: %w", rh, err)
}
blockQueriers = append(blockQueriers, outOfOrderHeadQuerier)
@ -2036,7 +2045,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
for _, b := range blocks {
q, err := NewBlockChunkQuerier(b, mint, maxt)
if err != nil {
return nil, errors.Wrapf(err, "open querier for block %s", b)
return nil, fmt.Errorf("open querier for block %s: %w", b, err)
}
blockQueriers = append(blockQueriers, q)
}
@ -2105,7 +2114,7 @@ func (db *DB) CleanTombstones() (err error) {
for _, pb := range db.Blocks() {
uid, safeToDelete, cleanErr := pb.CleanTombstones(db.Dir(), db.compactor)
if cleanErr != nil {
return errors.Wrapf(cleanErr, "clean tombstones: %s", pb.Dir())
return fmt.Errorf("clean tombstones: %s: %w", pb.Dir(), cleanErr)
}
if !safeToDelete {
// There was nothing to clean.
@ -2133,7 +2142,10 @@ func (db *DB) CleanTombstones() (err error) {
level.Error(db.logger).Log("msg", "failed to delete block after failed `CleanTombstones`", "dir", dir, "err", err)
}
}
return errors.Wrap(err, "reload blocks")
if err != nil {
return fmt.Errorf("reload blocks: %w", err)
}
return nil
}
}
return nil

3
tsdb/db_test.go

@ -34,7 +34,6 @@ import (
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
@ -355,7 +354,7 @@ func TestDBAppenderAddRef(t *testing.T) {
// Missing labels & invalid refs should fail.
_, err = app2.Append(9999999, labels.EmptyLabels(), 1, 1)
require.Equal(t, ErrInvalidSample, errors.Cause(err))
require.ErrorIs(t, err, ErrInvalidSample)
require.NoError(t, app2.Commit())

3
tsdb/exemplar.go

@ -15,6 +15,7 @@ package tsdb
import (
"context"
"errors"
"sync"
"unicode/utf8"
@ -363,7 +364,7 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
err := ce.validateExemplar(seriesLabels, e, true)
if err != nil {
if err == storage.ErrDuplicateExemplar {
if errors.Is(err, storage.ErrDuplicateExemplar) {
// Duplicate exemplar, noop.
return nil
}

48
tsdb/head.go

@ -15,6 +15,7 @@ package tsdb
import (
"context"
"errors"
"fmt"
"io"
"math"
@ -27,7 +28,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"go.uber.org/atomic"
"github.com/prometheus/client_golang/prometheus"
@ -623,11 +623,11 @@ func (h *Head) Init(minValidTime int64) error {
if h.wal != nil {
_, endAt, err := wlog.Segments(h.wal.Dir())
if err != nil {
return errors.Wrap(err, "finding WAL segments")
return fmt.Errorf("finding WAL segments: %w", err)
}
_, idx, _, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
if err != nil && err != record.ErrNotFound {
if err != nil && !errors.Is(err, record.ErrNotFound) {
level.Error(h.logger).Log("msg", "Could not find last snapshot", "err", err)
}
@ -674,7 +674,8 @@ func (h *Head) Init(minValidTime int64) error {
if err != nil {
// TODO(codesome): clear out all m-map chunks here for refSeries.
level.Error(h.logger).Log("msg", "Loading on-disk chunks failed", "err", err)
if _, ok := errors.Cause(err).(*chunks.CorruptionErr); ok {
var cerr *chunks.CorruptionErr
if errors.As(err, &cerr) {
h.metrics.mmapChunkCorruptionTotal.Inc()
}
@ -701,14 +702,14 @@ func (h *Head) Init(minValidTime int64) error {
checkpointReplayStart := time.Now()
// Backfill the checkpoint first if it exists.
dir, startFrom, err := wlog.LastCheckpoint(h.wal.Dir())
if err != nil && err != record.ErrNotFound {
return errors.Wrap(err, "find last checkpoint")
if err != nil && !errors.Is(err, record.ErrNotFound) {
return fmt.Errorf("find last checkpoint: %w", err)
}
// Find the last segment.
_, endAt, e := wlog.Segments(h.wal.Dir())
if e != nil {
return errors.Wrap(e, "finding WAL segments")
return fmt.Errorf("finding WAL segments: %w", e)
}
h.startWALReplayStatus(startFrom, endAt)
@ -717,7 +718,7 @@ func (h *Head) Init(minValidTime int64) error {
if err == nil && startFrom >= snapIdx {
sr, err := wlog.NewSegmentsReader(dir)
if err != nil {
return errors.Wrap(err, "open checkpoint")
return fmt.Errorf("open checkpoint: %w", err)
}
defer func() {
if err := sr.Close(); err != nil {
@ -728,7 +729,7 @@ func (h *Head) Init(minValidTime int64) error {
// A corrupted checkpoint is a hard error for now and requires user
// intervention. There's likely little data that can be recovered anyway.
if err := h.loadWAL(wlog.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks); err != nil {
return errors.Wrap(err, "backfill checkpoint")
return fmt.Errorf("backfill checkpoint: %w", err)
}
h.updateWALReplayStatusRead(startFrom)
startFrom++
@ -745,7 +746,7 @@ func (h *Head) Init(minValidTime int64) error {
for i := startFrom; i <= endAt; i++ {
s, err := wlog.OpenReadSegment(wlog.SegmentName(h.wal.Dir(), i))
if err != nil {
return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
return fmt.Errorf("open WAL segment: %d: %w", i, err)
}
offset := 0
@ -758,7 +759,7 @@ func (h *Head) Init(minValidTime int64) error {
continue
}
if err != nil {
return errors.Wrapf(err, "segment reader (offset=%d)", offset)
return fmt.Errorf("segment reader (offset=%d): %w", offset, err)
}
err = h.loadWAL(wlog.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks)
if err := sr.Close(); err != nil {
@ -777,14 +778,14 @@ func (h *Head) Init(minValidTime int64) error {
// Replay WBL.
startFrom, endAt, e = wlog.Segments(h.wbl.Dir())
if e != nil {
return &errLoadWbl{errors.Wrap(e, "finding WBL segments")}
return &errLoadWbl{fmt.Errorf("finding WBL segments: %w", e)}
}
h.startWALReplayStatus(startFrom, endAt)
for i := startFrom; i <= endAt; i++ {
s, err := wlog.OpenReadSegment(wlog.SegmentName(h.wbl.Dir(), i))
if err != nil {
return &errLoadWbl{errors.Wrap(err, fmt.Sprintf("open WBL segment: %d", i))}
return &errLoadWbl{fmt.Errorf("open WBL segment: %d: %w", i, err)}
}
sr := wlog.NewSegmentBufReader(s)
@ -905,7 +906,7 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries)
return nil
}); err != nil {
// secondLastRef because the lastRef caused an error.
return nil, nil, secondLastRef, errors.Wrap(err, "iterate on on-disk chunks")
return nil, nil, secondLastRef, fmt.Errorf("iterate on on-disk chunks: %w", err)
}
return mmappedChunks, oooMmappedChunks, lastRef, nil
}
@ -1224,12 +1225,12 @@ func (h *Head) truncateWAL(mint int64) error {
first, last, err := wlog.Segments(h.wal.Dir())
if err != nil {
return errors.Wrap(err, "get segment range")
return fmt.Errorf("get segment range: %w", err)
}
// Start a new segment, so low ingestion volume TSDB don't have more WAL than
// needed.
if _, err := h.wal.NextSegment(); err != nil {
return errors.Wrap(err, "next segment")
return fmt.Errorf("next segment: %w", err)
}
last-- // Never consider last segment for checkpoint.
if last < 0 {
@ -1256,10 +1257,11 @@ func (h *Head) truncateWAL(mint int64) error {
h.metrics.checkpointCreationTotal.Inc()
if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, keep, mint); err != nil {
h.metrics.checkpointCreationFail.Inc()
if _, ok := errors.Cause(err).(*wlog.CorruptionErr); ok {
var cerr *chunks.CorruptionErr
if errors.As(err, &cerr) {
h.metrics.walCorruptionsTotal.Inc()
}
return errors.Wrap(err, "create checkpoint")
return fmt.Errorf("create checkpoint: %w", err)
}
if err := h.wal.Truncate(last + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint.
@ -1352,7 +1354,7 @@ func (h *Head) truncateSeriesAndChunkDiskMapper(caller string) error {
// Truncate the chunk m-mapper.
if err := h.chunkDiskMapper.Truncate(uint32(minMmapFile)); err != nil {
return errors.Wrap(err, "truncate chunks.HeadReadWriter by file number")
return fmt.Errorf("truncate chunks.HeadReadWriter by file number: %w", err)
}
return nil
}
@ -1467,13 +1469,13 @@ func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Match
p, err := PostingsForMatchers(ctx, ir, ms...)
if err != nil {
return errors.Wrap(err, "select series")
return fmt.Errorf("select series: %w", err)
}
var stones []tombstones.Stone
for p.Next() {
if err := ctx.Err(); err != nil {
return errors.Wrap(err, "select series")
return fmt.Errorf("select series: %w", err)
}
series := h.series.getByID(chunks.HeadSeriesRef(p.At()))
@ -1495,8 +1497,8 @@ func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Match
if p.Err() != nil {
return p.Err()
}
if ctx.Err() != nil {
return errors.Wrap(err, "select series")
if err := ctx.Err(); err != nil {
return fmt.Errorf("select series: %w", err)
}
if h.wal != nil {

50
tsdb/head_append.go

@ -15,11 +15,11 @@ package tsdb
import (
"context"
"errors"
"fmt"
"math"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
@ -358,10 +358,10 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
}
if err != nil {
switch err {
case storage.ErrOutOfOrderSample:
switch {
case errors.Is(err, storage.ErrOutOfOrderSample):
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
case storage.ErrTooOldSample:
case errors.Is(err, storage.ErrTooOldSample):
a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
}
return 0, err
@ -428,10 +428,10 @@ func (a *headAppender) getOrCreate(lset labels.Labels) (*memSeries, error) {
// Ensure no empty labels have gotten through.
lset = lset.WithoutEmpty()
if lset.IsEmpty() {
return nil, errors.Wrap(ErrInvalidSample, "empty labelset")
return nil, fmt.Errorf("empty labelset: %w", ErrInvalidSample)
}
if l, dup := lset.HasDuplicateLabelNames(); dup {
return nil, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l))
return nil, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample)
}
var created bool
var err error
@ -557,7 +557,7 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels,
err := a.head.exemplars.ValidateExemplar(s.lset, e)
if err != nil {
if err == storage.ErrDuplicateExemplar || err == storage.ErrExemplarsDisabled {
if errors.Is(err, storage.ErrDuplicateExemplar) || errors.Is(err, storage.ErrExemplarsDisabled) {
// Duplicate, don't return an error but don't accept the exemplar.
return 0, nil
}
@ -596,11 +596,11 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
// Ensure no empty labels have gotten through.
lset = lset.WithoutEmpty()
if lset.IsEmpty() {
return 0, errors.Wrap(ErrInvalidSample, "empty labelset")
return 0, fmt.Errorf("empty labelset: %w", ErrInvalidSample)
}
if l, dup := lset.HasDuplicateLabelNames(); dup {
return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l))
return 0, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample)
}
var created bool
@ -628,7 +628,7 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
s.Lock()
if err := s.appendableHistogram(t, h); err != nil {
s.Unlock()
if err == storage.ErrOutOfOrderSample {
if errors.Is(err, storage.ErrOutOfOrderSample) {
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
}
return 0, err
@ -645,7 +645,7 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
s.Lock()
if err := s.appendableFloatHistogram(t, fh); err != nil {
s.Unlock()
if err == storage.ErrOutOfOrderSample {
if errors.Is(err, storage.ErrOutOfOrderSample) {
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
}
return 0, err
@ -729,7 +729,7 @@ func (a *headAppender) log() error {
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return errors.Wrap(err, "log series")
return fmt.Errorf("log series: %w", err)
}
}
if len(a.metadata) > 0 {
@ -737,7 +737,7 @@ func (a *headAppender) log() error {
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return errors.Wrap(err, "log metadata")
return fmt.Errorf("log metadata: %w", err)
}
}
if len(a.samples) > 0 {
@ -745,21 +745,21 @@ func (a *headAppender) log() error {
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return errors.Wrap(err, "log samples")
return fmt.Errorf("log samples: %w", err)
}
}
if len(a.histograms) > 0 {
rec = enc.HistogramSamples(a.histograms, buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return errors.Wrap(err, "log histograms")
return fmt.Errorf("log histograms: %w", err)
}
}
if len(a.floatHistograms) > 0 {
rec = enc.FloatHistogramSamples(a.floatHistograms, buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return errors.Wrap(err, "log float histograms")
return fmt.Errorf("log float histograms: %w", err)
}
}
// Exemplars should be logged after samples (float/native histogram/etc),
@ -771,7 +771,7 @@ func (a *headAppender) log() error {
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return errors.Wrap(err, "log exemplars")
return fmt.Errorf("log exemplars: %w", err)
}
}
return nil
@ -800,7 +800,7 @@ func (a *headAppender) Commit() (err error) {
if err := a.log(); err != nil {
_ = a.Rollback() // Most likely the same error will happen again.
return errors.Wrap(err, "write to WAL")
return fmt.Errorf("write to WAL: %w", err)
}
if a.head.writeNotified != nil {
@ -818,7 +818,7 @@ func (a *headAppender) Commit() (err error) {
}
// We don't instrument exemplar appends here, all is instrumented by storage.
if err := a.head.exemplars.AddExemplar(s.lset, e.exemplar); err != nil {
if err == storage.ErrOutOfOrderExemplar {
if errors.Is(err, storage.ErrOutOfOrderExemplar) {
continue
}
level.Debug(a.head.logger).Log("msg", "Unknown error while adding exemplar", "err", err)
@ -898,16 +898,16 @@ func (a *headAppender) Commit() (err error) {
series.Lock()
oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow)
switch err {
case nil:
switch {
case err == nil:
// Do nothing.
case storage.ErrOutOfOrderSample:
case errors.Is(err, storage.ErrOutOfOrderSample):
samplesAppended--
oooRejected++
case storage.ErrOutOfBounds:
case errors.Is(err, storage.ErrOutOfBounds):
samplesAppended--
oobRejected++
case storage.ErrTooOldSample:
case errors.Is(err, storage.ErrTooOldSample):
samplesAppended--
tooOldRejected++
default:
@ -1487,7 +1487,7 @@ func (s *memSeries) mmapChunks(chunkDiskMapper *chunks.ChunkDiskMapper) (count i
}
func handleChunkWriteError(err error) {
if err != nil && err != chunks.ErrChunkDiskMapperClosed {
if err != nil && !errors.Is(err, chunks.ErrChunkDiskMapperClosed) {
panic(err)
}
}

2
tsdb/head_bench_test.go

@ -14,10 +14,10 @@
package tsdb
import (
"errors"
"strconv"
"testing"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

15
tsdb/head_read.go

@ -15,11 +15,12 @@ package tsdb
import (
"context"
"errors"
"fmt"
"math"
"sync"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"golang.org/x/exp/slices"
"github.com/prometheus/prometheus/model/labels"
@ -133,7 +134,7 @@ func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
}
}
if err := p.Err(); err != nil {
return index.ErrPostings(errors.Wrap(err, "expand postings"))
return index.ErrPostings(fmt.Errorf("expand postings: %w", err))
}
slices.SortFunc(series, func(a, b *memSeries) int {
@ -388,7 +389,8 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
if ix < len(s.mmappedChunks) {
chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref)
if err != nil {
if _, ok := err.(*chunks.CorruptionErr); ok {
var cerr *chunks.CorruptionErr
if errors.As(err, &cerr) {
panic(err)
}
return nil, false, false, err
@ -516,14 +518,15 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe
xor, err = s.ooo.oooHeadChunk.chunk.ToXORBetweenTimestamps(meta.OOOLastMinTime, meta.OOOLastMaxTime)
}
if err != nil {
return nil, errors.Wrap(err, "failed to convert ooo head chunk to xor chunk")
return nil, fmt.Errorf("failed to convert ooo head chunk to xor chunk: %w", err)
}
iterable = xor
} else {
chk, err := cdm.Chunk(c.ref)
if err != nil {
if _, ok := err.(*chunks.CorruptionErr); ok {
return nil, errors.Wrap(err, "invalid ooo mmapped chunk")
var cerr *chunks.CorruptionErr
if errors.As(err, &cerr) {
return nil, fmt.Errorf("invalid ooo mmapped chunk: %w", err)
}
return nil, err
}

15
tsdb/head_test.go

@ -30,7 +30,6 @@ import (
"testing"
"time"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
@ -2056,9 +2055,8 @@ func TestWalRepair_DecodingError(t *testing.T) {
require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal))
initErr := h.Init(math.MinInt64)
err = errors.Cause(initErr) // So that we can pick up errors even if wrapped.
_, corrErr := err.(*wlog.CorruptionErr)
require.True(t, corrErr, "reading the wal didn't return corruption error")
var cerr *wlog.CorruptionErr
require.ErrorAs(t, initErr, &cerr, "reading the wal didn't return corruption error")
require.NoError(t, h.Close()) // Head will close the wal as well.
}
@ -2129,12 +2127,11 @@ func TestWblRepair_DecodingError(t *testing.T) {
require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal))
initErr := h.Init(math.MinInt64)
_, ok := initErr.(*errLoadWbl)
require.True(t, ok) // Wbl errors are wrapped into errLoadWbl, make sure we can unwrap it.
var elb *errLoadWbl
require.ErrorAs(t, initErr, &elb) // Wbl errors are wrapped into errLoadWbl, make sure we can unwrap it.
err = errors.Cause(initErr) // So that we can pick up errors even if wrapped.
_, corrErr := err.(*wlog.CorruptionErr)
require.True(t, corrErr, "reading the wal didn't return corruption error")
var cerr *wlog.CorruptionErr
require.ErrorAs(t, initErr, &cerr, "reading the wal didn't return corruption error")
require.NoError(t, h.Close()) // Head will close the wal as well.
}

103
tsdb/head_wal.go

@ -14,6 +14,7 @@
package tsdb
import (
"errors"
"fmt"
"math"
"os"
@ -24,7 +25,6 @@ import (
"time"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"go.uber.org/atomic"
"github.com/prometheus/prometheus/model/exemplar"
@ -128,7 +128,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
// At the moment the only possible error here is out of order exemplars, which we shouldn't see when
// replaying the WAL, so lets just log the error if it's not that type.
err = h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels})
if err != nil && err == storage.ErrOutOfOrderExemplar {
if err != nil && errors.Is(err, storage.ErrOutOfOrderExemplar) {
level.Warn(h.logger).Log("msg", "Unexpected error when replaying WAL on exemplar record", "err", err)
}
}
@ -145,7 +145,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
series, err = dec.Series(rec, series)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode series"),
Err: fmt.Errorf("decode series: %w", err),
Segment: r.Segment(),
Offset: r.Offset(),
}
@ -157,7 +157,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
samples, err = dec.Samples(rec, samples)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode samples"),
Err: fmt.Errorf("decode samples: %w", err),
Segment: r.Segment(),
Offset: r.Offset(),
}
@ -169,7 +169,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
tstones, err = dec.Tombstones(rec, tstones)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode tombstones"),
Err: fmt.Errorf("decode tombstones: %w", err),
Segment: r.Segment(),
Offset: r.Offset(),
}
@ -181,7 +181,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
exemplars, err = dec.Exemplars(rec, exemplars)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode exemplars"),
Err: fmt.Errorf("decode exemplars: %w", err),
Segment: r.Segment(),
Offset: r.Offset(),
}
@ -193,7 +193,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
hists, err = dec.HistogramSamples(rec, hists)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode histograms"),
Err: fmt.Errorf("decode histograms: %w", err),
Segment: r.Segment(),
Offset: r.Offset(),
}
@ -205,7 +205,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
hists, err = dec.FloatHistogramSamples(rec, hists)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode float histograms"),
Err: fmt.Errorf("decode float histograms: %w", err),
Segment: r.Segment(),
Offset: r.Offset(),
}
@ -217,7 +217,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
meta, err := dec.Metadata(rec, meta)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode metadata"),
Err: fmt.Errorf("decode metadata: %w", err),
Segment: r.Segment(),
Offset: r.Offset(),
}
@ -416,8 +416,8 @@ Outer:
close(exemplarsInput)
wg.Wait()
if r.Err() != nil {
return errors.Wrap(r.Err(), "read records")
if err := r.Err(); err != nil {
return fmt.Errorf("read records: %w", err)
}
if unknownRefs.Load()+unknownExemplarRefs.Load()+unknownHistogramRefs.Load()+unknownMetadataRefs.Load() > 0 {
@ -708,7 +708,7 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
samples, err = dec.Samples(rec, samples)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode samples"),
Err: fmt.Errorf("decode samples: %w", err),
Segment: r.Segment(),
Offset: r.Offset(),
}
@ -720,7 +720,7 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
markers, err = dec.MmapMarkers(rec, markers)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode mmap markers"),
Err: fmt.Errorf("decode mmap markers: %w", err),
Segment: r.Segment(),
Offset: r.Offset(),
}
@ -806,8 +806,8 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
}
wg.Wait()
if r.Err() != nil {
return errors.Wrap(r.Err(), "read records")
if err := r.Err(); err != nil {
return fmt.Errorf("read records: %w", err)
}
if unknownRefs.Load() > 0 || mmapMarkerUnknownRefs.Load() > 0 {
@ -995,7 +995,7 @@ func decodeSeriesFromChunkSnapshot(d *record.Decoder, b []byte) (csr chunkSnapsh
chk, err := chunkenc.FromData(enc, chunkBytesCopy)
if err != nil {
return csr, errors.Wrap(err, "chunk from data")
return csr, fmt.Errorf("chunk from data: %w", err)
}
csr.mc.chunk = chk
@ -1030,7 +1030,7 @@ func encodeTombstonesToSnapshotRecord(tr tombstones.Reader) ([]byte, error) {
buf.PutByte(chunkSnapshotRecordTypeTombstones)
b, err := tombstones.Encode(tr)
if err != nil {
return nil, errors.Wrap(err, "encode tombstones")
return nil, fmt.Errorf("encode tombstones: %w", err)
}
buf.PutUvarintBytes(b)
@ -1045,7 +1045,10 @@ func decodeTombstonesSnapshotRecord(b []byte) (tombstones.Reader, error) {
}
tr, err := tombstones.Decode(dec.UvarintBytes())
return tr, errors.Wrap(err, "decode tombstones")
if err != nil {
return tr, fmt.Errorf("decode tombstones: %w", err)
}
return tr, nil
}
const chunkSnapshotPrefix = "chunk_snapshot."
@ -1072,13 +1075,13 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
stats := &ChunkSnapshotStats{}
wlast, woffset, err := h.wal.LastSegmentAndOffset()
if err != nil && err != record.ErrNotFound {
return stats, errors.Wrap(err, "get last wal segment and offset")
if err != nil && !errors.Is(err, record.ErrNotFound) {
return stats, fmt.Errorf("get last wal segment and offset: %w", err)
}
_, cslast, csoffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
if err != nil && err != record.ErrNotFound {
return stats, errors.Wrap(err, "find last chunk snapshot")
if err != nil && !errors.Is(err, record.ErrNotFound) {
return stats, fmt.Errorf("find last chunk snapshot: %w", err)
}
if wlast == cslast && woffset == csoffset {
@ -1093,11 +1096,11 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
stats.Dir = cpdir
if err := os.MkdirAll(cpdirtmp, 0o777); err != nil {
return stats, errors.Wrap(err, "create chunk snapshot dir")
return stats, fmt.Errorf("create chunk snapshot dir: %w", err)
}
cp, err := wlog.New(nil, nil, cpdirtmp, h.wal.CompressionType())
if err != nil {
return stats, errors.Wrap(err, "open chunk snapshot")
return stats, fmt.Errorf("open chunk snapshot: %w", err)
}
// Ensures that an early return caused by an error doesn't leave any tmp files.
@ -1126,7 +1129,7 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
if len(buf) > 10*1024*1024 {
if err := cp.Log(recs...); err != nil {
h.series.locks[i].RUnlock()
return stats, errors.Wrap(err, "flush records")
return stats, fmt.Errorf("flush records: %w", err)
}
buf, recs = buf[:0], recs[:0]
}
@ -1139,16 +1142,16 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
// Add tombstones to the snapshot.
tombstonesReader, err := h.Tombstones()
if err != nil {
return stats, errors.Wrap(err, "get tombstones")
return stats, fmt.Errorf("get tombstones: %w", err)
}
rec, err := encodeTombstonesToSnapshotRecord(tombstonesReader)
if err != nil {
return stats, errors.Wrap(err, "encode tombstones")
return stats, fmt.Errorf("encode tombstones: %w", err)
}
recs = append(recs, rec)
// Flush remaining series records and tombstones.
if err := cp.Log(recs...); err != nil {
return stats, errors.Wrap(err, "flush records")
return stats, fmt.Errorf("flush records: %w", err)
}
buf = buf[:0]
@ -1167,7 +1170,7 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
encbuf.PutByte(chunkSnapshotRecordTypeExemplars)
enc.EncodeExemplarsIntoBuffer(batch, &encbuf)
if err := cp.Log(encbuf.Get()); err != nil {
return errors.Wrap(err, "log exemplars")
return fmt.Errorf("log exemplars: %w", err)
}
buf, batch = buf[:0], batch[:0]
return nil
@ -1175,7 +1178,7 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
err = h.exemplars.IterateExemplars(func(seriesLabels labels.Labels, e exemplar.Exemplar) error {
if len(batch) >= maxExemplarsPerRecord {
if err := flushExemplars(); err != nil {
return errors.Wrap(err, "flush exemplars")
return fmt.Errorf("flush exemplars: %w", err)
}
}
@ -1193,19 +1196,19 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
return nil
})
if err != nil {
return stats, errors.Wrap(err, "iterate exemplars")
return stats, fmt.Errorf("iterate exemplars: %w", err)
}
// Flush remaining exemplars.
if err := flushExemplars(); err != nil {
return stats, errors.Wrap(err, "flush exemplars at the end")
return stats, fmt.Errorf("flush exemplars at the end: %w", err)
}
if err := cp.Close(); err != nil {
return stats, errors.Wrap(err, "close chunk snapshot")
return stats, fmt.Errorf("close chunk snapshot: %w", err)
}
if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
return stats, errors.Wrap(err, "rename chunk snapshot directory")
return stats, fmt.Errorf("rename chunk snapshot directory: %w", err)
}
if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, wlast, woffset); err != nil {
@ -1229,7 +1232,10 @@ func (h *Head) performChunkSnapshot() error {
if err == nil {
level.Info(h.logger).Log("msg", "chunk snapshot complete", "duration", elapsed.String(), "num_series", stats.TotalSeries, "dir", stats.Dir)
}
return errors.Wrap(err, "chunk snapshot")
if err != nil {
return fmt.Errorf("chunk snapshot: %w", err)
}
return nil
}
// ChunkSnapshotStats returns stats about a created chunk snapshot.
@ -1327,16 +1333,16 @@ func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error {
func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSeries, error) {
dir, snapIdx, snapOffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
if err != nil {
if err == record.ErrNotFound {
if errors.Is(err, record.ErrNotFound) {
return snapIdx, snapOffset, nil, nil
}
return snapIdx, snapOffset, nil, errors.Wrap(err, "find last chunk snapshot")
return snapIdx, snapOffset, nil, fmt.Errorf("find last chunk snapshot: %w", err)
}
start := time.Now()
sr, err := wlog.NewSegmentsReader(dir)
if err != nil {
return snapIdx, snapOffset, nil, errors.Wrap(err, "open chunk snapshot")
return snapIdx, snapOffset, nil, fmt.Errorf("open chunk snapshot: %w", err)
}
defer func() {
if err := sr.Close(); err != nil {
@ -1424,7 +1430,7 @@ Outer:
numSeries++
csr, err := decodeSeriesFromChunkSnapshot(&dec, rec)
if err != nil {
loopErr = errors.Wrap(err, "decode series record")
loopErr = fmt.Errorf("decode series record: %w", err)
break Outer
}
recordChan <- csr
@ -1432,7 +1438,7 @@ Outer:
case chunkSnapshotRecordTypeTombstones:
tr, err := decodeTombstonesSnapshotRecord(rec)
if err != nil {
loopErr = errors.Wrap(err, "decode tombstones")
loopErr = fmt.Errorf("decode tombstones: %w", err)
break Outer
}
@ -1440,7 +1446,7 @@ Outer:
h.tombstones.AddInterval(ref, ivs...)
return nil
}); err != nil {
loopErr = errors.Wrap(err, "iterate tombstones")
loopErr = fmt.Errorf("iterate tombstones: %w", err)
break Outer
}
@ -1468,7 +1474,7 @@ Outer:
exemplarBuf = exemplarBuf[:0]
exemplarBuf, err = dec.ExemplarsFromBuffer(&decbuf, exemplarBuf)
if err != nil {
loopErr = errors.Wrap(err, "exemplars from buffer")
loopErr = fmt.Errorf("exemplars from buffer: %w", err)
break Outer
}
@ -1484,7 +1490,7 @@ Outer:
Value: e.V,
Ts: e.T,
}); err != nil {
loopErr = errors.Wrap(err, "add exemplar")
loopErr = fmt.Errorf("add exemplar: %w", err)
break Outer
}
}
@ -1502,16 +1508,19 @@ Outer:
}
close(errChan)
merr := tsdb_errors.NewMulti(errors.Wrap(loopErr, "decode loop"))
merr := tsdb_errors.NewMulti()
if loopErr != nil {
merr.Add(fmt.Errorf("decode loop: %w", loopErr))
}
for err := range errChan {
merr.Add(errors.Wrap(err, "record processing"))
merr.Add(fmt.Errorf("record processing: %w", err))
}
if err := merr.Err(); err != nil {
return -1, -1, nil, err
}
if r.Err() != nil {
return -1, -1, nil, errors.Wrap(r.Err(), "read records")
if err := r.Err(); err != nil {
return -1, -1, nil, fmt.Errorf("read records: %w", err)
}
if len(refSeries) == 0 {

6
tsdb/index/index.go

@ -425,7 +425,7 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ...
// We add padding to 16 bytes to increase the addressable space we get through 4 byte
// series references.
if err := w.addPadding(16); err != nil {
return fmt.Errorf("failed to write padding bytes: %v", err)
return fmt.Errorf("failed to write padding bytes: %w", err)
}
if w.f.pos%16 != 0 {
@ -442,7 +442,7 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ...
if !ok {
nameIndex, err = w.symbols.ReverseLookup(l.Name)
if err != nil {
return fmt.Errorf("symbol entry for %q does not exist, %v", l.Name, err)
return fmt.Errorf("symbol entry for %q does not exist, %w", l.Name, err)
}
}
w.labelNames[l.Name]++
@ -452,7 +452,7 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ...
if !ok || cacheEntry.lastValue != l.Value {
valueIndex, err = w.symbols.ReverseLookup(l.Value)
if err != nil {
return fmt.Errorf("symbol entry for %q does not exist, %v", l.Value, err)
return fmt.Errorf("symbol entry for %q does not exist, %w", l.Value, err)
}
w.symbolCache[l.Name] = symbolCacheEntry{
index: nameIndex,

4
tsdb/mocks_test.go

@ -14,7 +14,7 @@
package tsdb
import (
"github.com/pkg/errors"
"fmt"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
@ -41,7 +41,7 @@ func (m *mockIndexWriter) AddSeries(_ storage.SeriesRef, l labels.Labels, chks .
for i, chk := range chks {
c, err := copyChunk(chk.Chunk)
if err != nil {
return errors.Wrap(err, "mockIndexWriter: copy chunk")
return fmt.Errorf("mockIndexWriter: copy chunk: %w", err)
}
chksNew[i] = chunks.Meta{MaxTime: chk.MaxTime, MinTime: chk.MinTime, Chunk: c}
}

40
tsdb/querier.go

@ -15,13 +15,13 @@ package tsdb
import (
"context"
"errors"
"fmt"
"math"
"strings"
"unicode/utf8"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"golang.org/x/exp/slices"
"github.com/prometheus/prometheus/model/histogram"
@ -63,18 +63,18 @@ type blockBaseQuerier struct {
func newBlockBaseQuerier(b BlockReader, mint, maxt int64) (*blockBaseQuerier, error) {
indexr, err := b.Index()
if err != nil {
return nil, errors.Wrap(err, "open index reader")
return nil, fmt.Errorf("open index reader: %w", err)
}
chunkr, err := b.Chunks()
if err != nil {
indexr.Close()
return nil, errors.Wrap(err, "open chunk reader")
return nil, fmt.Errorf("open chunk reader: %w", err)
}
tombsr, err := b.Tombstones()
if err != nil {
indexr.Close()
chunkr.Close()
return nil, errors.Wrap(err, "open tombstone reader")
return nil, fmt.Errorf("open tombstone reader: %w", err)
}
if tombsr == nil {
@ -442,12 +442,12 @@ func inversePostingsForMatcher(ctx context.Context, ix IndexReader, m *labels.Ma
func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) {
p, err := PostingsForMatchers(ctx, r, matchers...)
if err != nil {
return nil, errors.Wrap(err, "fetching postings for matchers")
return nil, fmt.Errorf("fetching postings for matchers: %w", err)
}
allValues, err := r.LabelValues(ctx, name)
if err != nil {
return nil, errors.Wrapf(err, "fetching values of label %s", name)
return nil, fmt.Errorf("fetching values of label %s: %w", name, err)
}
// If we have a matcher for the label name, we can filter out values that don't match
@ -473,12 +473,12 @@ func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, ma
for i, value := range allValues {
valuesPostings[i], err = r.Postings(ctx, name, value)
if err != nil {
return nil, errors.Wrapf(err, "fetching postings for %s=%q", name, value)
return nil, fmt.Errorf("fetching postings for %s=%q: %w", name, value, err)
}
}
indexes, err := index.FindIntersectingPostings(p, valuesPostings)
if err != nil {
return nil, errors.Wrap(err, "intersecting postings")
return nil, fmt.Errorf("intersecting postings: %w", err)
}
values := make([]string, 0, len(indexes))
@ -499,8 +499,8 @@ func labelNamesWithMatchers(ctx context.Context, r IndexReader, matchers ...*lab
for p.Next() {
postings = append(postings, p.At())
}
if p.Err() != nil {
return nil, errors.Wrapf(p.Err(), "postings for label names with matchers")
if err := p.Err(); err != nil {
return nil, fmt.Errorf("postings for label names with matchers: %w", err)
}
return r.LabelNamesFor(ctx, postings...)
@ -539,10 +539,10 @@ func (b *blockBaseSeriesSet) Next() bool {
for b.p.Next() {
if err := b.index.Series(b.p.At(), &b.builder, &b.bufChks); err != nil {
// Postings may be stale. Skip if no underlying series exists.
if errors.Cause(err) == storage.ErrNotFound {
if errors.Is(err, storage.ErrNotFound) {
continue
}
b.err = errors.Wrapf(err, "get series %d", b.p.At())
b.err = fmt.Errorf("get series %d: %w", b.p.At(), err)
return false
}
@ -552,7 +552,7 @@ func (b *blockBaseSeriesSet) Next() bool {
intervals, err := b.tombstones.Get(b.p.At())
if err != nil {
b.err = errors.Wrap(err, "get tombstones")
b.err = fmt.Errorf("get tombstones: %w", err)
return false
}
@ -702,7 +702,7 @@ func (p *populateWithDelGenericSeriesIterator) next(copyHeadChunk bool) bool {
}
if p.err != nil {
p.err = errors.Wrapf(p.err, "cannot populate chunk %d from block %s", p.currMeta.Ref, p.blockID.String())
p.err = fmt.Errorf("cannot populate chunk %d from block %s: %w", p.currMeta.Ref, p.blockID.String(), p.err)
return false
}
@ -900,7 +900,7 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
valueType := p.currDelIter.Next()
if valueType == chunkenc.ValNone {
if err := p.currDelIter.Err(); err != nil {
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
p.err = fmt.Errorf("iterate chunk while re-encoding: %w", err)
}
return false
}
@ -968,11 +968,11 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
}
if err != nil {
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
p.err = fmt.Errorf("iterate chunk while re-encoding: %w", err)
return false
}
if err := p.currDelIter.Err(); err != nil {
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
p.err = fmt.Errorf("iterate chunk while re-encoding: %w", err)
return false
}
@ -991,7 +991,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
firstValueType := p.currDelIter.Next()
if firstValueType == chunkenc.ValNone {
if err := p.currDelIter.Err(); err != nil {
p.err = errors.Wrap(err, "populateChunksFromIterable: no samples could be read")
p.err = fmt.Errorf("populateChunksFromIterable: no samples could be read: %w", err)
return false
}
return false
@ -1075,11 +1075,11 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
}
if err != nil {
p.err = errors.Wrap(err, "populateChunksFromIterable: error when writing new chunks")
p.err = fmt.Errorf("populateChunksFromIterable: error when writing new chunks: %w", err)
return false
}
if err = p.currDelIter.Err(); err != nil {
p.err = errors.Wrap(err, "populateChunksFromIterable: currDelIter error when writing new chunks")
p.err = fmt.Errorf("populateChunksFromIterable: currDelIter error when writing new chunks: %w", err)
return false
}

4
tsdb/querier_test.go

@ -15,6 +15,7 @@ package tsdb
import (
"context"
"errors"
"fmt"
"math"
"math/rand"
@ -26,7 +27,6 @@ import (
"time"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/histogram"
@ -2317,7 +2317,7 @@ func (m mockIndex) Postings(ctx context.Context, name string, values ...string)
func (m mockIndex) SortedPostings(p index.Postings) index.Postings {
ep, err := index.ExpandPostings(p)
if err != nil {
return index.ErrPostings(errors.Wrap(err, "expand postings"))
return index.ErrPostings(fmt.Errorf("expand postings: %w", err))
}
sort.Slice(ep, func(i, j int) bool {

43
tsdb/repair.go

@ -22,7 +22,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
@ -35,7 +34,7 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
// We must actually set the index file version to 2 and revert the meta.json version back to 1.
dirs, err := blockDirs(dir)
if err != nil {
return errors.Wrapf(err, "list block dirs in %q", dir)
return fmt.Errorf("list block dirs in %q: %w", dir, err)
}
tmpFiles := make([]string, 0, len(dirs))
@ -71,44 +70,54 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
repl, err := os.Create(filepath.Join(d, "index.repaired"))
if err != nil {
return errors.Wrapf(err, "create index.repaired for block dir: %v", d)
return fmt.Errorf("create index.repaired for block dir: %v: %w", d, err)
}
tmpFiles = append(tmpFiles, repl.Name())
broken, err := os.Open(filepath.Join(d, indexFilename))
if err != nil {
return errors.Wrapf(err, "open broken index for block dir: %v", d)
return fmt.Errorf("open broken index for block dir: %v: %w", d, err)
}
if _, err := io.Copy(repl, broken); err != nil {
return errors.Wrapf(err, "copy content of index to index.repaired for block dir: %v", d)
return fmt.Errorf("copy content of index to index.repaired for block dir: %v: %w", d, err)
}
// Set the 5th byte to 2 to indicate the correct file format version.
if _, err := repl.WriteAt([]byte{2}, 4); err != nil {
return tsdb_errors.NewMulti(
errors.Wrapf(err, "rewrite of index.repaired for block dir: %v", d),
errors.Wrap(repl.Close(), "close"),
).Err()
errs := tsdb_errors.NewMulti(
fmt.Errorf("rewrite of index.repaired for block dir: %v: %w", d, err))
if err := repl.Close(); err != nil {
errs.Add(fmt.Errorf("close: %w", err))
}
return errs.Err()
}
if err := repl.Sync(); err != nil {
return tsdb_errors.NewMulti(
errors.Wrapf(err, "sync of index.repaired for block dir: %v", d),
errors.Wrap(repl.Close(), "close"),
).Err()
errs := tsdb_errors.NewMulti(
fmt.Errorf("sync of index.repaired for block dir: %v: %w", d, err))
if err := repl.Close(); err != nil {
errs.Add(fmt.Errorf("close: %w", err))
}
return errs.Err()
}
if err := repl.Close(); err != nil {
return errors.Wrapf(repl.Close(), "close repaired index for block dir: %v", d)
return fmt.Errorf("close repaired index for block dir: %v: %w", d, err)
}
if err := broken.Close(); err != nil {
return errors.Wrapf(repl.Close(), "close broken index for block dir: %v", d)
if err := repl.Close(); err != nil {
return fmt.Errorf("close broken index for block dir: %v: %w", d, err)
}
}
if err := fileutil.Replace(repl.Name(), broken.Name()); err != nil {
return errors.Wrapf(repl.Close(), "replaced broken index with index.repaired for block dir: %v", d)
if err := repl.Close(); err != nil {
return fmt.Errorf("replaced broken index with index.repaired for block dir: %v: %w", d, err)
}
}
// Reset version of meta.json to 1.
meta.Version = metaVersion1
if _, err := writeMetaFile(logger, d, meta); err != nil {
return errors.Wrapf(repl.Close(), "write meta for block dir: %v", d)
if err := repl.Close(); err != nil {
return fmt.Errorf("write meta for block dir: %v: %w", d, err)
}
}
}
return nil

101
tsdb/wal.go

@ -16,6 +16,7 @@ package tsdb
import (
"bufio"
"encoding/binary"
"errors"
"fmt"
"hash"
"hash/crc32"
@ -28,7 +29,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/storage"
@ -210,7 +210,7 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration,
for _, fn := range fns[i:] {
if err := os.Remove(fn); err != nil {
return w, errors.Wrap(err, "removing segment failed")
return w, fmt.Errorf("removing segment failed: %w", err)
}
}
break
@ -237,8 +237,8 @@ func (r *repairingWALReader) Read(
if err == nil {
return nil
}
cerr, ok := errors.Cause(err).(walCorruptionErr)
if !ok {
var cerr *walCorruptionErr
if !errors.As(err, &cerr) {
return err
}
r.wal.metrics.corruptions.Inc()
@ -309,7 +309,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(chunks.HeadSeriesRef) bool)
// Past WAL files are closed. We have to reopen them for another read.
f, err := w.openSegmentFile(sf.Name())
if err != nil {
return errors.Wrap(err, "open old WAL segment for read")
return fmt.Errorf("open old WAL segment for read: %w", err)
}
candidates = append(candidates, &segmentFile{
File: f,
@ -326,7 +326,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(chunks.HeadSeriesRef) bool)
// Create a new tmp file.
f, err := w.createSegmentFile(filepath.Join(w.dirFile.Name(), "compact.tmp"))
if err != nil {
return errors.Wrap(err, "create compaction segment")
return fmt.Errorf("create compaction segment: %w", err)
}
defer func() {
if err := os.RemoveAll(f.Name()); err != nil {
@ -352,7 +352,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(chunks.HeadSeriesRef) bool)
err := r.decodeSeries(flag, byt, &decSeries)
if err != nil {
return errors.Wrap(err, "decode samples while truncating")
return fmt.Errorf("decode samples while truncating: %w", err)
}
for _, s := range decSeries {
if keep(s.Ref) {
@ -367,11 +367,11 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(chunks.HeadSeriesRef) bool)
w.putBuffer(buf)
if err != nil {
return errors.Wrap(err, "write to compaction segment")
return fmt.Errorf("write to compaction segment: %w", err)
}
}
if r.Err() != nil {
return errors.Wrap(r.Err(), "read candidate WAL files")
if err := r.Err(); err != nil {
return fmt.Errorf("read candidate WAL files: %w", err)
}
off, err := csf.Seek(0, io.SeekCurrent)
@ -390,12 +390,12 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(chunks.HeadSeriesRef) bool)
_ = candidates[0].Close() // need close before remove on platform windows
if err := fileutil.Replace(csf.Name(), candidates[0].Name()); err != nil {
return errors.Wrap(err, "rename compaction segment")
return fmt.Errorf("rename compaction segment: %w", err)
}
for _, f := range candidates[1:] {
f.Close() // need close before remove on platform windows
if err := os.RemoveAll(f.Name()); err != nil {
return errors.Wrap(err, "delete WAL segment file")
return fmt.Errorf("delete WAL segment file: %w", err)
}
}
if err := w.dirFile.Sync(); err != nil {
@ -435,7 +435,7 @@ func (w *SegmentWAL) LogSeries(series []record.RefSeries) error {
w.putBuffer(buf)
if err != nil {
return errors.Wrap(err, "log series")
return fmt.Errorf("log series: %w", err)
}
tf := w.head()
@ -462,7 +462,7 @@ func (w *SegmentWAL) LogSamples(samples []record.RefSample) error {
w.putBuffer(buf)
if err != nil {
return errors.Wrap(err, "log series")
return fmt.Errorf("log series: %w", err)
}
tf := w.head()
@ -488,7 +488,7 @@ func (w *SegmentWAL) LogDeletes(stones []tombstones.Stone) error {
w.putBuffer(buf)
if err != nil {
return errors.Wrap(err, "log series")
return fmt.Errorf("log series: %w", err)
}
tf := w.head()
@ -523,7 +523,7 @@ func (w *SegmentWAL) openSegmentFile(name string) (*os.File, error) {
switch n, err := f.Read(metab); {
case err != nil:
return nil, errors.Wrapf(err, "validate meta %q", f.Name())
return nil, fmt.Errorf("validate meta %q: %w", f.Name(), err)
case n != 8:
return nil, fmt.Errorf("invalid header size %d in %q", n, f.Name())
}
@ -573,16 +573,16 @@ func (w *SegmentWAL) cut() error {
w.actorc <- func() error {
off, err := hf.Seek(0, io.SeekCurrent)
if err != nil {
return errors.Wrapf(err, "finish old segment %s", hf.Name())
return fmt.Errorf("finish old segment %s: %w", hf.Name(), err)
}
if err := hf.Truncate(off); err != nil {
return errors.Wrapf(err, "finish old segment %s", hf.Name())
return fmt.Errorf("finish old segment %s: %w", hf.Name(), err)
}
if err := hf.Sync(); err != nil {
return errors.Wrapf(err, "finish old segment %s", hf.Name())
return fmt.Errorf("finish old segment %s: %w", hf.Name(), err)
}
if err := hf.Close(); err != nil {
return errors.Wrapf(err, "finish old segment %s", hf.Name())
return fmt.Errorf("finish old segment %s: %w", hf.Name(), err)
}
return nil
}
@ -600,7 +600,10 @@ func (w *SegmentWAL) cut() error {
go func() {
w.actorc <- func() error {
return errors.Wrap(w.dirFile.Sync(), "sync WAL directory")
if err := w.dirFile.Sync(); err != nil {
return fmt.Errorf("sync WAL directory: %w", err)
}
return nil
}
}()
@ -635,7 +638,7 @@ func (w *SegmentWAL) Sync() error {
head = w.head()
}()
if err != nil {
return errors.Wrap(err, "flush buffer")
return fmt.Errorf("flush buffer: %w", err)
}
if head != nil {
// But only fsync the head segment after releasing the mutex as it will block on disk I/O.
@ -726,11 +729,13 @@ func (w *SegmentWAL) Close() error {
// only the current segment will still be open.
if hf := w.head(); hf != nil {
if err := hf.Close(); err != nil {
return errors.Wrapf(err, "closing WAL head %s", hf.Name())
return fmt.Errorf("closing WAL head %s: %w", hf.Name(), err)
}
}
return errors.Wrapf(w.dirFile.Close(), "closing WAL dir %s", w.dirFile.Name())
if err := w.dirFile.Close(); err != nil {
return fmt.Errorf("closing WAL dir %s: %w", w.dirFile.Name(), err)
}
return nil
}
func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error {
@ -921,7 +926,7 @@ func (r *walReader) Read(
err = r.decodeSeries(flag, b, &series)
if err != nil {
err = errors.Wrap(err, "decode series entry")
err = fmt.Errorf("decode series entry: %w", err)
break
}
datac <- series
@ -940,7 +945,7 @@ func (r *walReader) Read(
err = r.decodeSamples(flag, b, &samples)
if err != nil {
err = errors.Wrap(err, "decode samples entry")
err = fmt.Errorf("decode samples entry: %w", err)
break
}
datac <- samples
@ -960,7 +965,7 @@ func (r *walReader) Read(
err = r.decodeDeletes(flag, b, &deletes)
if err != nil {
err = errors.Wrap(err, "decode delete entry")
err = fmt.Errorf("decode delete entry: %w", err)
break
}
datac <- deletes
@ -982,8 +987,8 @@ func (r *walReader) Read(
if err != nil {
return err
}
if r.Err() != nil {
return errors.Wrap(r.Err(), "read entry")
if err := r.Err(); err != nil {
return fmt.Errorf("read entry: %w", err)
}
return nil
}
@ -1046,12 +1051,16 @@ type walCorruptionErr struct {
lastOffset int64
}
func (e walCorruptionErr) Error() string {
func (e *walCorruptionErr) Error() string {
return fmt.Sprintf("%s <file: %d, lastOffset: %d>", e.err, e.file, e.lastOffset)
}
func (e *walCorruptionErr) Unwrap() error {
return e.err
}
func (r *walReader) corruptionErr(s string, args ...interface{}) error {
return walCorruptionErr{
return &walCorruptionErr{
err: fmt.Errorf(s, args...),
file: r.cur,
lastOffset: r.lastOffset,
@ -1152,8 +1161,8 @@ func (r *walReader) decodeSamples(flag byte, b []byte, res *[]record.RefSample)
})
}
if dec.Err() != nil {
return errors.Wrapf(dec.Err(), "decode error after %d samples", len(*res))
if err := dec.Err(); err != nil {
return fmt.Errorf("decode error after %d samples: %w", len(*res), err)
}
if len(dec.B) > 0 {
return fmt.Errorf("unexpected %d bytes left in entry", len(dec.B))
@ -1185,7 +1194,7 @@ func deprecatedWALExists(logger log.Logger, dir string) (bool, error) {
// Detect whether we still have the old WAL.
fns, err := sequenceFiles(dir)
if err != nil && !os.IsNotExist(err) {
return false, errors.Wrap(err, "list sequence files")
return false, fmt.Errorf("list sequence files: %w", err)
}
if len(fns) == 0 {
return false, nil // No WAL at all yet.
@ -1194,13 +1203,13 @@ func deprecatedWALExists(logger log.Logger, dir string) (bool, error) {
// old WAL.
f, err := os.Open(fns[0])
if err != nil {
return false, errors.Wrap(err, "check first existing segment")
return false, fmt.Errorf("check first existing segment: %w", err)
}
defer f.Close()
var hdr [4]byte
if _, err := f.Read(hdr[:]); err != nil && err != io.EOF {
return false, errors.Wrap(err, "read header from first segment")
if _, err := f.Read(hdr[:]); err != nil && !errors.Is(err, io.EOF) {
return false, fmt.Errorf("read header from first segment: %w", err)
}
// If we cannot read the magic header for segments of the old WAL, abort.
// Either it's migrated already or there's a corruption issue with which
@ -1223,11 +1232,11 @@ func MigrateWAL(logger log.Logger, dir string) (err error) {
tmpdir := dir + ".tmp"
if err := os.RemoveAll(tmpdir); err != nil {
return errors.Wrap(err, "cleanup replacement dir")
return fmt.Errorf("cleanup replacement dir: %w", err)
}
repl, err := wlog.New(logger, nil, tmpdir, wlog.CompressionNone)
if err != nil {
return errors.Wrap(err, "open new WAL")
return fmt.Errorf("open new WAL: %w", err)
}
// It should've already been closed as part of the previous finalization.
@ -1240,7 +1249,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) {
w, err := OpenSegmentWAL(dir, logger, time.Minute, nil)
if err != nil {
return errors.Wrap(err, "open old WAL")
return fmt.Errorf("open old WAL: %w", err)
}
defer w.Close()
@ -1271,22 +1280,22 @@ func MigrateWAL(logger log.Logger, dir string) (err error) {
},
)
if decErr != nil {
return errors.Wrap(err, "decode old entries")
return fmt.Errorf("decode old entries: %w", err)
}
if err != nil {
return errors.Wrap(err, "write new entries")
return fmt.Errorf("write new entries: %w", err)
}
// We explicitly close even when there is a defer for Windows to be
// able to delete it. The defer is in place to close it in-case there
// are errors above.
if err := w.Close(); err != nil {
return errors.Wrap(err, "close old WAL")
return fmt.Errorf("close old WAL: %w", err)
}
if err := repl.Close(); err != nil {
return errors.Wrap(err, "close new WAL")
return fmt.Errorf("close new WAL: %w", err)
}
if err := fileutil.Replace(tmpdir, dir); err != nil {
return errors.Wrap(err, "replace old WAL")
return fmt.Errorf("replace old WAL: %w", err)
}
return nil
}

4
tsdb/wlog/wlog.go

@ -116,6 +116,10 @@ func (e *CorruptionErr) Error() string {
return fmt.Sprintf("corruption in segment %s at %d: %s", SegmentName(e.Dir, e.Segment), e.Offset, e.Err)
}
func (e *CorruptionErr) Unwrap() error {
return e.Err
}
// OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends.
func OpenWriteSegment(logger log.Logger, dir string, k int) (*Segment, error) {
segName := SegmentName(dir, k)

Loading…
Cancel
Save