mirror of https://github.com/prometheus/prometheus
Merge pull request #258 from prometheus/feature/fingerprint-indexing
Benchmark memory arena; simplify map generation.pull/259/merge
commit
da0257bb0e
14
.travis.yml
14
.travis.yml
|
@ -2,11 +2,17 @@
|
|||
|
||||
language: go
|
||||
|
||||
# Detective work is required to ascertain why this is required BOTH in the
|
||||
# before_script directive and in the shell library.
|
||||
# Explicitly stop before_script from doing anything by giving 'em nil work.
|
||||
before_script:
|
||||
- gvm install go1.1 || true
|
||||
- gvm use go1.1 || true
|
||||
- echo "Before Script"
|
||||
|
||||
# Explicitly stop install from doing anything by giving 'em nil work.
|
||||
install:
|
||||
- echo "Install"
|
||||
|
||||
script:
|
||||
- echo "Script"
|
||||
- cd ${TRAVIS_BUILD_DIR}
|
||||
- gvm install go1.1 || true
|
||||
- gvm use go1.1 || true
|
||||
- bash -l ./tests-for-die-in-a-fire-travis.sh
|
||||
|
|
|
@ -78,5 +78,5 @@ type Series interface {
|
|||
}
|
||||
|
||||
type IteratorsForFingerprintBuilder interface {
|
||||
ForStream(stream stream) (storage.RecordDecoder, storage.RecordFilter, storage.RecordOperator)
|
||||
ForStream(stream *stream) (storage.RecordDecoder, storage.RecordFilter, storage.RecordOperator)
|
||||
}
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
package metric
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/prometheus/prometheus/model"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/utility"
|
||||
|
@ -22,12 +21,6 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// Used as a separator in the format string for generating the internal label
|
||||
// value pair set fingerprints.
|
||||
reservedDelimiter = `"`
|
||||
)
|
||||
|
||||
// Models a given sample entry stored in the in-memory arena.
|
||||
type value interface {
|
||||
// Gets the given value.
|
||||
|
@ -53,13 +46,13 @@ type stream struct {
|
|||
values *skiplist.SkipList
|
||||
}
|
||||
|
||||
func (s stream) add(timestamp time.Time, value model.SampleValue) {
|
||||
func (s *stream) add(timestamp time.Time, value model.SampleValue) {
|
||||
s.values.Set(skipListTime(timestamp), singletonValue(value))
|
||||
}
|
||||
|
||||
func (s stream) forEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) {
|
||||
func (s *stream) forEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) {
|
||||
if s.values.Len() == 0 {
|
||||
return
|
||||
return false, nil
|
||||
}
|
||||
iterator := s.values.SeekToLast()
|
||||
|
||||
|
@ -77,7 +70,7 @@ func (s stream) forEach(decoder storage.RecordDecoder, filter storage.RecordFilt
|
|||
|
||||
switch filter.Filter(decodedKey, decodedValue) {
|
||||
case storage.STOP:
|
||||
return
|
||||
return false, nil
|
||||
case storage.SKIP:
|
||||
continue
|
||||
case storage.ACCEPT:
|
||||
|
@ -93,20 +86,20 @@ func (s stream) forEach(decoder storage.RecordDecoder, filter storage.RecordFilt
|
|||
break
|
||||
}
|
||||
}
|
||||
scannedEntireCorpus = true
|
||||
return
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func newStream(metric model.Metric) stream {
|
||||
return stream{
|
||||
func newStream(metric model.Metric) *stream {
|
||||
return &stream{
|
||||
values: skiplist.New(),
|
||||
metric: metric,
|
||||
}
|
||||
}
|
||||
|
||||
type memorySeriesStorage struct {
|
||||
fingerprintToSeries map[model.Fingerprint]stream
|
||||
labelPairToFingerprints map[string]model.Fingerprints
|
||||
fingerprintToSeries map[model.Fingerprint]*stream
|
||||
labelPairToFingerprints map[model.LabelPair]model.Fingerprints
|
||||
labelNameToFingerprints map[model.LabelName]model.Fingerprints
|
||||
}
|
||||
|
||||
|
@ -128,8 +121,10 @@ func (s *memorySeriesStorage) AppendSample(sample model.Sample) error {
|
|||
s.fingerprintToSeries[fingerprint] = series
|
||||
|
||||
for k, v := range metric {
|
||||
labelPair := fmt.Sprintf("%s%s%s", k, reservedDelimiter, v)
|
||||
|
||||
labelPair := model.LabelPair{
|
||||
Name: k,
|
||||
Value: v,
|
||||
}
|
||||
labelPairValues := s.labelPairToFingerprints[labelPair]
|
||||
labelPairValues = append(labelPairValues, fingerprint)
|
||||
s.labelPairToFingerprints[labelPair] = labelPairValues
|
||||
|
@ -163,8 +158,10 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fing
|
|||
sets := []utility.Set{}
|
||||
|
||||
for k, v := range l {
|
||||
signature := fmt.Sprintf("%s%s%s", k, reservedDelimiter, v)
|
||||
values := s.labelPairToFingerprints[signature]
|
||||
values := s.labelPairToFingerprints[model.LabelPair{
|
||||
Name: k,
|
||||
Value: v,
|
||||
}]
|
||||
set := utility.Set{}
|
||||
for _, fingerprint := range values {
|
||||
set.Add(fingerprint)
|
||||
|
@ -174,7 +171,7 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fing
|
|||
|
||||
setCount := len(sets)
|
||||
if setCount == 0 {
|
||||
return
|
||||
return fingerprints, nil
|
||||
}
|
||||
|
||||
base := sets[0]
|
||||
|
@ -186,35 +183,35 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fing
|
|||
fingerprints = append(fingerprints, fingerprint)
|
||||
}
|
||||
|
||||
return
|
||||
return fingerprints, nil
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) GetFingerprintsForLabelName(l model.LabelName) (fingerprints model.Fingerprints, err error) {
|
||||
func (s *memorySeriesStorage) GetFingerprintsForLabelName(l model.LabelName) (fingerprints model.Fingerprints, _ error) {
|
||||
values := s.labelNameToFingerprints[l]
|
||||
|
||||
fingerprints = append(fingerprints, values...)
|
||||
|
||||
return
|
||||
return fingerprints, nil
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) GetMetricForFingerprint(f model.Fingerprint) (metric model.Metric, err error) {
|
||||
func (s *memorySeriesStorage) GetMetricForFingerprint(f model.Fingerprint) (model.Metric, error) {
|
||||
series, ok := s.fingerprintToSeries[f]
|
||||
if !ok {
|
||||
return
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
metric = model.Metric{}
|
||||
metric := model.Metric{}
|
||||
for label, value := range series.metric {
|
||||
metric[label] = value
|
||||
}
|
||||
|
||||
return
|
||||
return metric, nil
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) GetValueAtTime(f model.Fingerprint, t time.Time) (samples model.Values) {
|
||||
series, ok := s.fingerprintToSeries[f]
|
||||
if !ok {
|
||||
return
|
||||
return samples
|
||||
}
|
||||
|
||||
iterator := series.values.Seek(skipListTime(t))
|
||||
|
@ -226,14 +223,14 @@ func (s *memorySeriesStorage) GetValueAtTime(f model.Fingerprint, t time.Time) (
|
|||
iterator = series.values.SeekToLast()
|
||||
if iterator == nil {
|
||||
// The list is empty.
|
||||
return
|
||||
return samples
|
||||
}
|
||||
}
|
||||
|
||||
defer iterator.Close()
|
||||
|
||||
if iterator.Key() == nil || iterator.Value() == nil {
|
||||
return
|
||||
return samples
|
||||
}
|
||||
|
||||
foundTime := time.Time(iterator.Key().(skipListTime))
|
||||
|
@ -249,19 +246,17 @@ func (s *memorySeriesStorage) GetValueAtTime(f model.Fingerprint, t time.Time) (
|
|||
})
|
||||
}
|
||||
|
||||
return
|
||||
return samples
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) GetBoundaryValues(f model.Fingerprint, i model.Interval) (first model.Values, second model.Values) {
|
||||
first = s.GetValueAtTime(f, i.OldestInclusive)
|
||||
second = s.GetValueAtTime(f, i.NewestInclusive)
|
||||
return
|
||||
func (s *memorySeriesStorage) GetBoundaryValues(f model.Fingerprint, i model.Interval) (model.Values, model.Values) {
|
||||
return s.GetValueAtTime(f, i.OldestInclusive), s.GetValueAtTime(f, i.NewestInclusive)
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) GetRangeValues(f model.Fingerprint, i model.Interval) (samples model.Values) {
|
||||
series, ok := s.fingerprintToSeries[f]
|
||||
if !ok {
|
||||
return
|
||||
return samples
|
||||
}
|
||||
|
||||
iterator := series.values.Seek(skipListTime(i.OldestInclusive))
|
||||
|
@ -273,7 +268,7 @@ func (s *memorySeriesStorage) GetRangeValues(f model.Fingerprint, i model.Interv
|
|||
iterator = series.values.SeekToLast()
|
||||
if iterator == nil {
|
||||
// The list is empty.
|
||||
return
|
||||
return samples
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -297,12 +292,12 @@ func (s *memorySeriesStorage) GetRangeValues(f model.Fingerprint, i model.Interv
|
|||
}
|
||||
}
|
||||
|
||||
return
|
||||
return samples
|
||||
}
|
||||
|
||||
func (s *memorySeriesStorage) Close() {
|
||||
s.fingerprintToSeries = map[model.Fingerprint]stream{}
|
||||
s.labelPairToFingerprints = map[string]model.Fingerprints{}
|
||||
s.fingerprintToSeries = map[model.Fingerprint]*stream{}
|
||||
s.labelPairToFingerprints = map[model.LabelPair]model.Fingerprints{}
|
||||
s.labelNameToFingerprints = map[model.LabelName]model.Fingerprints{}
|
||||
}
|
||||
|
||||
|
@ -331,8 +326,8 @@ func (s *memorySeriesStorage) ForEachSample(builder IteratorsForFingerprintBuild
|
|||
|
||||
func NewMemorySeriesStorage() *memorySeriesStorage {
|
||||
return &memorySeriesStorage{
|
||||
fingerprintToSeries: make(map[model.Fingerprint]stream),
|
||||
labelPairToFingerprints: make(map[string]model.Fingerprints),
|
||||
fingerprintToSeries: make(map[model.Fingerprint]*stream),
|
||||
labelPairToFingerprints: make(map[model.LabelPair]model.Fingerprints),
|
||||
labelNameToFingerprints: make(map[model.LabelName]model.Fingerprints),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,95 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metric
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/prometheus/prometheus/model"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func BenchmarkStreamAdd(b *testing.B) {
|
||||
b.StopTimer()
|
||||
s := newStream(model.Metric{})
|
||||
times := make([]time.Time, 0, b.N)
|
||||
samples := make([]model.SampleValue, 0, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
times = append(times, time.Date(i, 0, 0, 0, 0, 0, 0, time.UTC))
|
||||
samples = append(samples, model.SampleValue(i))
|
||||
}
|
||||
|
||||
b.StartTimer()
|
||||
|
||||
var pre runtime.MemStats
|
||||
runtime.ReadMemStats(&pre)
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.add(times[i], samples[i])
|
||||
}
|
||||
|
||||
var post runtime.MemStats
|
||||
runtime.ReadMemStats(&post)
|
||||
|
||||
b.Logf("%d cycles with %f bytes per cycle, totalling %d", b.N, float32(post.TotalAlloc-pre.TotalAlloc)/float32(b.N), post.TotalAlloc-pre.TotalAlloc)
|
||||
}
|
||||
|
||||
func benchmarkAppendSample(b *testing.B, labels int) {
|
||||
b.StopTimer()
|
||||
s := NewMemorySeriesStorage()
|
||||
|
||||
metric := model.Metric{}
|
||||
|
||||
for i := 0; i < labels; i++ {
|
||||
metric[model.LabelName(fmt.Sprintf("label_%d", i))] = model.LabelValue(fmt.Sprintf("value_%d", i))
|
||||
}
|
||||
samples := make(model.Samples, 0, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
samples = append(samples, model.Sample{
|
||||
Metric: metric,
|
||||
Value: model.SampleValue(i),
|
||||
Timestamp: time.Date(i, 0, 0, 0, 0, 0, 0, time.UTC),
|
||||
})
|
||||
}
|
||||
|
||||
b.StartTimer()
|
||||
var pre runtime.MemStats
|
||||
runtime.ReadMemStats(&pre)
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.AppendSample(samples[i])
|
||||
}
|
||||
|
||||
var post runtime.MemStats
|
||||
runtime.ReadMemStats(&post)
|
||||
|
||||
b.Logf("%d cycles with %f bytes per cycle, totalling %d", b.N, float32(post.TotalAlloc-pre.TotalAlloc)/float32(b.N), post.TotalAlloc-pre.TotalAlloc)
|
||||
}
|
||||
|
||||
func BenchmarkAppendSample1(b *testing.B) {
|
||||
benchmarkAppendSample(b, 1)
|
||||
}
|
||||
|
||||
func BenchmarkAppendSample10(b *testing.B) {
|
||||
benchmarkAppendSample(b, 10)
|
||||
}
|
||||
|
||||
func BenchmarkAppendSample100(b *testing.B) {
|
||||
benchmarkAppendSample(b, 100)
|
||||
}
|
||||
|
||||
func BenchmarkAppendSample1000(b *testing.B) {
|
||||
benchmarkAppendSample(b, 1000)
|
||||
}
|
|
@ -239,7 +239,7 @@ type memoryToDiskFlusher struct {
|
|||
}
|
||||
|
||||
type memoryToDiskFlusherVisitor struct {
|
||||
stream stream
|
||||
stream *stream
|
||||
flusher *memoryToDiskFlusher
|
||||
memoryDeleteMutex *sync.RWMutex
|
||||
}
|
||||
|
@ -293,7 +293,7 @@ func (f memoryToDiskFlusherVisitor) Operate(key, value interface{}) (err *storag
|
|||
return
|
||||
}
|
||||
|
||||
func (f *memoryToDiskFlusher) ForStream(stream stream) (decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) {
|
||||
func (f *memoryToDiskFlusher) ForStream(stream *stream) (decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) {
|
||||
visitor := memoryToDiskFlusherVisitor{
|
||||
stream: stream,
|
||||
flusher: f,
|
||||
|
|
Loading…
Reference in New Issue