|
|
|
@ -2827,3 +2827,107 @@ func TestSnapshotError(t *testing.T) {
|
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.Equal(t, 0, len(tm)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Tests https://github.com/prometheus/prometheus/issues/9725.
|
|
|
|
|
func TestChunkSnapshotReplayBug(t *testing.T) { |
|
|
|
|
dir := t.TempDir() |
|
|
|
|
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
// Write few series records and samples such that the series references are not in order in the WAL
|
|
|
|
|
// for status_code="200".
|
|
|
|
|
var buf []byte |
|
|
|
|
for i := 1; i <= 1000; i++ { |
|
|
|
|
var ref uint64 |
|
|
|
|
if i <= 500 { |
|
|
|
|
ref = uint64(i * 100) |
|
|
|
|
} else { |
|
|
|
|
ref = uint64((i - 500) * 50) |
|
|
|
|
} |
|
|
|
|
seriesRec := record.RefSeries{ |
|
|
|
|
Ref: ref, |
|
|
|
|
Labels: labels.Labels{ |
|
|
|
|
{Name: "__name__", Value: "request_duration"}, |
|
|
|
|
{Name: "status_code", Value: "200"}, |
|
|
|
|
{Name: "foo", Value: fmt.Sprintf("baz%d", rand.Int())}, |
|
|
|
|
}, |
|
|
|
|
} |
|
|
|
|
// Add a sample so that the series is not garbage collected.
|
|
|
|
|
samplesRec := record.RefSample{Ref: ref, T: 1000, V: 1000} |
|
|
|
|
var enc record.Encoder |
|
|
|
|
|
|
|
|
|
rec := enc.Series([]record.RefSeries{seriesRec}, buf) |
|
|
|
|
buf = rec[:0] |
|
|
|
|
require.NoError(t, wlog.Log(rec)) |
|
|
|
|
rec = enc.Samples([]record.RefSample{samplesRec}, buf) |
|
|
|
|
buf = rec[:0] |
|
|
|
|
require.NoError(t, wlog.Log(rec)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Write a corrupt snapshot to fail the replay on startup.
|
|
|
|
|
snapshotName := chunkSnapshotDir(0, 100) |
|
|
|
|
cpdir := filepath.Join(dir, snapshotName) |
|
|
|
|
require.NoError(t, os.MkdirAll(cpdir, 0o777)) |
|
|
|
|
|
|
|
|
|
err = ioutil.WriteFile(filepath.Join(cpdir, "00000000"), []byte{1, 5, 3, 5, 6, 7, 4, 2, 2}, 0o777) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
opts := DefaultHeadOptions() |
|
|
|
|
opts.ChunkDirRoot = dir |
|
|
|
|
opts.EnableMemorySnapshotOnShutdown = true |
|
|
|
|
head, err := NewHead(nil, nil, wlog, opts, nil) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NoError(t, head.Init(math.MinInt64)) |
|
|
|
|
defer func() { |
|
|
|
|
require.NoError(t, head.Close()) |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
// Snapshot replay should error out.
|
|
|
|
|
require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal)) |
|
|
|
|
|
|
|
|
|
// Querying `request_duration{status_code!="200"}` should return no series since all of
|
|
|
|
|
// them have status_code="200".
|
|
|
|
|
q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
series := query(t, q, |
|
|
|
|
labels.MustNewMatcher(labels.MatchEqual, "__name__", "request_duration"), |
|
|
|
|
labels.MustNewMatcher(labels.MatchNotEqual, "status_code", "200"), |
|
|
|
|
) |
|
|
|
|
require.Len(t, series, 0, "there should be no series found") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) { |
|
|
|
|
dir := t.TempDir() |
|
|
|
|
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
// Write a snapshot with .tmp suffix. This used to fail taking any further snapshots or replay of snapshots.
|
|
|
|
|
snapshotName := chunkSnapshotDir(0, 100) + ".tmp" |
|
|
|
|
cpdir := filepath.Join(dir, snapshotName) |
|
|
|
|
require.NoError(t, os.MkdirAll(cpdir, 0o777)) |
|
|
|
|
|
|
|
|
|
opts := DefaultHeadOptions() |
|
|
|
|
opts.ChunkDirRoot = dir |
|
|
|
|
opts.EnableMemorySnapshotOnShutdown = true |
|
|
|
|
head, err := NewHead(nil, nil, wlog, opts, nil) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NoError(t, head.Init(math.MinInt64)) |
|
|
|
|
|
|
|
|
|
require.Equal(t, 0.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal)) |
|
|
|
|
|
|
|
|
|
// Add some samples for the snapshot.
|
|
|
|
|
app := head.Appender(context.Background()) |
|
|
|
|
_, err = app.Append(0, labels.Labels{{Name: "foo", Value: "bar"}}, 10, 10) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NoError(t, app.Commit()) |
|
|
|
|
|
|
|
|
|
// Should not return any error for a successful snapshot.
|
|
|
|
|
require.NoError(t, head.Close()) |
|
|
|
|
|
|
|
|
|
// Verify the snapshot.
|
|
|
|
|
name, idx, offset, err := LastChunkSnapshot(dir) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.True(t, name != "") |
|
|
|
|
require.Equal(t, 0, idx) |
|
|
|
|
require.Greater(t, offset, 0) |
|
|
|
|
} |
|
|
|
|