From f98853d7b7a6038099f434e94db611ab6734141f Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Thu, 6 Jun 2013 18:14:47 +0200 Subject: [PATCH 1/3] Fix type error in watermark list handling. --- storage/metric/watermark.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/metric/watermark.go b/storage/metric/watermark.go index c58fb0ce0..040619ff2 100644 --- a/storage/metric/watermark.go +++ b/storage/metric/watermark.go @@ -119,7 +119,7 @@ func (lru *WatermarkCache) Clear() { } func (lru *WatermarkCache) updateInplace(e *list.Element, w *Watermarks) { - e.Value = w + e.Value.(*entry).watermarks = w lru.moveToFront(e) lru.checkCapacity() } From 84741b227db8ab54a1f2f3a3361d336f4cb57220 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Thu, 6 Jun 2013 18:16:22 +0200 Subject: [PATCH 2/3] Use LRU cache to avoid querying stale series. --- storage/metric/helpers_test.go | 2 +- storage/metric/interface_test.go | 2 +- storage/metric/memory.go | 14 ++++++- storage/metric/memory_test.go | 2 +- storage/metric/rule_integration_test.go | 6 +-- storage/metric/stochastic_test.go | 2 +- storage/metric/tiered.go | 55 ++++++++++++++++++++++++- storage/metric/view.go | 2 +- 8 files changed, 75 insertions(+), 10 deletions(-) diff --git a/storage/metric/helpers_test.go b/storage/metric/helpers_test.go index 8be36cdf8..2902bde51 100644 --- a/storage/metric/helpers_test.go +++ b/storage/metric/helpers_test.go @@ -65,7 +65,7 @@ func buildLevelDBTestPersistence(name string, f func(p MetricPersistence, t test func buildMemoryTestPersistence(f func(p MetricPersistence, t test.Tester)) func(t test.Tester) { return func(t test.Tester) { - p := NewMemorySeriesStorage() + p := NewMemorySeriesStorage(MemorySeriesOptions{}) defer p.Close() diff --git a/storage/metric/interface_test.go b/storage/metric/interface_test.go index c3979126e..eb7dd29c9 100644 --- a/storage/metric/interface_test.go +++ b/storage/metric/interface_test.go @@ -19,5 +19,5 @@ import ( func TestInterfaceAdherence(t *testing.T) { var _ MetricPersistence = &LevelDBMetricPersistence{} - var _ MetricPersistence = NewMemorySeriesStorage() + var _ MetricPersistence = NewMemorySeriesStorage(MemorySeriesOptions{}) } diff --git a/storage/metric/memory.go b/storage/metric/memory.go index 15d06764a..1a4af813d 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -151,11 +151,18 @@ func newStream(metric model.Metric) *stream { type memorySeriesStorage struct { sync.RWMutex + wmCache *WatermarkCache fingerprintToSeries map[model.Fingerprint]*stream labelPairToFingerprints map[model.LabelPair]model.Fingerprints labelNameToFingerprints map[model.LabelName]model.Fingerprints } +type MemorySeriesOptions struct { + // If provided, this WatermarkCache will be updated for any samples that are + // appended to the memorySeriesStorage. + WatermarkCache *WatermarkCache +} + func (s *memorySeriesStorage) AppendSamples(samples model.Samples) error { for _, sample := range samples { s.AppendSample(sample) @@ -172,6 +179,10 @@ func (s *memorySeriesStorage) AppendSample(sample model.Sample) error { fingerprint := model.NewFingerprintFromMetric(metric) series, ok := s.fingerprintToSeries[*fingerprint] + if s.wmCache != nil { + s.wmCache.Set(fingerprint, &Watermarks{High: sample.Timestamp}) + } + if !ok { series = newStream(metric) s.fingerprintToSeries[*fingerprint] = series @@ -355,10 +366,11 @@ func (s *memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (v return } -func NewMemorySeriesStorage() *memorySeriesStorage { +func NewMemorySeriesStorage(o MemorySeriesOptions) *memorySeriesStorage { return &memorySeriesStorage{ fingerprintToSeries: make(map[model.Fingerprint]*stream), labelPairToFingerprints: make(map[model.LabelPair]model.Fingerprints), labelNameToFingerprints: make(map[model.LabelName]model.Fingerprints), + wmCache: o.WatermarkCache, } } diff --git a/storage/metric/memory_test.go b/storage/metric/memory_test.go index 34514826d..402c47dd5 100644 --- a/storage/metric/memory_test.go +++ b/storage/metric/memory_test.go @@ -48,7 +48,7 @@ func BenchmarkStreamAdd(b *testing.B) { func benchmarkAppendSample(b *testing.B, labels int) { b.StopTimer() - s := NewMemorySeriesStorage() + s := NewMemorySeriesStorage(MemorySeriesOptions{}) metric := model.Metric{} diff --git a/storage/metric/rule_integration_test.go b/storage/metric/rule_integration_test.go index bd8655756..0673c45ba 100644 --- a/storage/metric/rule_integration_test.go +++ b/storage/metric/rule_integration_test.go @@ -896,7 +896,7 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer func testMemoryGetValueAtTime(t test.Tester) { persistenceMaker := func() (MetricPersistence, test.Closer) { - return NewMemorySeriesStorage(), test.NilCloser + return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser } GetValueAtTimeTests(persistenceMaker, t) @@ -924,7 +924,7 @@ func BenchmarkMemoryGetBoundaryValues(b *testing.B) { func testMemoryGetRangeValues(t test.Tester) { persistenceMaker := func() (MetricPersistence, test.Closer) { - return NewMemorySeriesStorage(), test.NilCloser + return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser } GetRangeValuesTests(persistenceMaker, false, t) @@ -932,7 +932,7 @@ func testMemoryGetRangeValues(t test.Tester) { func testMemoryGetBoundaryValues(t test.Tester) { persistenceMaker := func() (MetricPersistence, test.Closer) { - return NewMemorySeriesStorage(), test.NilCloser + return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser } GetRangeValuesTests(persistenceMaker, true, t) diff --git a/storage/metric/stochastic_test.go b/storage/metric/stochastic_test.go index 666be10cc..58db62339 100644 --- a/storage/metric/stochastic_test.go +++ b/storage/metric/stochastic_test.go @@ -644,7 +644,7 @@ func BenchmarkMemoryAppendSampleAsPureSingleEntityAppend(b *testing.B) { func testMemoryStochastic(t test.Tester) { persistenceMaker := func() (MetricPersistence, test.Closer) { - return NewMemorySeriesStorage(), test.NilCloser + return NewMemorySeriesStorage(MemorySeriesOptions{}), test.NilCloser } StochasticTests(persistenceMaker, t) diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 6d8e45187..3f86f4cee 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -19,6 +19,8 @@ import ( "sort" "time" + "code.google.com/p/goprotobuf/proto" + dto "github.com/prometheus/prometheus/model/generated" "github.com/prometheus/prometheus/coding" @@ -62,6 +64,13 @@ const ( tieredStorageStopping ) +const ( + // Ignore timeseries in queries that are more stale than this limit. + stalenessLimit = time.Minute * 5 + // Size of the watermarks cache (used in determining timeseries freshness). + wmCacheSizeBytes = 5 * 1024 * 1024 +) + // TieredStorage both persists samples and generates materialized views for // queries. type TieredStorage struct { @@ -85,6 +94,8 @@ type TieredStorage struct { memorySemaphore chan bool diskSemaphore chan bool + + wmCache *WatermarkCache } // viewJob encapsulates a request to extract sample values from the datastore. @@ -107,17 +118,22 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn return nil, err } + wmCache := NewWatermarkCache(wmCacheSizeBytes) + memOptions := MemorySeriesOptions{WatermarkCache: wmCache} + s := &TieredStorage{ appendToDiskQueue: make(chan model.Samples, appendToDiskQueueDepth), DiskStorage: diskStorage, draining: make(chan chan<- bool), flushMemoryInterval: flushMemoryInterval, - memoryArena: NewMemorySeriesStorage(), + memoryArena: NewMemorySeriesStorage(memOptions), memoryTTL: memoryTTL, viewQueue: make(chan viewJob, viewQueueDepth), diskSemaphore: make(chan bool, tieredDiskSemaphores), memorySemaphore: make(chan bool, tieredMemorySemaphores), + + wmCache: wmCache, } for i := 0; i < tieredDiskSemaphores; i++ { @@ -315,10 +331,38 @@ func (t *TieredStorage) Close() { // get flushed. close(t.appendToDiskQueue) close(t.viewQueue) + t.wmCache.Clear() t.state = tieredStorageStopping } +func (t *TieredStorage) seriesTooOld(f *model.Fingerprint, i time.Time) (bool, error) { + // BUG(julius): Make this configurable by query layer. + i = i.Add(-stalenessLimit) + + wm, ok := t.wmCache.Get(f) + if !ok { + rowKey := coding.NewPBEncoder(f.ToDTO()) + raw, err := t.DiskStorage.MetricHighWatermarks.Get(rowKey) + if err != nil { + return false, err + } + if raw != nil { + value := &dto.MetricHighWatermark{} + err = proto.Unmarshal(raw, value) + if err != nil { + return false, err + } + + wmTime := time.Unix(*value.Timestamp, 0).UTC() + t.wmCache.Set(f, &Watermarks{High: wmTime}) + return wmTime.Before(i), nil + } + return true, nil + } + return wm.High.Before(i), nil +} + func (t *TieredStorage) renderView(viewJob viewJob) { // Telemetry. var err error @@ -342,6 +386,15 @@ func (t *TieredStorage) renderView(viewJob viewJob) { extractionTimer := viewJob.stats.GetTimer(stats.ViewDataExtractionTime).Start() for _, scanJob := range scans { + old, err := t.seriesTooOld(scanJob.fingerprint, *scanJob.operations[0].CurrentTime()) + if err != nil { + log.Printf("Error getting watermark from cache for %s: %s", scanJob.fingerprint, err) + continue + } + if old { + continue + } + var seriesFrontier *seriesFrontier = nil var seriesPresent = true diff --git a/storage/metric/view.go b/storage/metric/view.go index 1330e6eac..93fc45d9c 100644 --- a/storage/metric/view.go +++ b/storage/metric/view.go @@ -110,5 +110,5 @@ func (v view) appendSamples(fingerprint *model.Fingerprint, samples model.Values } func newView() view { - return view{NewMemorySeriesStorage()} + return view{NewMemorySeriesStorage(MemorySeriesOptions{})} } From 7b9ee950307e7f2e9d9590f8ec2161e70a0970c4 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Thu, 6 Jun 2013 18:16:28 +0200 Subject: [PATCH 3/3] Minor LevelDB watermark handling cleanups. --- storage/metric/leveldb.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index 9d374f93c..d4bfc6cb3 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -492,22 +492,19 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger mutationCount := 0 for fingerprint, samples := range groups { - key := &dto.Fingerprint{} + keyEncoded := coding.NewPBEncoder(fingerprint.ToDTO()) value := &dto.MetricHighWatermark{} - raw := []byte{} newestSampleTimestamp := samples[len(samples)-1].Timestamp - keyEncoded := coding.NewPBEncoder(key) - key.Signature = proto.String(fingerprint.ToRowKey()) - raw, err = l.MetricHighWatermarks.Get(keyEncoded) + raw, err := l.MetricHighWatermarks.Get(keyEncoded) if err != nil { - return + return err } if raw != nil { err = proto.Unmarshal(raw, value) if err != nil { - return + return err } if newestSampleTimestamp.Before(time.Unix(*value.Timestamp, 0)) { @@ -521,10 +518,10 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[model.Finger err = l.MetricHighWatermarks.Commit(batch) if err != nil { - return + return err } - return + return nil } func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err error) {