Browse Source

Fix queries after a failed snapshot replay (#9980)

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
pull/9981/head
Ganesh Vernekar 3 years ago committed by GitHub
parent
commit
05d4d97bcd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      tsdb/head.go
  2. 104
      tsdb/head_test.go
  3. 3
      tsdb/head_wal.go

4
tsdb/head.go

@ -473,7 +473,9 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second
// limits the ingested samples to the head min valid time.
func (h *Head) Init(minValidTime int64) error {
h.minValidTime.Store(minValidTime)
defer h.postings.EnsureOrder()
defer func() {
h.postings.EnsureOrder()
}()
defer h.gc() // After loading the wal remove the obsolete data from the head.
defer func() {
// Loading of m-mapped chunks and snapshot can make the mint of the Head

104
tsdb/head_test.go

@ -2847,3 +2847,107 @@ func TestSnapshotError(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 0, len(tm))
}
// Tests https://github.com/prometheus/prometheus/issues/9725.
func TestChunkSnapshotReplayBug(t *testing.T) {
dir := t.TempDir()
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
require.NoError(t, err)
// Write few series records and samples such that the series references are not in order in the WAL
// for status_code="200".
var buf []byte
for i := 1; i <= 1000; i++ {
var ref chunks.HeadSeriesRef
if i <= 500 {
ref = chunks.HeadSeriesRef(i * 100)
} else {
ref = chunks.HeadSeriesRef((i - 500) * 50)
}
seriesRec := record.RefSeries{
Ref: ref,
Labels: labels.Labels{
{Name: "__name__", Value: "request_duration"},
{Name: "status_code", Value: "200"},
{Name: "foo", Value: fmt.Sprintf("baz%d", rand.Int())},
},
}
// Add a sample so that the series is not garbage collected.
samplesRec := record.RefSample{Ref: ref, T: 1000, V: 1000}
var enc record.Encoder
rec := enc.Series([]record.RefSeries{seriesRec}, buf)
buf = rec[:0]
require.NoError(t, wlog.Log(rec))
rec = enc.Samples([]record.RefSample{samplesRec}, buf)
buf = rec[:0]
require.NoError(t, wlog.Log(rec))
}
// Write a corrupt snapshot to fail the replay on startup.
snapshotName := chunkSnapshotDir(0, 100)
cpdir := filepath.Join(dir, snapshotName)
require.NoError(t, os.MkdirAll(cpdir, 0o777))
err = ioutil.WriteFile(filepath.Join(cpdir, "00000000"), []byte{1, 5, 3, 5, 6, 7, 4, 2, 2}, 0o777)
require.NoError(t, err)
opts := DefaultHeadOptions()
opts.ChunkDirRoot = dir
opts.EnableMemorySnapshotOnShutdown = true
head, err := NewHead(nil, nil, wlog, opts, nil)
require.NoError(t, err)
require.NoError(t, head.Init(math.MinInt64))
defer func() {
require.NoError(t, head.Close())
}()
// Snapshot replay should error out.
require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal))
// Querying `request_duration{status_code!="200"}` should return no series since all of
// them have status_code="200".
q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
series := query(t, q,
labels.MustNewMatcher(labels.MatchEqual, "__name__", "request_duration"),
labels.MustNewMatcher(labels.MatchNotEqual, "status_code", "200"),
)
require.Len(t, series, 0, "there should be no series found")
}
func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
dir := t.TempDir()
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
require.NoError(t, err)
// Write a snapshot with .tmp suffix. This used to fail taking any further snapshots or replay of snapshots.
snapshotName := chunkSnapshotDir(0, 100) + ".tmp"
cpdir := filepath.Join(dir, snapshotName)
require.NoError(t, os.MkdirAll(cpdir, 0o777))
opts := DefaultHeadOptions()
opts.ChunkDirRoot = dir
opts.EnableMemorySnapshotOnShutdown = true
head, err := NewHead(nil, nil, wlog, opts, nil)
require.NoError(t, err)
require.NoError(t, head.Init(math.MinInt64))
require.Equal(t, 0.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal))
// Add some samples for the snapshot.
app := head.Appender(context.Background())
_, err = app.Append(0, labels.Labels{{Name: "foo", Value: "bar"}}, 10, 10)
require.NoError(t, err)
require.NoError(t, app.Commit())
// Should not return any error for a successful snapshot.
require.NoError(t, head.Close())
// Verify the snapshot.
name, idx, offset, err := LastChunkSnapshot(dir)
require.NoError(t, err)
require.True(t, name != "")
require.Equal(t, 0, idx)
require.Greater(t, offset, 0)
}

3
tsdb/head_wal.go

@ -777,7 +777,8 @@ func LastChunkSnapshot(dir string) (string, int, int, error) {
splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".")
if len(splits) != 2 {
return "", 0, 0, errors.Errorf("chunk snapshot %s is not in the right format", fi.Name())
// Chunk snapshots is not in the right format, we do not care about it.
continue
}
idx, err := strconv.Atoi(splits[0])

Loading…
Cancel
Save