Regard in-memory series as new.

This commit ensures that series that exist only in-memory and not
on-disk are not regarded as too old for operation exclusion.
pull/311/head
Matt T. Proud 2013-06-21 10:16:41 +02:00
parent 81c406630a
commit 2d5de99fbf
2 changed files with 32 additions and 7 deletions

View File

@ -14,11 +14,12 @@
package metric package metric
import ( import (
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/utility"
"sort" "sort"
"sync" "sync"
"time" "time"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/utility"
) )
// Assuming sample rate of 1 / 15Hz, this allows for one hour's worth of // Assuming sample rate of 1 / 15Hz, this allows for one hour's worth of
@ -370,6 +371,15 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(f *model.Fingerprint) (mod
return metric, nil return metric, nil
} }
func (s *memorySeriesStorage) HasFingerprint(f *model.Fingerprint) bool {
s.RLock()
defer s.RUnlock()
_, has := s.fingerprintToSeries[*f]
return has
}
func (s *memorySeriesStorage) CloneSamples(f *model.Fingerprint) model.Values { func (s *memorySeriesStorage) CloneSamples(f *model.Fingerprint) model.Values {
s.RLock() s.RLock()
defer s.RUnlock() defer s.RUnlock()

View File

@ -309,20 +309,35 @@ func (t *TieredStorage) seriesTooOld(f *model.Fingerprint, i time.Time) (bool, e
// BUG(julius): Make this configurable by query layer. // BUG(julius): Make this configurable by query layer.
i = i.Add(-stalenessLimit) i = i.Add(-stalenessLimit)
wm, ok := t.wmCache.Get(f) wm, cacheHit := t.wmCache.Get(f)
if !ok { if !cacheHit {
value := &dto.MetricHighWatermark{} value := &dto.MetricHighWatermark{}
present, err := t.DiskStorage.MetricHighWatermarks.Get(f.ToDTO(), value) diskHit, err := t.DiskStorage.MetricHighWatermarks.Get(f.ToDTO(), value)
if err != nil { if err != nil {
return false, err return false, err
} }
if present {
if diskHit {
wmTime := time.Unix(*value.Timestamp, 0).UTC() wmTime := time.Unix(*value.Timestamp, 0).UTC()
t.wmCache.Set(f, &Watermarks{High: wmTime}) t.wmCache.Set(f, &Watermarks{High: wmTime})
return wmTime.Before(i), nil return wmTime.Before(i), nil
} }
return true, nil
if !t.memoryArena.HasFingerprint(f) {
return true, nil
}
samples := t.memoryArena.CloneSamples(f)
if len(samples) == 0 {
return true, nil
}
newest := samples[0].Timestamp
t.wmCache.Set(f, &Watermarks{High: newest})
return false, nil
} }
return wm.High.Before(i), nil return wm.High.Before(i), nil
} }