diff --git a/tsdb/db_test.go b/tsdb/db_test.go index c25b6ea7d..66dac0f0f 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -316,22 +316,6 @@ Outer: newSeries(map[string]string{"a": "b"}, expSamples), }) - lns, err := q.LabelNames() - testutil.Ok(t, err) - lvs, err := q.LabelValues("a") - testutil.Ok(t, err) - if len(expSamples) == 0 { - testutil.Equals(t, 0, len(lns)) - testutil.Equals(t, 0, len(lvs)) - testutil.Assert(t, res.Next() == false, "") - continue - } else { - testutil.Equals(t, 1, len(lns)) - testutil.Equals(t, 1, len(lvs)) - testutil.Equals(t, "a", lns[0]) - testutil.Equals(t, "b", lvs[0]) - } - for { eok, rok := expss.Next(), res.Next() testutil.Equals(t, eok, rok) diff --git a/tsdb/head.go b/tsdb/head.go index caebeb4cb..2bf250894 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -56,10 +56,6 @@ var ( // ErrInvalidSample is returned if an appended sample is not valid and can't // be ingested. ErrInvalidSample = errors.New("invalid sample") - - // emptyTombstoneReader is a no-op Tombstone Reader. - // This is used by head to satisfy the Tombstones() function call. - emptyTombstoneReader = tombstones.NewMemTombstones() ) // Head handles reads and writes of time series data within a time window. @@ -91,6 +87,8 @@ type Head struct { postings *index.MemPostings // postings lists for terms + tombstones *tombstones.MemTombstones + cardinalityMutex sync.Mutex cardinalityCache *index.PostingsStats // posting stats cache which will expire after 30sec lastPostingsStatsCall time.Duration // last posting stats call (PostingsCardinalityStats()) time for caching @@ -276,6 +274,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int values: map[string]stringset{}, symbols: map[string]struct{}{}, postings: index.NewUnorderedMemPostings(), + tombstones: tombstones.NewMemTombstones(), deleted: map[uint64]int{}, } h.metrics = newHeadMetrics(h, r) @@ -391,15 +390,9 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { } var ( - dec record.Decoder - allStones = tombstones.NewMemTombstones() - shards = make([][]record.RefSample, n) + dec record.Decoder + shards = make([][]record.RefSample, n) ) - defer func() { - if err := allStones.Close(); err != nil { - level.Warn(h.logger).Log("msg", "closing memTombstones during wal read", "err", err) - } - }() var ( decoded = make(chan interface{}, 10) @@ -532,7 +525,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { unknownRefs++ continue } - allStones.AddInterval(s.Ref, itv) + h.tombstones.AddInterval(s.Ref, itv) } } //lint:ignore SA6002 relax staticcheck verification. @@ -560,12 +553,6 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { return errors.Wrap(r.Err(), "read records") } - if err := allStones.Iter(func(ref uint64, dranges tombstones.Intervals) error { - return h.chunkRewrite(ref, dranges) - }); err != nil { - return errors.Wrap(r.Err(), "deleting samples from tombstones") - } - if unknownRefs > 0 { level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs) } @@ -772,7 +759,7 @@ func (h *rangeHead) Chunks() (ChunkReader, error) { } func (h *rangeHead) Tombstones() (tombstones.Reader, error) { - return emptyTombstoneReader, nil + return h.head.tombstones, nil } func (h *rangeHead) MinTime() int64 { @@ -1066,7 +1053,6 @@ func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error { } var stones []tombstones.Stone - dirty := false for p.Next() { series := h.series.getByID(p.At()) @@ -1076,59 +1062,19 @@ func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error { } // Delete only until the current values and not beyond. t0, t1 = clampInterval(mint, maxt, t0, t1) - if h.wal != nil { - stones = append(stones, tombstones.Stone{Ref: p.At(), Intervals: tombstones.Intervals{{Mint: t0, Maxt: t1}}}) - } - if err := h.chunkRewrite(p.At(), tombstones.Intervals{{Mint: t0, Maxt: t1}}); err != nil { - return errors.Wrap(err, "delete samples") - } - dirty = true + stones = append(stones, tombstones.Stone{Ref: p.At(), Intervals: tombstones.Intervals{{Mint: t0, Maxt: t1}}}) } if p.Err() != nil { return p.Err() } - var enc record.Encoder if h.wal != nil { - // Although we don't store the stones in the head - // we need to write them to the WAL to mark these as deleted - // after a restart while loading the WAL. + var enc record.Encoder if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil { return err } } - if dirty { - h.gc() - } - - return nil -} - -// chunkRewrite re-writes the chunks which overlaps with deleted ranges -// and removes the samples in the deleted ranges. -// Chunks is deleted if no samples are left at the end. -func (h *Head) chunkRewrite(ref uint64, dranges tombstones.Intervals) (err error) { - if len(dranges) == 0 { - return nil - } - - ms := h.series.getByID(ref) - ms.Lock() - defer ms.Unlock() - if len(ms.chunks) == 0 { - return nil - } - - metas := ms.chunksMetas() - mint, maxt := metas[0].MinTime, metas[len(metas)-1].MaxTime - it := newChunkSeriesIterator(metas, dranges, mint, maxt) - - ms.reset() - for it.Next() { - t, v := it.At() - ok, _ := ms.append(t, v) - if !ok { - level.Warn(h.logger).Log("msg", "failed to add sample during delete") - } + for _, s := range stones { + h.tombstones.AddInterval(s.Ref, s.Intervals[0]) } return nil @@ -1199,7 +1145,7 @@ func (h *Head) gc() { // Tombstones returns a new reader over the head's tombstones func (h *Head) Tombstones() (tombstones.Reader, error) { - return emptyTombstoneReader, nil + return h.tombstones, nil } // Index returns an IndexReader against the block. @@ -1748,26 +1694,6 @@ func (s *memSeries) cut(mint int64) *memChunk { return c } -func (s *memSeries) chunksMetas() []chunks.Meta { - metas := make([]chunks.Meta, 0, len(s.chunks)) - for _, chk := range s.chunks { - metas = append(metas, chunks.Meta{Chunk: chk.chunk, MinTime: chk.minTime, MaxTime: chk.maxTime}) - } - return metas -} - -// reset re-initialises all the variable in the memSeries except 'lset', 'ref', -// and 'chunkRange', like how it would appear after 'newMemSeries(...)'. -func (s *memSeries) reset() { - s.chunks = nil - s.headChunk = nil - s.firstChunkID = 0 - s.nextAt = math.MinInt64 - s.sampleBuf = [4]sample{} - s.pendingCommit = false - s.app = nil -} - // appendable checks whether the given sample is valid for appending to the series. func (s *memSeries) appendable(t int64, v float64) error { c := s.head() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index b6a7c1cba..3ad49c646 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -454,8 +454,9 @@ func TestHeadDeleteSimple(t *testing.T) { lblDefault := labels.Label{Name: "a", Value: "b"} cases := []struct { - dranges tombstones.Intervals - smplsExp []sample + dranges tombstones.Intervals + addSamples []sample // Samples to add after delete. + smplsExp []sample }{ { dranges: tombstones.Intervals{{Mint: 0, Maxt: 3}}, @@ -477,6 +478,18 @@ func TestHeadDeleteSimple(t *testing.T) { dranges: tombstones.Intervals{{Mint: 0, Maxt: 9}}, smplsExp: buildSmpls([]int64{}), }, + { + dranges: tombstones.Intervals{{Mint: 1, Maxt: 3}}, + addSamples: buildSmpls([]int64{11, 13, 15}), + smplsExp: buildSmpls([]int64{0, 4, 5, 6, 7, 8, 9, 11, 13, 15}), + }, + { + // After delete, the appended samples in the deleted range should be visible + // as the tombstones are clamped to head min/max time. + dranges: tombstones.Intervals{{Mint: 7, Maxt: 20}}, + addSamples: buildSmpls([]int64{11, 13, 15}), + smplsExp: buildSmpls([]int64{0, 1, 2, 3, 4, 5, 6, 11, 13, 15}), + }, } for _, compress := range []bool{false, true} { @@ -510,6 +523,15 @@ func TestHeadDeleteSimple(t *testing.T) { testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value))) } + // Add more samples. + app = head.Appender() + for _, smpl := range c.addSamples { + _, err = app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) + testutil.Ok(t, err) + + } + testutil.Ok(t, app.Commit()) + // Compare the samples for both heads - before and after the reload. reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reload. testutil.Ok(t, err) @@ -518,37 +540,6 @@ func TestHeadDeleteSimple(t *testing.T) { testutil.Ok(t, err) defer reloadedHead.Close() testutil.Ok(t, reloadedHead.Init(0)) - for _, h := range []*Head{head, reloadedHead} { - indexr, err := h.Index() - testutil.Ok(t, err) - // Use an emptyTombstoneReader explicitly to get all the samples. - css, err := LookupChunkSeries(indexr, emptyTombstoneReader, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value)) - testutil.Ok(t, err) - - // Getting the actual samples. - actSamples := make([]sample, 0) - for css.Next() { - lblsAct, chkMetas, intv := css.At() - testutil.Equals(t, labels.Labels{lblDefault}, lblsAct) - testutil.Equals(t, 0, len(intv)) - - chunkr, err := h.Chunks() - testutil.Ok(t, err) - var ii chunkenc.Iterator - for _, meta := range chkMetas { - chk, err := chunkr.Chunk(meta.Ref) - testutil.Ok(t, err) - ii = chk.Iterator(ii) - for ii.Next() { - t, v := ii.At() - actSamples = append(actSamples, sample{t: t, v: v}) - } - } - } - - testutil.Ok(t, css.Err()) - testutil.Equals(t, c.smplsExp, actSamples) - } // Compare the query results for both heads - before and after the reload. expSeriesSet := newMockSeriesSet([]Series{ @@ -567,24 +558,6 @@ func TestHeadDeleteSimple(t *testing.T) { actSeriesSet, err := q.Select(labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value)) testutil.Ok(t, err) - lns, err := q.LabelNames() - testutil.Ok(t, err) - lvs, err := q.LabelValues(lblDefault.Name) - testutil.Ok(t, err) - // When all samples are deleted we expect that no labels should exist either. - if len(c.smplsExp) == 0 { - testutil.Equals(t, 0, len(lns)) - testutil.Equals(t, 0, len(lvs)) - testutil.Assert(t, actSeriesSet.Next() == false, "") - testutil.Ok(t, h.Close()) - continue - } else { - testutil.Equals(t, 1, len(lns)) - testutil.Equals(t, 1, len(lvs)) - testutil.Equals(t, lblDefault.Name, lns[0]) - testutil.Equals(t, lblDefault.Value, lvs[0]) - } - for { eok, rok := expSeriesSet.Next(), actSeriesSet.Next() testutil.Equals(t, eok, rok) @@ -625,12 +598,15 @@ func TestDeleteUntilCurMax(t *testing.T) { testutil.Ok(t, app.Commit()) testutil.Ok(t, hb.Delete(0, 10000, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) - // Test the series have been deleted. + // Test the series returns no samples. The series is cleared only after compaction. q, err := NewBlockQuerier(hb, 0, 100000) testutil.Ok(t, err) res, err := q.Select(labels.MustNewMatcher(labels.MatchEqual, "a", "b")) testutil.Ok(t, err) - testutil.Assert(t, !res.Next(), "series didn't get deleted") + testutil.Assert(t, res.Next(), "series is not present") + s := res.At() + it := s.Iterator() + testutil.Assert(t, !it.Next(), "expected no samples") // Add again and test for presence. app = hb.Appender() @@ -643,7 +619,7 @@ func TestDeleteUntilCurMax(t *testing.T) { testutil.Ok(t, err) testutil.Assert(t, res.Next(), "series don't exist") exps := res.At() - it := exps.Iterator() + it = exps.Iterator() resSamples, err := expandSeriesIterator(it) testutil.Ok(t, err) testutil.Equals(t, []tsdbutil.Sample{sample{11, 1}}, resSamples) diff --git a/tsdb/tombstones/tombstones.go b/tsdb/tombstones/tombstones.go index 9cdf05a1e..20a1007c7 100644 --- a/tsdb/tombstones/tombstones.go +++ b/tsdb/tombstones/tombstones.go @@ -199,18 +199,18 @@ func ReadTombstones(dir string) (Reader, int64, error) { return stonesMap, int64(len(b)), nil } -type memTombstones struct { +type MemTombstones struct { intvlGroups map[uint64]Intervals mtx sync.RWMutex } // NewMemTombstones creates new in memory Tombstone Reader // that allows adding new intervals. -func NewMemTombstones() *memTombstones { - return &memTombstones{intvlGroups: make(map[uint64]Intervals)} +func NewMemTombstones() *MemTombstones { + return &MemTombstones{intvlGroups: make(map[uint64]Intervals)} } -func NewTestMemTombstones(intervals []Intervals) *memTombstones { +func NewTestMemTombstones(intervals []Intervals) *MemTombstones { ret := NewMemTombstones() for i, intervalsGroup := range intervals { for _, interval := range intervalsGroup { @@ -220,13 +220,13 @@ func NewTestMemTombstones(intervals []Intervals) *memTombstones { return ret } -func (t *memTombstones) Get(ref uint64) (Intervals, error) { +func (t *MemTombstones) Get(ref uint64) (Intervals, error) { t.mtx.RLock() defer t.mtx.RUnlock() return t.intvlGroups[ref], nil } -func (t *memTombstones) Iter(f func(uint64, Intervals) error) error { +func (t *MemTombstones) Iter(f func(uint64, Intervals) error) error { t.mtx.RLock() defer t.mtx.RUnlock() for ref, ivs := range t.intvlGroups { @@ -237,7 +237,7 @@ func (t *memTombstones) Iter(f func(uint64, Intervals) error) error { return nil } -func (t *memTombstones) Total() uint64 { +func (t *MemTombstones) Total() uint64 { t.mtx.RLock() defer t.mtx.RUnlock() @@ -249,7 +249,7 @@ func (t *memTombstones) Total() uint64 { } // AddInterval to an existing memTombstones. -func (t *memTombstones) AddInterval(ref uint64, itvs ...Interval) { +func (t *MemTombstones) AddInterval(ref uint64, itvs ...Interval) { t.mtx.Lock() defer t.mtx.Unlock() for _, itv := range itvs { @@ -257,7 +257,7 @@ func (t *memTombstones) AddInterval(ref uint64, itvs ...Interval) { } } -func (*memTombstones) Close() error { +func (*MemTombstones) Close() error { return nil }