diff --git a/cmd/prometheus/config.go b/cmd/prometheus/config.go
index 4bc249880..809775ed7 100644
--- a/cmd/prometheus/config.go
+++ b/cmd/prometheus/config.go
@@ -272,6 +272,9 @@ func parse(args []string) error {
 	if promql.StalenessDelta < 0 {
 		return fmt.Errorf("negative staleness delta: %s", promql.StalenessDelta)
 	}
+	// The staleness delta is also a reasonable head chunk timeout. Thus, we
+	// don't expose it as a separate flag but set it here.
+	cfg.storage.HeadChunkTimeout = promql.StalenessDelta
 
 	if err := parsePrometheusURL(); err != nil {
 		return err
diff --git a/storage/local/series.go b/storage/local/series.go
index 71ea7f284..261add291 100644
--- a/storage/local/series.go
+++ b/storage/local/series.go
@@ -25,10 +25,6 @@ import (
 	"github.com/prometheus/prometheus/storage/metric"
 )
 
-const (
-	headChunkTimeout = time.Hour // Close head chunk if not touched for that long.
-)
-
 // fingerprintSeriesPair pairs a fingerprint with a memorySeries pointer.
 type fingerprintSeriesPair struct {
 	fp     model.Fingerprint
@@ -259,15 +255,15 @@ func (s *memorySeries) add(v model.SamplePair) (int, error) {
 }
 
 // maybeCloseHeadChunk closes the head chunk if it has not been touched for the
-// duration of headChunkTimeout. It returns whether the head chunk was closed.
-// If the head chunk is already closed, the method is a no-op and returns false.
+// provided duration. It returns whether the head chunk was closed. If the head
+// chunk is already closed, the method is a no-op and returns false.
 //
 // The caller must have locked the fingerprint of the series.
-func (s *memorySeries) maybeCloseHeadChunk() (bool, error) {
+func (s *memorySeries) maybeCloseHeadChunk(timeout time.Duration) (bool, error) {
 	if s.headChunkClosed {
 		return false, nil
 	}
-	if time.Now().Sub(s.lastTime.Time()) > headChunkTimeout {
+	if time.Now().Sub(s.lastTime.Time()) > timeout {
 		s.headChunkClosed = true
 		// Since we cannot modify the head chunk from now on, we
 		// don't need to bother with cloning anymore.
diff --git a/storage/local/storage.go b/storage/local/storage.go
index fa9001702..f457fdf8a 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -164,6 +164,7 @@ type MemorySeriesStorage struct {
 	logThrottlingStopped       chan struct{}
 	maxMemoryChunks            int
 	dropAfter                  time.Duration
+	headChunkTimeout           time.Duration
 	checkpointInterval         time.Duration
 	checkpointDirtySeriesLimit int
@@ -199,6 +200,7 @@ type MemorySeriesStorageOptions struct {
 	MaxChunksToPersist         int           // Max number of chunks waiting to be persisted.
 	PersistenceStoragePath     string        // Location of persistence files.
 	PersistenceRetentionPeriod time.Duration // Chunks at least that old are dropped.
+	HeadChunkTimeout           time.Duration // Head chunks idle for at least that long may be closed.
 	CheckpointInterval         time.Duration // How often to checkpoint the series map and head chunks.
 	CheckpointDirtySeriesLimit int           // How many dirty series will trigger an early checkpoint.
 	Dirty                      bool          // Force the storage to consider itself dirty on startup.
@@ -222,9 +224,10 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage
 		throttled:                  make(chan struct{}, 1),
 		maxMemoryChunks:            o.MemoryChunks,
 		dropAfter:                  o.PersistenceRetentionPeriod,
+		headChunkTimeout:           o.HeadChunkTimeout,
 		checkpointInterval:         o.CheckpointInterval,
 		checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
-		archiveHighWatermark:       model.Now().Add(-headChunkTimeout),
+		archiveHighWatermark:       model.Now().Add(-o.HeadChunkTimeout),
 
 		maxChunksToPersist: o.MaxChunksToPersist,
@@ -1394,7 +1397,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries(
 	defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
 
-	closed, err := series.maybeCloseHeadChunk()
+	closed, err := series.maybeCloseHeadChunk(s.headChunkTimeout)
 	if err != nil {
 		s.quarantineSeries(fp, series.metric, err)
 		s.persistErrors.Inc()
@@ -1421,7 +1424,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries(
 
 	// Archive if all chunks are evicted. Also make sure the last sample has
 	// an age of at least headChunkTimeout (which is very likely anyway).
-	if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout {
+	if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > s.headChunkTimeout {
 		s.fpToSeries.del(fp)
 		s.numSeries.Dec()
 		s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime)
diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go
index 591e3df37..f456991de 100644
--- a/storage/local/storage_test.go
+++ b/storage/local/storage_test.go
@@ -828,6 +828,7 @@ func TestLoop(t *testing.T) {
 		MaxChunksToPersist:         1000000,
 		PersistenceRetentionPeriod: 24 * 7 * time.Hour,
 		PersistenceStoragePath:     directory.Path(),
+		HeadChunkTimeout:           5 * time.Minute,
 		CheckpointInterval:         250 * time.Millisecond,
 		SyncStrategy:               Adaptive,
 		MinShrinkRatio:             0.1,
@@ -1608,6 +1609,7 @@ func benchmarkFuzz(b *testing.B, encoding chunk.Encoding) {
 		MaxChunksToPersist:         1000000,
 		PersistenceRetentionPeriod: time.Hour,
 		PersistenceStoragePath:     directory.Path(),
+		HeadChunkTimeout:           5 * time.Minute,
 		CheckpointInterval:         time.Second,
 		SyncStrategy:               Adaptive,
 		MinShrinkRatio:             0.1,
diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go
index bdc26c555..dc5a4502d 100644
--- a/storage/local/test_helpers.go
+++ b/storage/local/test_helpers.go
@@ -49,6 +49,7 @@ func NewTestStorage(t testutil.T, encoding chunk.Encoding) (*MemorySeriesStorage
 		MaxChunksToPersist:         1000000,
 		PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging.
 		PersistenceStoragePath:     directory.Path(),
+		HeadChunkTimeout:           5 * time.Minute,
 		CheckpointInterval:         time.Hour,
 		SyncStrategy:               Adaptive,
 	}
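For context, a minimal sketch of how a caller might wire the new option after this change. The option fields, the Adaptive sync strategy, and NewMemorySeriesStorage are taken from the hunks above; the package-main scaffolding, the concrete values, and the storage path are illustrative assumptions only, not part of the diff.

// Sketch only: constructs the local storage with an explicit HeadChunkTimeout.
// All literal values below are assumed, not taken from the patch.
package main

import (
	"time"

	"github.com/prometheus/prometheus/storage/local"
)

func main() {
	o := &local.MemorySeriesStorageOptions{
		MemoryChunks:               1048576,               // assumed value
		MaxChunksToPersist:         524288,                // assumed value
		PersistenceStoragePath:     "/var/lib/prometheus", // assumed path
		PersistenceRetentionPeriod: 15 * 24 * time.Hour,   // assumed retention
		// Head chunks idle for at least this long may be closed during
		// series maintenance (and, once all chunks are evicted, the series
		// becomes eligible for archiving).
		HeadChunkTimeout:   5 * time.Minute,
		CheckpointInterval: 5 * time.Minute, // assumed value
		SyncStrategy:       local.Adaptive,
	}
	s := local.NewMemorySeriesStorage(o)
	_ = s // Start/Stop and scrape wiring omitted in this sketch.
}

Note the design choice visible in cmd/prometheus/config.go: rather than adding a new command-line flag, the existing promql.StalenessDelta is reused as the head chunk timeout, since a chunk idle longer than the staleness delta is already considered stale for queries.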