mirror of https://github.com/prometheus/prometheus
storage: Use staleness delta as head chunk timeout
Currently, if a series ceases to exist, its head chunk is kept open for an hour. That prevents it from being persisted, which prevents it from being evicted, which prevents the series from being archived. Most of the time, once no sample has been added to a series within the staleness limit, we can be pretty confident that this series will not receive samples anymore. The whole chain described above can therefore be started after 5m instead of 1h.

In the relaxed case, this doesn't change a lot, as the head chunk timeout is only checked during series maintenance, and usually a series is only maintained every six hours. However, there is the typical scenario where a large service is deployed, the deployment turns out to be bad, and it is deployed again within minutes, so that quite quickly the number of time series has tripled. That's the point where the Prometheus server is stressed and (rightfully) switches into rushed mode. In that mode, time series are processed as quickly as possible, but all of that is in vain if the recently ended time series cannot be persisted for another hour. This change helps most in exactly that scenario, which is also the scenario where help is most desperately needed.
parent 6dbd779099
commit 96a303b348
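The gist of the change, as a minimal sketch in plain Go (illustrative names and simplified types, not the actual Prometheus code): once a series has received no sample for longer than the staleness delta, its head chunk can be closed, which makes it eligible for persisting, eviction, and eventually archiving.

    package main

    import (
        "fmt"
        "time"
    )

    // series is a hypothetical, simplified stand-in for Prometheus' memorySeries.
    type series struct {
        lastSample      time.Time // when the series last received a sample
        headChunkClosed bool
    }

    // maybeCloseIdleHead closes the head chunk if the series has been idle for
    // longer than the given timeout and reports whether it did so. It mirrors
    // the shape of maybeCloseHeadChunk in the diff below, minus error handling.
    func (s *series) maybeCloseIdleHead(timeout time.Duration) bool {
        if s.headChunkClosed {
            return false // already closed, nothing to do
        }
        if time.Since(s.lastSample) > timeout {
            s.headChunkClosed = true
            return true
        }
        return false
    }

    func main() {
        s := &series{lastSample: time.Now().Add(-10 * time.Minute)}
        stalenessDelta := 5 * time.Minute // with this change, the staleness delta doubles as the head chunk timeout
        fmt.Println(s.maybeCloseIdleHead(stalenessDelta)) // true: idle for longer than 5m, so the head chunk is closed
    }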
@@ -272,6 +272,9 @@ func parse(args []string) error {
 	if promql.StalenessDelta < 0 {
 		return fmt.Errorf("negative staleness delta: %s", promql.StalenessDelta)
 	}
+	// The staleness delta is also a reasonable head chunk timeout. Thus, we
+	// don't expose it as a separate flag but set it here.
+	cfg.storage.HeadChunkTimeout = promql.StalenessDelta
 
 	if err := parsePrometheusURL(); err != nil {
 		return err
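Design note: the head chunk timeout is deliberately not exposed as a separate flag; it simply tracks promql.StalenessDelta, so whatever value the staleness delta is configured to (presumably 5 minutes by default, which is where the 5m in the commit message comes from) also determines how quickly idle head chunks may be closed during series maintenance.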
@@ -25,10 +25,6 @@ import (
 	"github.com/prometheus/prometheus/storage/metric"
 )
 
-const (
-	headChunkTimeout = time.Hour // Close head chunk if not touched for that long.
-)
-
 // fingerprintSeriesPair pairs a fingerprint with a memorySeries pointer.
 type fingerprintSeriesPair struct {
 	fp     model.Fingerprint
@@ -259,15 +255,15 @@ func (s *memorySeries) add(v model.SamplePair) (int, error) {
 }
 
 // maybeCloseHeadChunk closes the head chunk if it has not been touched for the
-// duration of headChunkTimeout. It returns whether the head chunk was closed.
-// If the head chunk is already closed, the method is a no-op and returns false.
+// provided duration. It returns whether the head chunk was closed. If the head
+// chunk is already closed, the method is a no-op and returns false.
 //
 // The caller must have locked the fingerprint of the series.
-func (s *memorySeries) maybeCloseHeadChunk() (bool, error) {
+func (s *memorySeries) maybeCloseHeadChunk(timeout time.Duration) (bool, error) {
 	if s.headChunkClosed {
 		return false, nil
 	}
-	if time.Now().Sub(s.lastTime.Time()) > headChunkTimeout {
+	if time.Now().Sub(s.lastTime.Time()) > timeout {
 		s.headChunkClosed = true
 		// Since we cannot modify the head chunk from now on, we
 		// don't need to bother with cloning anymore.
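With the package-level headChunkTimeout constant removed, the timeout is now supplied by the caller. As the storage hunks below show, MemorySeriesStorage carries it in a new headChunkTimeout field and calls series.maybeCloseHeadChunk(s.headChunkTimeout) during maintenance.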
@@ -164,6 +164,7 @@ type MemorySeriesStorage struct {
 	logThrottlingStopped       chan struct{}
 	maxMemoryChunks            int
 	dropAfter                  time.Duration
+	headChunkTimeout           time.Duration
 	checkpointInterval         time.Duration
 	checkpointDirtySeriesLimit int
@@ -199,6 +200,7 @@ type MemorySeriesStorageOptions struct {
 	MaxChunksToPersist         int           // Max number of chunks waiting to be persisted.
 	PersistenceStoragePath     string        // Location of persistence files.
 	PersistenceRetentionPeriod time.Duration // Chunks at least that old are dropped.
+	HeadChunkTimeout           time.Duration // Head chunks idle for at least that long may be closed.
 	CheckpointInterval         time.Duration // How often to checkpoint the series map and head chunks.
 	CheckpointDirtySeriesLimit int           // How many dirty series will trigger an early checkpoint.
 	Dirty                      bool          // Force the storage to consider itself dirty on startup.
@@ -222,9 +224,10 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage
 		throttled:                  make(chan struct{}, 1),
 		maxMemoryChunks:            o.MemoryChunks,
 		dropAfter:                  o.PersistenceRetentionPeriod,
+		headChunkTimeout:           o.HeadChunkTimeout,
 		checkpointInterval:         o.CheckpointInterval,
 		checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
-		archiveHighWatermark:       model.Now().Add(-headChunkTimeout),
+		archiveHighWatermark:       model.Now().Add(-o.HeadChunkTimeout),
 
 		maxChunksToPersist: o.MaxChunksToPersist,
@@ -1394,7 +1397,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries(
 
 	defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
 
-	closed, err := series.maybeCloseHeadChunk()
+	closed, err := series.maybeCloseHeadChunk(s.headChunkTimeout)
 	if err != nil {
 		s.quarantineSeries(fp, series.metric, err)
 		s.persistErrors.Inc()
@@ -1421,7 +1424,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries(
 
 	// Archive if all chunks are evicted. Also make sure the last sample has
 	// an age of at least headChunkTimeout (which is very likely anyway).
-	if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout {
+	if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > s.headChunkTimeout {
 		s.fpToSeries.del(fp)
 		s.numSeries.Dec()
 		s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime)
@@ -828,6 +828,7 @@ func TestLoop(t *testing.T) {
 		MaxChunksToPersist:         1000000,
 		PersistenceRetentionPeriod: 24 * 7 * time.Hour,
 		PersistenceStoragePath:     directory.Path(),
+		HeadChunkTimeout:           5 * time.Minute,
 		CheckpointInterval:         250 * time.Millisecond,
 		SyncStrategy:               Adaptive,
 		MinShrinkRatio:             0.1,
@@ -1608,6 +1609,7 @@ func benchmarkFuzz(b *testing.B, encoding chunk.Encoding) {
 		MaxChunksToPersist:         1000000,
 		PersistenceRetentionPeriod: time.Hour,
 		PersistenceStoragePath:     directory.Path(),
+		HeadChunkTimeout:           5 * time.Minute,
 		CheckpointInterval:         time.Second,
 		SyncStrategy:               Adaptive,
 		MinShrinkRatio:             0.1,
@@ -49,6 +49,7 @@ func NewTestStorage(t testutil.T, encoding chunk.Encoding) (*MemorySeriesStorage
 		MaxChunksToPersist:         1000000,
 		PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging.
 		PersistenceStoragePath:     directory.Path(),
+		HeadChunkTimeout:           5 * time.Minute,
 		CheckpointInterval:         time.Hour,
 		SyncStrategy:               Adaptive,
 	}