diff --git a/tsdb/db.go b/tsdb/db.go index b5a0c9959..9d17b406b 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -1525,8 +1525,32 @@ func (db *DB) Querier(_ context.Context, mint, maxt int64) (storage.Querier, err blocks = append(blocks, b) } } + var headQuerier storage.Querier if maxt >= db.head.MinTime() { - blocks = append(blocks, NewRangeHead(db.head, mint, maxt)) + rh := NewRangeHead(db.head, mint, maxt) + var err error + headQuerier, err = NewBlockQuerier(rh, mint, maxt) + if err != nil { + return nil, errors.Wrapf(err, "open querier for head %s", rh) + } + + // Getting the querier above registers itself in the queue that the truncation waits on. + // So if the querier is currently not colliding with any truncation, we can continue to use it and still + // won't run into a race later since any truncation that comes after will wait on this querier if it overlaps. + shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt) + if shouldClose { + if err := headQuerier.Close(); err != nil { + return nil, errors.Wrapf(err, "closing head querier %s", rh) + } + headQuerier = nil + } + if getNew { + rh := NewRangeHead(db.head, newMint, maxt) + headQuerier, err = NewBlockQuerier(rh, newMint, maxt) + if err != nil { + return nil, errors.Wrapf(err, "open querier for head while getting new querier %s", rh) + } + } } blockQueriers := make([]storage.Querier, 0, len(blocks)) @@ -1543,6 +1567,9 @@ func (db *DB) Querier(_ context.Context, mint, maxt int64) (storage.Querier, err } return nil, errors.Wrapf(err, "open querier for block %s", b) } + if headQuerier != nil { + blockQueriers = append(blockQueriers, headQuerier) + } return storage.NewMergeQuerier(blockQueriers, nil, storage.ChainedSeriesMerge), nil } @@ -1558,8 +1585,32 @@ func (db *DB) ChunkQuerier(_ context.Context, mint, maxt int64) (storage.ChunkQu blocks = append(blocks, b) } } + var headQuerier storage.ChunkQuerier if maxt >= db.head.MinTime() { - blocks = append(blocks, NewRangeHead(db.head, mint, maxt)) + rh := NewRangeHead(db.head, mint, maxt) + var err error + headQuerier, err = NewBlockChunkQuerier(rh, mint, maxt) + if err != nil { + return nil, errors.Wrapf(err, "open querier for head %s", rh) + } + + // Getting the querier above registers itself in the queue that the truncation waits on. + // So if the querier is currently not colliding with any truncation, we can continue to use it and still + // won't run into a race later since any truncation that comes after will wait on this querier if it overlaps. + shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(mint, maxt) + if shouldClose { + if err := headQuerier.Close(); err != nil { + return nil, errors.Wrapf(err, "closing head querier %s", rh) + } + headQuerier = nil + } + if getNew { + rh := NewRangeHead(db.head, newMint, maxt) + headQuerier, err = NewBlockChunkQuerier(rh, newMint, maxt) + if err != nil { + return nil, errors.Wrapf(err, "open querier for head while getting new querier %s", rh) + } + } } blockQueriers := make([]storage.ChunkQuerier, 0, len(blocks)) @@ -1576,6 +1627,9 @@ func (db *DB) ChunkQuerier(_ context.Context, mint, maxt int64) (storage.ChunkQu } return nil, errors.Wrapf(err, "open querier for block %s", b) } + if headQuerier != nil { + blockQueriers = append(blockQueriers, headQuerier) + } return storage.NewMergeChunkQuerier(blockQueriers, nil, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 677e65da4..876a13920 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -3444,3 +3444,18 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun require.NoError(t, err) } } + +func newTestDB(t *testing.T) *DB { + dir, err := ioutil.TempDir("", "test") + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, os.RemoveAll(dir)) + }) + + db, err := Open(dir, nil, nil, DefaultOptions(), nil) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + return db +} diff --git a/tsdb/head.go b/tsdb/head.go index 0b3574233..2f1896dee 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -64,12 +64,13 @@ type ExemplarStorage interface { // Head handles reads and writes of time series data within a time window. type Head struct { - chunkRange atomic.Int64 - numSeries atomic.Uint64 - minTime, maxTime atomic.Int64 // Current min and max of the samples included in the head. - minValidTime atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block. - lastWALTruncationTime atomic.Int64 - lastSeriesID atomic.Uint64 + chunkRange atomic.Int64 + numSeries atomic.Uint64 + minTime, maxTime atomic.Int64 // Current min and max of the samples included in the head. + minValidTime atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block. + lastWALTruncationTime atomic.Int64 + lastMemoryTruncationTime atomic.Int64 + lastSeriesID atomic.Uint64 metrics *headMetrics opts *HeadOptions @@ -110,6 +111,8 @@ type Head struct { stats *HeadStats reg prometheus.Registerer + + memTruncationInProcess atomic.Bool } // HeadOptions are parameters for the Head block. @@ -414,6 +417,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti h.minTime.Store(math.MaxInt64) h.maxTime.Store(math.MinInt64) h.lastWALTruncationTime.Store(math.MinInt64) + h.lastMemoryTruncationTime.Store(math.MinInt64) h.metrics = newHeadMetrics(h, r) if opts.ChunkPool == nil { @@ -974,11 +978,24 @@ func (h *Head) truncateMemory(mint int64) (err error) { h.metrics.headTruncateFail.Inc() } }() + initialize := h.MinTime() == math.MaxInt64 if h.MinTime() >= mint && !initialize { return nil } + + // The order of these two Store() should not be changed, + // i.e. truncation time is set before in-process boolean. + h.lastMemoryTruncationTime.Store(mint) + h.memTruncationInProcess.Store(true) + defer h.memTruncationInProcess.Store(false) + + // We wait for pending queries to end that overlap with this truncation. + if !initialize { + h.WaitForPendingReadersInTimeRange(h.MinTime(), mint) + } + h.minTime.Store(mint) h.minValidTime.Store(mint) @@ -1020,6 +1037,75 @@ func (h *Head) truncateMemory(mint int64) (err error) { return nil } +// WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying. +// The query timeout limits the max wait time of this function implicitly. +// The mint is inclusive and maxt is the truncation time hence exclusive. +func (h *Head) WaitForPendingReadersInTimeRange(mint, maxt int64) { + maxt-- // Making it inclusive before checking overlaps. + overlaps := func() bool { + o := false + h.iso.TraverseOpenReads(func(s *isolationState) bool { + if s.mint <= maxt && mint <= s.maxt { + // Overlaps with the truncation range. + o = true + return false + } + return true + }) + return o + } + for overlaps() { + time.Sleep(500 * time.Millisecond) + } +} + +// IsQuerierCollidingWithTruncation returns if the current querier needs to be closed and if a new querier +// has to be created. In the latter case, the method also returns the new mint to be used for creating the +// new range head and the new querier. This methods helps preventing races with the truncation of in-memory data. +// +// NOTE: The querier should already be taken before calling this. +func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64) (shouldClose bool, getNew bool, newMint int64) { + if !h.memTruncationInProcess.Load() { + return false, false, 0 + } + // Head truncation is in process. It also means that the block that was + // created for this truncation range is also available. + // Check if we took a querier that overlaps with this truncation. + memTruncTime := h.lastMemoryTruncationTime.Load() + if querierMaxt < memTruncTime { + // Head compaction has happened and this time range is being truncated. + // This query doesn't overlap with the Head any longer. + // We should close this querier to avoid races and the data would be + // available with the blocks below. + // Cases: + // 1. |------truncation------| + // |---query---| + // 2. |------truncation------| + // |---query---| + return true, false, 0 + } + if querierMint < memTruncTime { + // The truncation time is not same as head mint that we saw above but the + // query still overlaps with the Head. + // The truncation started after we got the querier. So it is not safe + // to use this querier and/or might block truncation. We should get + // a new querier for the new Head range while remaining will be available + // in the blocks below. + // Case: + // |------truncation------| + // |----query----| + // Turns into + // |------truncation------| + // |---qu---| + return true, true, memTruncTime + } + + // Other case is this, which is a no-op + // |------truncation------| + // |---query---| + return false, false, 0 +} + // truncateWAL removes old data before mint from the WAL. func (h *Head) truncateWAL(mint int64) error { if h.wal == nil || mint <= h.lastWALTruncationTime.Load() { @@ -1147,7 +1233,7 @@ func (h *RangeHead) Index() (IndexReader, error) { } func (h *RangeHead) Chunks() (ChunkReader, error) { - return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State()) + return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State(h.mint, h.maxt)) } func (h *RangeHead) Tombstones() (tombstones.Reader, error) { @@ -1721,7 +1807,7 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader { // Chunks returns a ChunkReader against the block. func (h *Head) Chunks() (ChunkReader, error) { - return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State()) + return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State(math.MinInt64, math.MaxInt64)) } func (h *Head) chunksRange(mint, maxt int64, is *isolationState) (*headChunkReader, error) { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index df71b876f..d9a494e7f 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -16,6 +16,7 @@ package tsdb import ( "context" "fmt" + "io" "io/ioutil" "math" "math/rand" @@ -25,10 +26,12 @@ import ( "strconv" "sync" "testing" + "time" "github.com/pkg/errors" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/exemplar" @@ -1230,11 +1233,11 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { q, err := NewBlockQuerier(h, 1500, 2500) require.NoError(t, err) - defer q.Close() ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) require.Equal(t, false, ss.Next()) require.Equal(t, 0, len(ss.Warnings())) + require.NoError(t, q.Close()) // Truncate again, this time the series should be deleted require.NoError(t, h.Truncate(2050)) @@ -1490,7 +1493,7 @@ func TestMemSeriesIsolation(t *testing.T) { require.NoError(t, err) - iso := h.iso.State() + iso := h.iso.State(math.MinInt64, math.MaxInt64) iso.maxAppendID = maxAppendID chunks, err := h.chunksRange(math.MinInt64, math.MaxInt64, iso) @@ -1705,7 +1708,7 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) { require.NoError(t, app2.Commit()) require.Equal(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should stay two because app1 is not committed yet.") - is := hb.iso.State() + is := hb.iso.State(math.MinInt64, math.MaxInt64) require.Equal(t, uint64(2), hb.iso.lowWatermark(), "After simulated read (iso state retrieved), low watermark should stay at 2.") require.NoError(t, app1.Commit()) @@ -2179,3 +2182,218 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) { ok = it.Seek(7) require.False(t, ok) } + +// Tests https://github.com/prometheus/prometheus/issues/8221. +func TestChunkNotFoundHeadGCRace(t *testing.T) { + db := newTestDB(t) + db.DisableCompactions() + + var ( + app = db.Appender(context.Background()) + ref = uint64(0) + mint, maxt = int64(0), int64(0) + err error + ) + + // Appends samples to span over 1.5 block ranges. + // 7 chunks with 15s scrape interval. + for i := int64(0); i <= 120*7; i++ { + ts := i * DefaultBlockDuration / (4 * 120) + ref, err = app.Append(ref, labels.FromStrings("a", "b"), ts, float64(i)) + require.NoError(t, err) + maxt = ts + } + require.NoError(t, app.Commit()) + + // Get a querier before compaction (or when compaction is about to begin). + q, err := db.Querier(context.Background(), mint, maxt) + require.NoError(t, err) + + // Query the compacted range and get the first series before compaction. + ss := q.Select(true, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + require.True(t, ss.Next()) + s := ss.At() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + // Compacting head while the querier spans the compaction time. + require.NoError(t, db.Compact()) + require.Greater(t, len(db.Blocks()), 0) + }() + + // Give enough time for compaction to finish. + // We expect it to be blocked until querier is closed. + <-time.After(3 * time.Second) + + // Now consume after compaction when it's gone. + it := s.Iterator() + for it.Next() { + _, _ = it.At() + } + // It should error here without any fix for the mentioned issue. + require.NoError(t, it.Err()) + for ss.Next() { + s = ss.At() + it := s.Iterator() + for it.Next() { + _, _ = it.At() + } + require.NoError(t, it.Err()) + } + require.NoError(t, ss.Err()) + + require.NoError(t, q.Close()) + wg.Wait() +} + +// Tests https://github.com/prometheus/prometheus/issues/9079. +func TestDataMissingOnQueryDuringCompaction(t *testing.T) { + db := newTestDB(t) + db.DisableCompactions() + + var ( + app = db.Appender(context.Background()) + ref = uint64(0) + mint, maxt = int64(0), int64(0) + err error + ) + + // Appends samples to span over 1.5 block ranges. + expSamples := make([]tsdbutil.Sample, 0) + // 7 chunks with 15s scrape interval. + for i := int64(0); i <= 120*7; i++ { + ts := i * DefaultBlockDuration / (4 * 120) + ref, err = app.Append(ref, labels.FromStrings("a", "b"), ts, float64(i)) + require.NoError(t, err) + maxt = ts + expSamples = append(expSamples, sample{ts, float64(i)}) + } + require.NoError(t, app.Commit()) + + // Get a querier before compaction (or when compaction is about to begin). + q, err := db.Querier(context.Background(), mint, maxt) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + // Compacting head while the querier spans the compaction time. + require.NoError(t, db.Compact()) + require.Greater(t, len(db.Blocks()), 0) + }() + + // Give enough time for compaction to finish. + // We expect it to be blocked until querier is closed. + <-time.After(3 * time.Second) + + // Querying the querier that was got before compaction. + series := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + require.Equal(t, map[string][]tsdbutil.Sample{`{a="b"}`: expSamples}, series) + + wg.Wait() +} + +func TestIsQuerierCollidingWithTruncation(t *testing.T) { + db := newTestDB(t) + db.DisableCompactions() + + var ( + app = db.Appender(context.Background()) + ref = uint64(0) + err error + ) + + for i := int64(0); i <= 3000; i++ { + ref, err = app.Append(ref, labels.FromStrings("a", "b"), i, float64(i)) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + // This mocks truncation. + db.head.memTruncationInProcess.Store(true) + db.head.lastMemoryTruncationTime.Store(2000) + + // Test that IsQuerierValid suggests correct querier ranges. + cases := []struct { + mint, maxt int64 // For the querier. + expShouldClose, expGetNew bool + expNewMint int64 + }{ + {-200, -100, true, false, 0}, + {-200, 300, true, false, 0}, + {100, 1900, true, false, 0}, + {1900, 2200, true, true, 2000}, + {2000, 2500, false, false, 0}, + } + + for _, c := range cases { + t.Run(fmt.Sprintf("mint=%d,maxt=%d", c.mint, c.maxt), func(t *testing.T) { + shouldClose, getNew, newMint := db.head.IsQuerierCollidingWithTruncation(c.mint, c.maxt) + require.Equal(t, c.expShouldClose, shouldClose) + require.Equal(t, c.expGetNew, getNew) + if getNew { + require.Equal(t, c.expNewMint, newMint) + } + }) + } +} + +func TestWaitForPendingReadersInTimeRange(t *testing.T) { + db := newTestDB(t) + db.DisableCompactions() + + sampleTs := func(i int64) int64 { return i * DefaultBlockDuration / (4 * 120) } + + var ( + app = db.Appender(context.Background()) + ref = uint64(0) + err error + ) + + for i := int64(0); i <= 3000; i++ { + ts := sampleTs(i) + ref, err = app.Append(ref, labels.FromStrings("a", "b"), ts, float64(i)) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + truncMint, truncMaxt := int64(1000), int64(2000) + cases := []struct { + mint, maxt int64 + shouldWait bool + }{ + {0, 500, false}, // Before truncation range. + {500, 1500, true}, // Overlaps with truncation at the start. + {1200, 1700, true}, // Within truncation range. + {1800, 2500, true}, // Overlaps with truncation at the end. + {2000, 2500, false}, // After truncation range. + {2100, 2500, false}, // After truncation range. + } + for _, c := range cases { + t.Run(fmt.Sprintf("mint=%d,maxt=%d,shouldWait=%t", c.mint, c.maxt, c.shouldWait), func(t *testing.T) { + checkWaiting := func(cl io.Closer) { + var waitOver atomic.Bool + go func() { + db.head.WaitForPendingReadersInTimeRange(truncMint, truncMaxt) + waitOver.Store(true) + }() + <-time.After(550 * time.Millisecond) + require.Equal(t, !c.shouldWait, waitOver.Load()) + require.NoError(t, cl.Close()) + <-time.After(550 * time.Millisecond) + require.True(t, waitOver.Load()) + } + + q, err := db.Querier(context.Background(), c.mint, c.maxt) + require.NoError(t, err) + checkWaiting(q) + + cq, err := db.ChunkQuerier(context.Background(), c.mint, c.maxt) + require.NoError(t, err) + checkWaiting(cq) + }) + } +} diff --git a/tsdb/isolation.go b/tsdb/isolation.go index 51302b23d..523f93896 100644 --- a/tsdb/isolation.go +++ b/tsdb/isolation.go @@ -24,6 +24,7 @@ type isolationState struct { incompleteAppends map[uint64]struct{} lowWatermark uint64 // Lowest of incompleteAppends/maxAppendID. isolation *isolation + mint, maxt int64 // Time ranges of the read. // Doubly linked list of active reads. next *isolationState @@ -102,7 +103,7 @@ func (i *isolation) lowWatermarkLocked() uint64 { // State returns an object used to control isolation // between a query and appends. Must be closed when complete. -func (i *isolation) State() *isolationState { +func (i *isolation) State(mint, maxt int64) *isolationState { i.appendMtx.RLock() // Take append mutex before read mutex. defer i.appendMtx.RUnlock() isoState := &isolationState{ @@ -110,6 +111,8 @@ func (i *isolation) State() *isolationState { lowWatermark: i.appendsOpenList.next.appendID, // Lowest appendID from appenders, or lastAppendId. incompleteAppends: make(map[uint64]struct{}, len(i.appendsOpen)), isolation: i, + mint: mint, + maxt: maxt, } for k := range i.appendsOpen { isoState.incompleteAppends[k] = struct{}{} @@ -124,6 +127,21 @@ func (i *isolation) State() *isolationState { return isoState } +// TraverseOpenReads iterates through the open reads and runs the given +// function on those states. The given function MUST NOT mutate the isolationState. +// The iteration is stopped when the function returns false or once all reads have been iterated. +func (i *isolation) TraverseOpenReads(f func(s *isolationState) bool) { + i.readMtx.RLock() + defer i.readMtx.RUnlock() + s := i.readsOpen.next + for s != i.readsOpen { + if !f(s) { + return + } + s = s.next + } +} + // newAppendID increments the transaction counter and returns a new transaction // ID. The first ID returned is 1. // Also returns the low watermark, to keep lock/unlock operations down diff --git a/tsdb/isolation_test.go b/tsdb/isolation_test.go index 79ae21435..e41e3f5a2 100644 --- a/tsdb/isolation_test.go +++ b/tsdb/isolation_test.go @@ -14,6 +14,7 @@ package tsdb import ( + "math" "strconv" "sync" "testing" @@ -85,7 +86,7 @@ func BenchmarkIsolationWithState(b *testing.B) { <-start for i := 0; i < b.N; i++ { - s := iso.State() + s := iso.State(math.MinInt64, math.MaxInt64) s.Close() } }()