diff --git a/compact.go b/compact.go index 955ba3caf..28aefdd5b 100644 --- a/compact.go +++ b/compact.go @@ -525,22 +525,24 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, if len(dranges) > 0 { // Re-encode the chunk to not have deleted values. - for _, chk := range chks { - if intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) { - newChunk := chunks.NewXORChunk() - app, err := newChunk.Appender() - if err != nil { - return err - } - - it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges} - for it.Next() { - ts, v := it.At() - app.Append(ts, v) - } - - chk.Chunk = newChunk + for i, chk := range chks { + if !intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) { + continue } + + newChunk := chunks.NewXORChunk() + app, err := newChunk.Appender() + if err != nil { + return err + } + + it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges} + for it.Next() { + ts, v := it.At() + app.Append(ts, v) + } + + chks[i].Chunk = newChunk } } if err := chunkw.WriteChunks(chks...); err != nil { diff --git a/db.go b/db.go index f1972f00b..1a500b244 100644 --- a/db.go +++ b/db.go @@ -612,7 +612,7 @@ func (db *DB) Snapshot(dir string) error { level.Info(db.logger).Log("msg", "snapshotting block", "block", b) if err := b.Snapshot(dir); err != nil { - return errors.Wrap(err, "error snapshotting headblock") + return errors.Wrapf(err, "error snapshotting block: %s", b.Dir()) } } return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime()) diff --git a/db_test.go b/db_test.go index 9f91705e7..fa8132290 100644 --- a/db_test.go +++ b/db_test.go @@ -366,6 +366,90 @@ func TestDB_Snapshot(t *testing.T) { require.Equal(t, sum, 1000.0) } +func TestDB_SnapshotWithDelete(t *testing.T) { + numSamples := int64(10) + + db, close := openTestDB(t, nil) + defer close() + + app := db.Appender() + + smpls := make([]float64, numSamples) + for i := int64(0); i < numSamples; i++ { + smpls[i] = rand.Float64() + app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) + } + + require.NoError(t, app.Commit()) + cases := []struct { + intervals Intervals + remaint []int64 + }{ + { + intervals: Intervals{{1, 3}, {4, 7}}, + remaint: []int64{0, 8, 9}, + }, + } + +Outer: + for _, c := range cases { + // TODO(gouthamve): Reset the tombstones somehow. + // Delete the ranges. + for _, r := range c.intervals { + require.NoError(t, db.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b"))) + } + + // create snapshot + snap, err := ioutil.TempDir("", "snap") + require.NoError(t, err) + require.NoError(t, db.Snapshot(snap)) + require.NoError(t, db.Close()) + + // reopen DB from snapshot + db, err = Open(snap, nil, nil, nil) + require.NoError(t, err) + + // Compare the result. + q, err := db.Querier(0, numSamples) + require.NoError(t, err) + + res := q.Select(labels.NewEqualMatcher("a", "b")) + + expSamples := make([]sample, 0, len(c.remaint)) + for _, ts := range c.remaint { + expSamples = append(expSamples, sample{ts, smpls[ts]}) + } + + expss := newListSeriesSet([]Series{ + newSeries(map[string]string{"a": "b"}, expSamples), + }) + + if len(expSamples) == 0 { + require.False(t, res.Next()) + continue + } + + for { + eok, rok := expss.Next(), res.Next() + require.Equal(t, eok, rok, "next") + + if !eok { + continue Outer + } + sexp := expss.At() + sres := res.At() + + require.Equal(t, sexp.Labels(), sres.Labels(), "labels") + + smplExp, errExp := expandSeriesIterator(sexp.Iterator()) + smplRes, errRes := expandSeriesIterator(sres.Iterator()) + + require.Equal(t, errExp, errRes, "samples error") + require.Equal(t, smplExp, smplRes, "samples") + } + } +} + func TestDB_e2e(t *testing.T) { const ( numDatapoints = 1000