From bedc0eda1f25e61db2d78407ca67708ce1f4dd28 Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Sun, 18 Sep 2016 22:58:39 +0100 Subject: [PATCH 1/4] Added BenchmarkQueryRange --- storage/local/storage_test.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index cd9e866f5..a19f753ae 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -19,6 +19,7 @@ import ( "math" "math/rand" "os" + "strconv" "testing" "testing/quick" "time" @@ -469,6 +470,38 @@ func BenchmarkLabelMatching(b *testing.B) { b.StopTimer() } +func BenchmarkQueryRange(b *testing.B) { + now := model.Now() + insertStart := now.Add(-2 * time.Hour) + + s, closer := NewTestStorage(b, 2) + defer closer.Close() + + // Stop maintenance loop to prevent actual purging. + close(s.loopStopping) + <-s.loopStopped + <-s.logThrottlingStopped + // Recreate channel to avoid panic when we really shut down. + s.loopStopping = make(chan struct{}) + + for i := 0; i < 8192; i++ { + s.Append(&model.Sample{ + Metric: model.Metric{"__name__": model.LabelValue(strconv.Itoa(i)), "job": "test"}, + Timestamp: insertStart, + Value: 1, + }) + } + s.WaitForIndexing() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + lm, _ := metric.NewLabelMatcher(metric.Equal, "job", "test") + for pb.Next() { + s.QueryRange(context.Background(), insertStart, now, lm) + } + }) +} + func TestRetentionCutoff(t *testing.T) { now := model.Now() insertStart := now.Add(-2 * time.Hour) From c048a0cde874bb6ac037e6d476068b4863739926 Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Sun, 18 Sep 2016 11:03:00 +0100 Subject: [PATCH 2/4] Add metrics to result after checking all matchers Should be marginally faster and somewhat more GC friendly --- storage/local/storage.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/storage/local/storage.go b/storage/local/storage.go index 
a00da8b8a..93367fe6d 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -631,19 +631,22 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers( } result := map[model.Fingerprint]metric.Metric{} +FP_LOOP: for fp := range remainingFPs { s.fpLocker.Lock(fp) - if met, _, ok := s.metricForRange(fp, from, through); ok { - result[fp] = metric.Metric{Metric: met} - } + met, _, ok := s.metricForRange(fp, from, through) s.fpLocker.Unlock(fp) - } - for _, m := range matchers[matcherIdx:] { - for fp, met := range result { - if !m.Match(met.Metric[m.Name]) { - delete(result, fp) + + if !ok { + continue FP_LOOP + } + + for _, m := range matchers[matcherIdx:] { + if !m.Match(met[m.Name]) { + continue FP_LOOP } } + result[fp] = metric.Metric{Metric: met} } return result, nil } From 4978a65495e8e055e9869e97feceb2896adac5fb Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Sun, 18 Sep 2016 11:37:17 +0100 Subject: [PATCH 3/4] Extract initial FP candidate build logic into candidateFPsForLabelMatchers method No functional changes otherwise --- storage/local/storage.go | 50 +++++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/storage/local/storage.go b/storage/local/storage.go index 93367fe6d..78eb557c7 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -563,43 +563,43 @@ func (s *MemorySeriesStorage) MetricsForLabelMatchers( return metrics, nil } -func (s *MemorySeriesStorage) metricsForLabelMatchers( - from, through model.Time, +// returns candidate FPs for given matchers and remaining matchers to be checked +func (s *MemorySeriesStorage) candidateFPsForLabelMatchers( matchers ...*metric.LabelMatcher, -) (map[model.Fingerprint]metric.Metric, error) { +) (map[model.Fingerprint]struct{}, []*metric.LabelMatcher, error) { sort.Sort(metric.LabelMatchers(matchers)) if len(matchers) == 0 || matchers[0].MatchesEmptyString() { // No matchers at all or even the best matcher matches the empty string. 
- return nil, nil + return nil, nil, nil } var ( matcherIdx int - remainingFPs map[model.Fingerprint]struct{} + candidateFPs map[model.Fingerprint]struct{} ) // Equal matchers. - for ; matcherIdx < len(matchers) && (remainingFPs == nil || len(remainingFPs) > fpEqualMatchThreshold); matcherIdx++ { + for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpEqualMatchThreshold); matcherIdx++ { m := matchers[matcherIdx] if m.Type != metric.Equal || m.MatchesEmptyString() { break } - remainingFPs = s.fingerprintsForLabelPair( + candidateFPs = s.fingerprintsForLabelPair( model.LabelPair{ Name: m.Name, Value: m.Value, }, nil, - remainingFPs, + candidateFPs, ) - if len(remainingFPs) == 0 { - return nil, nil + if len(candidateFPs) == 0 { + return nil, nil, nil } } // Other matchers. - for ; matcherIdx < len(matchers) && (remainingFPs == nil || len(remainingFPs) > fpOtherMatchThreshold); matcherIdx++ { + for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpOtherMatchThreshold); matcherIdx++ { m := matchers[matcherIdx] if m.MatchesEmptyString() { break @@ -607,11 +607,11 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers( lvs, err := s.LabelValuesForLabelName(context.TODO(), m.Name) if err != nil { - return nil, err + return nil, nil, err } lvs = m.Filter(lvs) if len(lvs) == 0 { - return nil, nil + return nil, nil, nil } fps := map[model.Fingerprint]struct{}{} for _, lv := range lvs { @@ -621,18 +621,30 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers( Value: lv, }, fps, - remainingFPs, + candidateFPs, ) } - remainingFPs = fps - if len(remainingFPs) == 0 { - return nil, nil + candidateFPs = fps + if len(candidateFPs) == 0 { + return nil, nil, nil } } + return candidateFPs, matchers[matcherIdx:], nil +} + +func (s *MemorySeriesStorage) metricsForLabelMatchers( + from, through model.Time, + matchers ...*metric.LabelMatcher, +) (map[model.Fingerprint]metric.Metric, error) { + + candidateFPs, matchersToCheck, err 
:= s.candidateFPsForLabelMatchers(matchers...) + if err != nil { + return nil, err + } result := map[model.Fingerprint]metric.Metric{} FP_LOOP: - for fp := range remainingFPs { + for fp := range candidateFPs { s.fpLocker.Lock(fp) met, _, ok := s.metricForRange(fp, from, through) s.fpLocker.Unlock(fp) @@ -641,7 +653,7 @@ FP_LOOP: continue FP_LOOP } - for _, m := range matchers[matcherIdx:] { + for _, m := range matchersToCheck { if !m.Match(met[m.Name]) { continue FP_LOOP } From e6db9f81594507ee954d452a5a65416a395718f8 Mon Sep 17 00:00:00 2001 From: Maxim Ivanov Date: Sun, 18 Sep 2016 12:20:46 +0100 Subject: [PATCH 4/4] New fpsForLabelMatchers and seriesForLabelMatchers methods These more specific methods have replaced `metricsForLabelMatchers` in cases where its `map[fingerprint]metric` result type was not necessary or was used as an intermediate step. Avoids duplicated calls to `seriesForRange` from `QueryRange` and `QueryInstant` methods. --- storage/local/storage.go | 126 +++++++++++++++++++++++++--------- storage/local/storage_test.go | 30 ++++---- storage/local/test_helpers.go | 4 ++ 3 files changed, 113 insertions(+), 47 deletions(-) diff --git a/storage/local/storage.go b/storage/local/storage.go index 78eb557c7..5d4188419 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -415,19 +415,19 @@ func (s *MemorySeriesStorage) WaitForIndexing() { // LastSampleForLabelMatchers implements Storage. func (s *MemorySeriesStorage) LastSampleForLabelMatchers(_ context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { - fps := map[model.Fingerprint]struct{}{} + mergedFPs := map[model.Fingerprint]struct{}{} for _, matchers := range matcherSets { - fpToMetric, err := s.metricsForLabelMatchers(cutoff, model.Latest, matchers...) + fps, err := s.fpsForLabelMatchers(cutoff, model.Latest, matchers...)
if err != nil { return nil, err } - for fp := range fpToMetric { - fps[fp] = struct{}{} + for fp := range fps { + mergedFPs[fp] = struct{}{} } } - res := make(model.Vector, 0, len(fps)) - for fp := range fps { + res := make(model.Vector, 0, len(mergedFPs)) + for fp := range mergedFPs { s.fpLocker.Lock(fp) series, ok := s.fpToSeries.get(fp) @@ -485,13 +485,13 @@ func (bit *boundedIterator) Close() { // QueryRange implements Storage. func (s *MemorySeriesStorage) QueryRange(_ context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { - fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...) + fpSeriesPairs, err := s.seriesForLabelMatchers(from, through, matchers...) if err != nil { return nil, err } - iterators := make([]SeriesIterator, 0, len(fpToMetric)) - for fp := range fpToMetric { - it := s.preloadChunksForRange(fp, from, through) + iterators := make([]SeriesIterator, 0, len(fpSeriesPairs)) + for _, pair := range fpSeriesPairs { + it := s.preloadChunksForRange(pair, from, through) iterators = append(iterators, it) } return iterators, nil @@ -502,13 +502,13 @@ func (s *MemorySeriesStorage) QueryInstant(_ context.Context, ts model.Time, sta from := ts.Add(-stalenessDelta) through := ts - fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...) + fpSeriesPairs, err := s.seriesForLabelMatchers(from, through, matchers...) 
if err != nil { return nil, err } - iterators := make([]SeriesIterator, 0, len(fpToMetric)) - for fp := range fpToMetric { - it := s.preloadChunksForInstant(fp, from, through) + iterators := make([]SeriesIterator, 0, len(fpSeriesPairs)) + for _, pair := range fpSeriesPairs { + it := s.preloadChunksForInstant(pair, from, through) iterators = append(iterators, it) } return iterators, nil @@ -563,7 +563,7 @@ func (s *MemorySeriesStorage) MetricsForLabelMatchers( return metrics, nil } -// returns candidate FPs for given matchers and remaining matchers to be checked +// candidateFPsForLabelMatchers returns candidate FPs for given matchers and remaining matchers to be checked. func (s *MemorySeriesStorage) candidateFPsForLabelMatchers( matchers ...*metric.LabelMatcher, ) (map[model.Fingerprint]struct{}, []*metric.LabelMatcher, error) { @@ -632,6 +632,66 @@ func (s *MemorySeriesStorage) candidateFPsForLabelMatchers( return candidateFPs, matchers[matcherIdx:], nil } +func (s *MemorySeriesStorage) seriesForLabelMatchers( + from, through model.Time, + matchers ...*metric.LabelMatcher, +) ([]fingerprintSeriesPair, error) { + candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...) + if err != nil { + return nil, err + } + + result := []fingerprintSeriesPair{} +FPLoop: + for fp := range candidateFPs { + s.fpLocker.Lock(fp) + series := s.seriesForRange(fp, from, through) + s.fpLocker.Unlock(fp) + + if series == nil { + continue FPLoop + } + + for _, m := range matchersToCheck { + if !m.Match(series.metric[m.Name]) { + continue FPLoop + } + } + result = append(result, fingerprintSeriesPair{fp, series}) + } + return result, nil +} + +func (s *MemorySeriesStorage) fpsForLabelMatchers( + from, through model.Time, + matchers ...*metric.LabelMatcher, +) (map[model.Fingerprint]struct{}, error) { + candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...) 
+ if err != nil { + return nil, err + } + +FPLoop: + for fp := range candidateFPs { + s.fpLocker.Lock(fp) + met, _, ok := s.metricForRange(fp, from, through) + s.fpLocker.Unlock(fp) + + if !ok { + delete(candidateFPs, fp) + continue FPLoop + } + + for _, m := range matchersToCheck { + if !m.Match(met[m.Name]) { + delete(candidateFPs, fp) + continue FPLoop + } + } + } + return candidateFPs, nil +} + func (s *MemorySeriesStorage) metricsForLabelMatchers( from, through model.Time, matchers ...*metric.LabelMatcher, @@ -643,19 +703,19 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers( } result := map[model.Fingerprint]metric.Metric{} -FP_LOOP: +FPLoop: for fp := range candidateFPs { s.fpLocker.Lock(fp) met, _, ok := s.metricForRange(fp, from, through) s.fpLocker.Unlock(fp) if !ok { - continue FP_LOOP + continue FPLoop } for _, m := range matchersToCheck { if !m.Match(met[m.Name]) { - continue FP_LOOP + continue FPLoop } } result[fp] = metric.Metric{Metric: met} @@ -716,14 +776,14 @@ func (s *MemorySeriesStorage) LabelValuesForLabelName(_ context.Context, labelNa // DropMetricsForLabelMatchers implements Storage. func (s *MemorySeriesStorage) DropMetricsForLabelMatchers(_ context.Context, matchers ...*metric.LabelMatcher) (int, error) { - fpToMetric, err := s.metricsForLabelMatchers(model.Earliest, model.Latest, matchers...) + fps, err := s.fpsForLabelMatchers(model.Earliest, model.Latest, matchers...) if err != nil { return 0, err } - for fp := range fpToMetric { + for fp := range fps { s.purgeSeries(fp, nil, nil) } - return len(fpToMetric), nil + return len(fps), nil } var ( @@ -884,7 +944,7 @@ func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me return series, nil } -// seriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. +// seriesForRange is a helper method for seriesForLabelMatchers. // // The caller must have locked the fp. 
func (s *MemorySeriesStorage) seriesForRange( @@ -903,16 +963,17 @@ func (s *MemorySeriesStorage) seriesForRange( } func (s *MemorySeriesStorage) preloadChunksForRange( - fp model.Fingerprint, + pair fingerprintSeriesPair, from model.Time, through model.Time, ) SeriesIterator { - s.fpLocker.Lock(fp) - defer s.fpLocker.Unlock(fp) - - series := s.seriesForRange(fp, from, through) + fp, series := pair.fp, pair.series if series == nil { return nopIter } + + s.fpLocker.Lock(fp) + defer s.fpLocker.Unlock(fp) + iter, err := series.preloadChunksForRange(fp, from, through, s) if err != nil { s.quarantineSeries(fp, series.metric, err) @@ -922,16 +983,17 @@ func (s *MemorySeriesStorage) preloadChunksForRange( } func (s *MemorySeriesStorage) preloadChunksForInstant( - fp model.Fingerprint, + pair fingerprintSeriesPair, from model.Time, through model.Time, ) SeriesIterator { - s.fpLocker.Lock(fp) - defer s.fpLocker.Unlock(fp) - - series := s.seriesForRange(fp, from, through) + fp, series := pair.fp, pair.series if series == nil { return nopIter } + + s.fpLocker.Lock(fp) + defer s.fpLocker.Unlock(fp) + iter, err := series.preloadChunksForInstant(fp, from, through, s) if err != nil { s.quarantineSeries(fp, series.metric, err) diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index a19f753ae..c0cee6fe0 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -636,12 +636,12 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps2)) } - it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fpList[0]), model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) + it = 
s.preloadChunksForRange(makeFingerprintSeriesPair(s, fpList[1]), model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -669,12 +669,12 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps3)) } - it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) + it = s.preloadChunksForRange(makeFingerprintSeriesPair(s, fpList[0]), model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) + it = s.preloadChunksForRange(makeFingerprintSeriesPair(s, fpList[1]), model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -752,7 +752,7 @@ func TestQuarantineMetric(t *testing.T) { } // This will access the corrupt file and lead to quarantining. - iter := s.preloadChunksForInstant(fpToBeArchived, now.Add(-2*time.Hour-1*time.Minute), now.Add(-2*time.Hour)) + iter := s.preloadChunksForInstant(makeFingerprintSeriesPair(s, fpToBeArchived), now.Add(-2*time.Hour-1*time.Minute), now.Add(-2*time.Hour)) iter.Close() time.Sleep(time.Second) // Give time to quarantine. TODO(beorn7): Find a better way to wait. s.WaitForIndexing() @@ -894,7 +894,7 @@ func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest) // #1 Exactly on a sample. 
for i, expected := range samples { @@ -972,7 +972,7 @@ func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest) b.ResetTimer() @@ -1054,7 +1054,7 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest) // #1 Zero length interval at sample. for i, expected := range samples { @@ -1210,7 +1210,7 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest) b.ResetTimer() @@ -1260,7 +1260,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop ~half of the chunks. s.maintainMemorySeries(fp, 10000) - it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest) actual := it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, @@ -1278,7 +1278,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop everything. s.maintainMemorySeries(fp, 100000) - it = s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it = s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), model.Earliest, model.Latest) actual = it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, @@ -1442,7 +1442,7 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding chunkEncoding) { } // Load everything back. 
- it := s.preloadChunksForRange(fp, 0, 100000) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), 0, 100000) if oldLen != len(series.chunkDescs) { t.Errorf("Expected number of chunkDescs to have reached old value again, old number %d, current number %d.", oldLen, len(series.chunkDescs)) @@ -1770,7 +1770,7 @@ func verifyStorageRandom(t testing.TB, s *MemorySeriesStorage, samples model.Sam for _, i := range rand.Perm(len(samples)) { sample := samples[i] fp := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric) - it := s.preloadChunksForInstant(fp, sample.Timestamp, sample.Timestamp) + it := s.preloadChunksForInstant(makeFingerprintSeriesPair(s, fp), sample.Timestamp, sample.Timestamp) found := it.ValueAtOrBeforeTime(sample.Timestamp) startTime := it.(*boundedIterator).start switch { @@ -1813,7 +1813,7 @@ func verifyStorageSequential(t testing.TB, s *MemorySeriesStorage, samples model if it != nil { it.Close() } - it = s.preloadChunksForRange(fp, sample.Timestamp, model.Latest) + it = s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), sample.Timestamp, model.Latest) r = it.RangeValues(metric.Interval{ OldestInclusive: sample.Timestamp, NewestInclusive: model.Latest, @@ -1934,7 +1934,7 @@ func TestAppendOutOfOrder(t *testing.T) { fp := s.mapper.mapFP(m.FastFingerprint(), m) - it := s.preloadChunksForRange(fp, 0, 2) + it := s.preloadChunksForRange(makeFingerprintSeriesPair(s, fp), 0, 2) defer it.Close() want := []model.SamplePair{ diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index 7c2a3a8ee..7e71a1a7d 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -65,3 +65,7 @@ func NewTestStorage(t testutil.T, encoding chunkEncoding) (*MemorySeriesStorage, return storage, closer } + +func makeFingerprintSeriesPair(s *MemorySeriesStorage, fp model.Fingerprint) fingerprintSeriesPair { + return fingerprintSeriesPair{fp, s.seriesForRange(fp, model.Earliest, model.Latest)} +}