Browse Source

Merge pull request #286 from prometheus/optimize/avoid-stale-series

Use LRU cache to avoid querying stale timeseries
pull/271/merge
Matt T. Proud 12 years ago
parent
commit
757ab938db
  1. 2
      storage/metric/helpers_test.go
  2. 2
      storage/metric/interface_test.go
  3. 15
      storage/metric/leveldb.go
  4. 14
      storage/metric/memory.go
  5. 2
      storage/metric/memory_test.go
  6. 6
      storage/metric/rule_integration_test.go
  7. 2
      storage/metric/stochastic_test.go
  8. 55
      storage/metric/tiered.go
  9. 2
      storage/metric/view.go
  10. 2
      storage/metric/watermark.go

2
storage/metric/helpers_test.go

@ -65,7 +65,7 @@ func buildLevelDBTestPersistence(name string, f func(p MetricPersistence, t test
func buildMemoryTestPersistence(f func(p MetricPersistence, t test.Tester)) func(t test.Tester) {
return func(t test.Tester) {
p := NewMemorySeriesStorage()
p := NewMemorySeriesStorage(MemorySeriesOptions{})
defer p.Close()

2
storage/metric/interface_test.go

@ -19,5 +19,5 @@ import (
func TestInterfaceAdherence(t *testing.T) {
var _ MetricPersistence = &LevelDBMetricPersistence{}
var _ MetricPersistence = NewMemorySeriesStorage()
var _ MetricPersistence = NewMemorySeriesStorage(MemorySeriesOptions{})
}

15
storage/metric/leveldb.go

@ -492,22 +492,19 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
mutationCount := 0
for fingerprint, samples := range groups {
key := &dto.Fingerprint{}
keyEncoded := coding.NewPBEncoder(fingerprint.ToDTO())
value := &dto.MetricHighWatermark{}
raw := []byte{}
newestSampleTimestamp := samples[len(samples)-1].Timestamp
keyEncoded := coding.NewPBEncoder(key)
key.Signature = proto.String(fingerprint.ToRowKey())
raw, err = l.MetricHighWatermarks.Get(keyEncoded)
raw, err := l.MetricHighWatermarks.Get(keyEncoded)
if err != nil {
return
return err
}
if raw != nil {
err = proto.Unmarshal(raw, value)
if err != nil {
return
return err
}
if newestSampleTimestamp.Before(time.Unix(*value.Timestamp, 0)) {
@ -521,10 +518,10 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger
err = l.MetricHighWatermarks.Commit(batch)
if err != nil {
return
return err
}
return
return nil
}
func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err error) {

14
storage/metric/memory.go

@ -151,11 +151,18 @@ func newStream(metric model.Metric) *stream {
type memorySeriesStorage struct {
sync.RWMutex
wmCache *WatermarkCache
fingerprintToSeries map[model.Fingerprint]*stream
labelPairToFingerprints map[model.LabelPair]model.Fingerprints
labelNameToFingerprints map[model.LabelName]model.Fingerprints
}
type MemorySeriesOptions struct {
// If provided, this WatermarkCache will be updated for any samples that are
// appended to the memorySeriesStorage.
WatermarkCache *WatermarkCache
}
func (s *memorySeriesStorage) AppendSamples(samples model.Samples) error {
for _, sample := range samples {
s.AppendSample(sample)
@ -172,6 +179,10 @@ func (s *memorySeriesStorage) AppendSample(sample model.Sample) error {
fingerprint := model.NewFingerprintFromMetric(metric)
series, ok := s.fingerprintToSeries[*fingerprint]
if s.wmCache != nil {
s.wmCache.Set(fingerprint, &Watermarks{High: sample.Timestamp})
}
if !ok {
series = newStream(metric)
s.fingerprintToSeries[*fingerprint] = series
@ -355,10 +366,11 @@ func (s *memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (v
return
}
func NewMemorySeriesStorage() *memorySeriesStorage {
func NewMemorySeriesStorage(o MemorySeriesOptions) *memorySeriesStorage {
return &memorySeriesStorage{
fingerprintToSeries: make(map[model.Fingerprint]*stream),
labelPairToFingerprints: make(map[model.LabelPair]model.Fingerprints),
labelNameToFingerprints: make(map[model.LabelName]model.Fingerprints),
wmCache: o.WatermarkCache,
}
}

2
storage/metric/memory_test.go

@ -48,7 +48,7 @@ func BenchmarkStreamAdd(b *testing.B) {
func benchmarkAppendSample(b *testing.B, labels int) {
b.StopTimer()
s := NewMemorySeriesStorage()
s := NewMemorySeriesStorage(MemorySeriesOptions{})
metric := model.Metric{}

6
storage/metric/rule_integration_test.go

@ -896,7 +896,7 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer
func testMemoryGetValueAtTime(t test.Tester) {
persistenceMaker := func() (MetricPersistence, test.Closer) {
return NewMemorySeriesStorage(), test.NilCloser
return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser
}
GetValueAtTimeTests(persistenceMaker, t)
@ -924,7 +924,7 @@ func BenchmarkMemoryGetBoundaryValues(b *testing.B) {
func testMemoryGetRangeValues(t test.Tester) {
persistenceMaker := func() (MetricPersistence, test.Closer) {
return NewMemorySeriesStorage(), test.NilCloser
return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser
}
GetRangeValuesTests(persistenceMaker, false, t)
@ -932,7 +932,7 @@ func testMemoryGetRangeValues(t test.Tester) {
func testMemoryGetBoundaryValues(t test.Tester) {
persistenceMaker := func() (MetricPersistence, test.Closer) {
return NewMemorySeriesStorage(), test.NilCloser
return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser
}
GetRangeValuesTests(persistenceMaker, true, t)

2
storage/metric/stochastic_test.go

@ -644,7 +644,7 @@ func BenchmarkMemoryAppendSampleAsPureSingleEntityAppend(b *testing.B) {
func testMemoryStochastic(t test.Tester) {
persistenceMaker := func() (MetricPersistence, test.Closer) {
return NewMemorySeriesStorage(), test.NilCloser
return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser
}
StochasticTests(persistenceMaker, t)

55
storage/metric/tiered.go

@ -19,6 +19,8 @@ import (
"sort"
"time"
"code.google.com/p/goprotobuf/proto"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/coding"
@ -62,6 +64,13 @@ const (
tieredStorageStopping
)
const (
// Ignore timeseries in queries that are more stale than this limit.
stalenessLimit = time.Minute * 5
// Size of the watermarks cache (used in determining timeseries freshness).
wmCacheSizeBytes = 5 * 1024 * 1024
)
// TieredStorage both persists samples and generates materialized views for
// queries.
type TieredStorage struct {
@ -85,6 +94,8 @@ type TieredStorage struct {
memorySemaphore chan bool
diskSemaphore chan bool
wmCache *WatermarkCache
}
// viewJob encapsulates a request to extract sample values from the datastore.
@ -107,17 +118,22 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
return nil, err
}
wmCache := NewWatermarkCache(wmCacheSizeBytes)
memOptions := MemorySeriesOptions{WatermarkCache: wmCache}
s := &TieredStorage{
appendToDiskQueue: make(chan model.Samples, appendToDiskQueueDepth),
DiskStorage: diskStorage,
draining: make(chan chan<- bool),
flushMemoryInterval: flushMemoryInterval,
memoryArena: NewMemorySeriesStorage(),
memoryArena: NewMemorySeriesStorage(memOptions),
memoryTTL: memoryTTL,
viewQueue: make(chan viewJob, viewQueueDepth),
diskSemaphore: make(chan bool, tieredDiskSemaphores),
memorySemaphore: make(chan bool, tieredMemorySemaphores),
wmCache: wmCache,
}
for i := 0; i < tieredDiskSemaphores; i++ {
@ -315,10 +331,38 @@ func (t *TieredStorage) Close() {
// get flushed.
close(t.appendToDiskQueue)
close(t.viewQueue)
t.wmCache.Clear()
t.state = tieredStorageStopping
}
func (t *TieredStorage) seriesTooOld(f *model.Fingerprint, i time.Time) (bool, error) {
// BUG(julius): Make this configurable by query layer.
i = i.Add(-stalenessLimit)
wm, ok := t.wmCache.Get(f)
if !ok {
rowKey := coding.NewPBEncoder(f.ToDTO())
raw, err := t.DiskStorage.MetricHighWatermarks.Get(rowKey)
if err != nil {
return false, err
}
if raw != nil {
value := &dto.MetricHighWatermark{}
err = proto.Unmarshal(raw, value)
if err != nil {
return false, err
}
wmTime := time.Unix(*value.Timestamp, 0).UTC()
t.wmCache.Set(f, &Watermarks{High: wmTime})
return wmTime.Before(i), nil
}
return true, nil
}
return wm.High.Before(i), nil
}
func (t *TieredStorage) renderView(viewJob viewJob) {
// Telemetry.
var err error
@ -342,6 +386,15 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
extractionTimer := viewJob.stats.GetTimer(stats.ViewDataExtractionTime).Start()
for _, scanJob := range scans {
old, err := t.seriesTooOld(scanJob.fingerprint, *scanJob.operations[0].CurrentTime())
if err != nil {
log.Printf("Error getting watermark from cache for %s: %s", scanJob.fingerprint, err)
continue
}
if old {
continue
}
var seriesFrontier *seriesFrontier = nil
var seriesPresent = true

2
storage/metric/view.go

@ -110,5 +110,5 @@ func (v view) appendSamples(fingerprint *model.Fingerprint, samples model.Values
}
func newView() view {
return view{NewMemorySeriesStorage()}
return view{NewMemorySeriesStorage(MemorySeriesOptions{})}
}

2
storage/metric/watermark.go

@ -119,7 +119,7 @@ func (lru *WatermarkCache) Clear() {
}
func (lru *WatermarkCache) updateInplace(e *list.Element, w *Watermarks) {
e.Value = w
e.Value.(*entry).watermarks = w
lru.moveToFront(e)
lru.checkCapacity()
}

Loading…
Cancel
Save