Merge pull request #2529 from prometheus/beorn7/storage3

storage: Use staleness delta as head chunk timeout
pull/2527/head
Björn Rabenstein 8 years ago committed by GitHub
commit e1a84b6256

@@ -272,6 +272,9 @@ func parse(args []string) error {
 	if promql.StalenessDelta < 0 {
 		return fmt.Errorf("negative staleness delta: %s", promql.StalenessDelta)
 	}
+	// The staleness delta is also a reasonable head chunk timeout. Thus, we
+	// don't expose it as a separate flag but set it here.
+	cfg.storage.HeadChunkTimeout = promql.StalenessDelta
 	if err := parsePrometheusURL(); err != nil {
 		return err

@@ -25,10 +25,6 @@ import (
 	"github.com/prometheus/prometheus/storage/metric"
 )
-
-const (
-	headChunkTimeout = time.Hour // Close head chunk if not touched for that long.
-)
 
 // fingerprintSeriesPair pairs a fingerprint with a memorySeries pointer.
 type fingerprintSeriesPair struct {
 	fp model.Fingerprint
@@ -259,15 +255,15 @@ func (s *memorySeries) add(v model.SamplePair) (int, error) {
 }
 
 // maybeCloseHeadChunk closes the head chunk if it has not been touched for the
-// duration of headChunkTimeout. It returns whether the head chunk was closed.
-// If the head chunk is already closed, the method is a no-op and returns false.
+// provided duration. It returns whether the head chunk was closed. If the head
+// chunk is already closed, the method is a no-op and returns false.
 //
 // The caller must have locked the fingerprint of the series.
-func (s *memorySeries) maybeCloseHeadChunk() (bool, error) {
+func (s *memorySeries) maybeCloseHeadChunk(timeout time.Duration) (bool, error) {
 	if s.headChunkClosed {
 		return false, nil
 	}
-	if time.Now().Sub(s.lastTime.Time()) > headChunkTimeout {
+	if time.Now().Sub(s.lastTime.Time()) > timeout {
 		s.headChunkClosed = true
 		// Since we cannot modify the head chunk from now on, we
 		// don't need to bother with cloning anymore.

@@ -164,6 +164,7 @@ type MemorySeriesStorage struct {
 	logThrottlingStopped chan struct{}
 	maxMemoryChunks      int
 	dropAfter            time.Duration
+	headChunkTimeout     time.Duration
 	checkpointInterval   time.Duration
 	checkpointDirtySeriesLimit int
@@ -199,6 +200,7 @@ type MemorySeriesStorageOptions struct {
 	MaxChunksToPersist         int           // Max number of chunks waiting to be persisted.
 	PersistenceStoragePath     string        // Location of persistence files.
 	PersistenceRetentionPeriod time.Duration // Chunks at least that old are dropped.
+	HeadChunkTimeout           time.Duration // Head chunks idle for at least that long may be closed.
 	CheckpointInterval         time.Duration // How often to checkpoint the series map and head chunks.
 	CheckpointDirtySeriesLimit int           // How many dirty series will trigger an early checkpoint.
 	Dirty                      bool          // Force the storage to consider itself dirty on startup.
@@ -222,9 +224,10 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage {
 		throttled:                  make(chan struct{}, 1),
 		maxMemoryChunks:            o.MemoryChunks,
 		dropAfter:                  o.PersistenceRetentionPeriod,
+		headChunkTimeout:           o.HeadChunkTimeout,
 		checkpointInterval:         o.CheckpointInterval,
 		checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
-		archiveHighWatermark:       model.Now().Add(-headChunkTimeout),
+		archiveHighWatermark:       model.Now().Add(-o.HeadChunkTimeout),
 		maxChunksToPersist:         o.MaxChunksToPersist,
@@ -1394,7 +1397,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries(
 	defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
-	closed, err := series.maybeCloseHeadChunk()
+	closed, err := series.maybeCloseHeadChunk(s.headChunkTimeout)
 	if err != nil {
 		s.quarantineSeries(fp, series.metric, err)
 		s.persistErrors.Inc()
@@ -1421,7 +1424,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries(
 	// Archive if all chunks are evicted. Also make sure the last sample has
 	// an age of at least headChunkTimeout (which is very likely anyway).
-	if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout {
+	if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > s.headChunkTimeout {
 		s.fpToSeries.del(fp)
 		s.numSeries.Dec()
 		s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime)

@@ -828,6 +828,7 @@ func TestLoop(t *testing.T) {
 		MaxChunksToPersist:         1000000,
 		PersistenceRetentionPeriod: 24 * 7 * time.Hour,
 		PersistenceStoragePath:     directory.Path(),
+		HeadChunkTimeout:           5 * time.Minute,
 		CheckpointInterval:         250 * time.Millisecond,
 		SyncStrategy:               Adaptive,
 		MinShrinkRatio:             0.1,
@@ -1608,6 +1609,7 @@ func benchmarkFuzz(b *testing.B, encoding chunk.Encoding) {
 		MaxChunksToPersist:         1000000,
 		PersistenceRetentionPeriod: time.Hour,
 		PersistenceStoragePath:     directory.Path(),
+		HeadChunkTimeout:           5 * time.Minute,
 		CheckpointInterval:         time.Second,
 		SyncStrategy:               Adaptive,
 		MinShrinkRatio:             0.1,

@@ -49,6 +49,7 @@ func NewTestStorage(t testutil.T, encoding chunk.Encoding) (*MemorySeriesStorage, testutil.Closer) {
 		MaxChunksToPersist:         1000000,
 		PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging.
 		PersistenceStoragePath:     directory.Path(),
+		HeadChunkTimeout:           5 * time.Minute,
 		CheckpointInterval:         time.Hour,
 		SyncStrategy:               Adaptive,
 	}

Loading…
Cancel
Save