TSDB: Don't rely on integer overflow in head compaction check (#13755)

* TSDB: Don't compact the head block when empty

Don't compact the Head block if there have not yet been any samples
appended.

Previously, the logic for determining if the head should be compacted
relied on the default values for min and max time and integer overflow
when they were checked in `Head.compactable()`. The check in
`Head.compactable()` effectively did `math.MinInt64 - math.MaxInt64`
which overflowed and wrapped to `1`. Since `1` is less than `1.5`
times the chunk range, compaction did not happen. This was the correct
behavior but relying on overflow wrapping is surprising.

This change add a method for checking if the min and max time for the
head is unset and uses it to short-circuit compaction in that case.
It also replaces several explicit checks for the default value to
determine if the head has not yet had any samples added.

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>
pull/13844/head
Nick Pillitteri 2024-03-26 07:17:38 -04:00 committed by GitHub
parent ceca6c4716
commit 481f14e1c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 35 additions and 8 deletions

View File

@ -1994,6 +1994,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
// Should be set to init values if no WAL or blocks exist so far. // Should be set to init values if no WAL or blocks exist so far.
require.Equal(t, int64(math.MaxInt64), db.head.MinTime()) require.Equal(t, int64(math.MaxInt64), db.head.MinTime())
require.Equal(t, int64(math.MinInt64), db.head.MaxTime()) require.Equal(t, int64(math.MinInt64), db.head.MaxTime())
require.False(t, db.head.initialized())
// First added sample initializes the writable range. // First added sample initializes the writable range.
ctx := context.Background() ctx := context.Background()
@ -2003,6 +2004,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
require.Equal(t, int64(1000), db.head.MinTime()) require.Equal(t, int64(1000), db.head.MinTime())
require.Equal(t, int64(1000), db.head.MaxTime()) require.Equal(t, int64(1000), db.head.MaxTime())
require.True(t, db.head.initialized())
}) })
t.Run("wal-only", func(t *testing.T) { t.Run("wal-only", func(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
@ -2031,6 +2033,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
require.Equal(t, int64(5000), db.head.MinTime()) require.Equal(t, int64(5000), db.head.MinTime())
require.Equal(t, int64(15000), db.head.MaxTime()) require.Equal(t, int64(15000), db.head.MaxTime())
require.True(t, db.head.initialized())
}) })
t.Run("existing-block", func(t *testing.T) { t.Run("existing-block", func(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
@ -2043,6 +2046,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
require.Equal(t, int64(2000), db.head.MinTime()) require.Equal(t, int64(2000), db.head.MinTime())
require.Equal(t, int64(2000), db.head.MaxTime()) require.Equal(t, int64(2000), db.head.MaxTime())
require.True(t, db.head.initialized())
}) })
t.Run("existing-block-and-wal", func(t *testing.T) { t.Run("existing-block-and-wal", func(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
@ -2075,6 +2079,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
require.Equal(t, int64(6000), db.head.MinTime()) require.Equal(t, int64(6000), db.head.MinTime())
require.Equal(t, int64(15000), db.head.MaxTime()) require.Equal(t, int64(15000), db.head.MaxTime())
require.True(t, db.head.initialized())
// Check that old series has been GCed. // Check that old series has been GCed.
require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.series)) require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.series))
}) })

View File

@ -1081,11 +1081,11 @@ func (h *Head) SetMinValidTime(minValidTime int64) {
// Truncate removes old data before mint from the head and WAL. // Truncate removes old data before mint from the head and WAL.
func (h *Head) Truncate(mint int64) (err error) { func (h *Head) Truncate(mint int64) (err error) {
initialize := h.MinTime() == math.MaxInt64 initialized := h.initialized()
if err := h.truncateMemory(mint); err != nil { if err := h.truncateMemory(mint); err != nil {
return err return err
} }
if initialize { if !initialized {
return nil return nil
} }
return h.truncateWAL(mint) return h.truncateWAL(mint)
@ -1107,9 +1107,9 @@ func (h *Head) truncateMemory(mint int64) (err error) {
} }
}() }()
initialize := h.MinTime() == math.MaxInt64 initialized := h.initialized()
if h.MinTime() >= mint && !initialize { if h.MinTime() >= mint && initialized {
return nil return nil
} }
@ -1120,7 +1120,7 @@ func (h *Head) truncateMemory(mint int64) (err error) {
defer h.memTruncationInProcess.Store(false) defer h.memTruncationInProcess.Store(false)
// We wait for pending queries to end that overlap with this truncation. // We wait for pending queries to end that overlap with this truncation.
if !initialize { if initialized {
h.WaitForPendingReadersInTimeRange(h.MinTime(), mint) h.WaitForPendingReadersInTimeRange(h.MinTime(), mint)
} }
@ -1134,7 +1134,7 @@ func (h *Head) truncateMemory(mint int64) (err error) {
// This was an initial call to Truncate after loading blocks on startup. // This was an initial call to Truncate after loading blocks on startup.
// We haven't read back the WAL yet, so do not attempt to truncate it. // We haven't read back the WAL yet, so do not attempt to truncate it.
if initialize { if !initialized {
return nil return nil
} }
@ -1622,10 +1622,19 @@ func (h *Head) MaxOOOTime() int64 {
return h.maxOOOTime.Load() return h.maxOOOTime.Load()
} }
// initialized returns true if the head has a MinTime set, false otherwise.
func (h *Head) initialized() bool {
return h.MinTime() != math.MaxInt64
}
// compactable returns whether the head has a compactable range. // compactable returns whether the head has a compactable range.
// The head has a compactable range when the head time range is 1.5 times the chunk range. // The head has a compactable range when the head time range is 1.5 times the chunk range.
// The 0.5 acts as a buffer of the appendable window. // The 0.5 acts as a buffer of the appendable window.
func (h *Head) compactable() bool { func (h *Head) compactable() bool {
if !h.initialized() {
return false
}
return h.MaxTime()-h.MinTime() > h.chunkRange.Load()/2*3 return h.MaxTime()-h.MinTime() > h.chunkRange.Load()/2*3
} }

View File

@ -138,7 +138,7 @@ func (h *Head) Appender(_ context.Context) storage.Appender {
// The head cache might not have a starting point yet. The init appender // The head cache might not have a starting point yet. The init appender
// picks up the first appended timestamp as the base. // picks up the first appended timestamp as the base.
if h.MinTime() == math.MaxInt64 { if !h.initialized() {
return &initAppender{ return &initAppender{
head: h, head: h,
} }
@ -191,7 +191,7 @@ func (h *Head) appendableMinValidTime() int64 {
// AppendableMinValidTime returns the minimum valid time for samples to be appended to the Head. // AppendableMinValidTime returns the minimum valid time for samples to be appended to the Head.
// Returns false if Head hasn't been initialized yet and the minimum time isn't known yet. // Returns false if Head hasn't been initialized yet and the minimum time isn't known yet.
func (h *Head) AppendableMinValidTime() (int64, bool) { func (h *Head) AppendableMinValidTime() (int64, bool) {
if h.MinTime() == math.MaxInt64 { if !h.initialized() {
return 0, false return 0, false
} }

View File

@ -5819,3 +5819,16 @@ func TestHeadAppender_AppendCTZeroSample(t *testing.T) {
require.Equal(t, chunkenc.ValNone, it.Next()) require.Equal(t, chunkenc.ValNone, it.Next())
} }
} }
func TestHeadCompactableDoesNotCompactEmptyHead(t *testing.T) {
// Use a chunk range of 1 here so that if we attempted to determine if the head
// was compactable using default values for min and max times, `Head.compactable()`
// would return true which is incorrect. This test verifies that we short-circuit
// the check when the head has not yet had any samples added.
head, _ := newTestHead(t, 1, wlog.CompressionNone, false)
defer func() {
require.NoError(t, head.Close())
}()
require.False(t, head.compactable())
}