From c07abf85219141c7b98a8f870740e2994b397236 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Tue, 21 May 2013 18:12:02 +0200 Subject: [PATCH] Initial move away from skiplist. --- storage/metric/interface.go | 3 - storage/metric/leveldb.go | 4 - storage/metric/memory.go | 251 ++++++++++++------------ storage/metric/rule_integration_test.go | 4 +- storage/metric/stochastic_test.go | 32 ++- storage/metric/tiered.go | 187 ++++-------------- 6 files changed, 194 insertions(+), 287 deletions(-) diff --git a/storage/metric/interface.go b/storage/metric/interface.go index a3378c8f4..336a31eb4 100644 --- a/storage/metric/interface.go +++ b/storage/metric/interface.go @@ -50,9 +50,6 @@ type MetricPersistence interface { GetValueAtTime(*model.Fingerprint, time.Time) model.Values GetBoundaryValues(*model.Fingerprint, model.Interval) (first model.Values, second model.Values) GetRangeValues(*model.Fingerprint, model.Interval) model.Values - - ForEachSample(IteratorsForFingerprintBuilder) (err error) - // Get all label values that are associated with a given label name. GetAllValuesForLabel(model.LabelName) (model.LabelValues, error) diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index c72c74ed0..7e281169a 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -863,10 +863,6 @@ func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName model.LabelNam return } -func (l *LevelDBMetricPersistence) ForEachSample(builder IteratorsForFingerprintBuilder) (err error) { - panic("not implemented") -} - // CompactKeyspace compacts each database's keyspace serially. // // Beware that it would probably be imprudent to run this on a live user-facing diff --git a/storage/metric/memory.go b/storage/metric/memory.go index a50a8fa2e..5d8857a15 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -15,12 +15,18 @@ package metric import ( "github.com/prometheus/prometheus/model" - "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/utility" - "github.com/ryszard/goskiplist/skiplist" + "sort" + "sync" "time" ) +const ( + // Assuming sample rate of 1 / 15Hz, this allows for one hour's worth of + // storage per metric without any major reallocations. + initialSeriesArena = 4 * 60 +) + // Models a given sample entry stored in the in-memory arena. type value interface { // Gets the given value. @@ -35,69 +41,94 @@ func (v singletonValue) get() model.SampleValue { return model.SampleValue(v) } -type skipListTime time.Time - -func (t skipListTime) LessThan(o skiplist.Ordered) bool { - return time.Time(o.(skipListTime)).Before(time.Time(t)) -} - type stream struct { + sync.RWMutex + metric model.Metric - values *skiplist.SkipList + values model.Values } func (s *stream) add(timestamp time.Time, value model.SampleValue) { - s.values.Set(skipListTime(timestamp), singletonValue(value)) + s.Lock() + defer s.Unlock() + + s.values = append(s.values, model.SamplePair{ + Timestamp: timestamp, + Value: value, + }) } -func (s *stream) forEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) { - if s.values.Len() == 0 { - return false, nil +func (s *stream) clone() model.Values { + s.RLock() + defer s.RUnlock() + + clone := make(model.Values, len(s.values)) + copy(clone, s.values) + + return clone +} + +func (s *stream) getValueAtTime(t time.Time) model.Values { + s.RLock() + defer s.RUnlock() + + l := len(s.values) + switch l { + case 0: + return model.Values{} + case 1: + return model.Values{s.values[0]} + default: + index := sort.Search(l, func(i int) bool { + return !s.values[i].Timestamp.Before(t) + }) + + if index == 0 { + return model.Values{s.values[0]} + } + if index == l { + return model.Values{s.values[l-1]} + } + + if s.values[index].Timestamp.Equal(t) { + return model.Values{s.values[index]} + } + return model.Values{s.values[index-1], s.values[index]} } - iterator := s.values.SeekToLast() +} - defer iterator.Close() +func (s *stream) getBoundaryValues(i model.Interval) (model.Values, model.Values) { + return s.getValueAtTime(i.OldestInclusive), s.getValueAtTime(i.NewestInclusive) +} - for !(iterator.Key() == nil || iterator.Value() == nil) { - decodedKey, decodeErr := decoder.DecodeKey(iterator.Key()) - if decodeErr != nil { - panic(decodeErr) - } - decodedValue, decodeErr := decoder.DecodeValue(iterator.Value()) - if decodeErr != nil { - panic(decodeErr) - } +func (s *stream) getRangeValues(in model.Interval) model.Values { + s.RLock() + defer s.RUnlock() - switch filter.Filter(decodedKey, decodedValue) { - case storage.STOP: - return false, nil - case storage.SKIP: - continue - case storage.ACCEPT: - opErr := operator.Operate(decodedKey, decodedValue) - if opErr != nil { - if opErr.Continuable { - continue - } - break - } - } - if !iterator.Previous() { - break - } - } + oldest := sort.Search(len(s.values), func(i int) bool { + return !s.values[i].Timestamp.Before(in.OldestInclusive) + }) - return true, nil + newest := sort.Search(len(s.values), func(i int) bool { + return s.values[i].Timestamp.After(in.NewestInclusive) + }) + + result := make(model.Values, newest-oldest) + copy(result, s.values[oldest:newest]) + + return result } func newStream(metric model.Metric) *stream { return &stream{ - values: skiplist.New(), metric: metric, + values: make(model.Values, 0, initialSeriesArena), } } type memorySeriesStorage struct { + sync.RWMutex + fingerprintToSeries map[model.Fingerprint]*stream labelPairToFingerprints map[model.LabelPair]model.Fingerprints labelNameToFingerprints map[model.LabelName]model.Fingerprints @@ -114,10 +145,13 @@ func (s *memorySeriesStorage) AppendSamples(samples model.Samples) error { func (s *memorySeriesStorage) AppendSample(sample model.Sample) error { metric := sample.Metric fingerprint := model.NewFingerprintFromMetric(metric) + s.RLock() series, ok := s.fingerprintToSeries[*fingerprint] + s.RUnlock() if !ok { series = newStream(metric) + s.Lock() s.fingerprintToSeries[*fingerprint] = series for k, v := range metric { @@ -143,20 +177,24 @@ func (s *memorySeriesStorage) AppendSample(sample model.Sample) error { // Append raw sample, bypassing indexing. Only used to add data to views, which // don't need to lookup by metric. func (s *memorySeriesStorage) appendSampleWithoutIndexing(f *model.Fingerprint, timestamp time.Time, value model.SampleValue) { + s.RLock() series, ok := s.fingerprintToSeries[*f] + s.RUnlock() if !ok { series = newStream(model.Metric{}) + s.Lock() s.fingerprintToSeries[*f] = series + s.Unlock() } series.add(timestamp, value) } func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fingerprints model.Fingerprints, err error) { - sets := []utility.Set{} + s.RLock() for k, v := range l { values := s.labelPairToFingerprints[model.LabelPair{ Name: k, @@ -168,6 +206,7 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fing } sets = append(sets, set) } + s.RUnlock() setCount := len(sets) if setCount == 0 { @@ -186,16 +225,24 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fing return fingerprints, nil } -func (s *memorySeriesStorage) GetFingerprintsForLabelName(l model.LabelName) (fingerprints model.Fingerprints, _ error) { - values := s.labelNameToFingerprints[l] +func (s *memorySeriesStorage) GetFingerprintsForLabelName(l model.LabelName) (model.Fingerprints, error) { + s.RLock() + values, ok := s.labelNameToFingerprints[l] + s.RUnlock() + if !ok { + return nil, nil + } - fingerprints = append(fingerprints, values...) + fingerprints := make(model.Fingerprints, len(values)) + copy(fingerprints, values) return fingerprints, nil } func (s *memorySeriesStorage) GetMetricForFingerprint(f *model.Fingerprint) (model.Metric, error) { + s.RLock() series, ok := s.fingerprintToSeries[*f] + s.RUnlock() if !ok { return nil, nil } @@ -208,91 +255,49 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(f *model.Fingerprint) (mod return metric, nil } -func (s *memorySeriesStorage) GetValueAtTime(f *model.Fingerprint, t time.Time) (samples model.Values) { +func (s *memorySeriesStorage) CloneSamples(f *model.Fingerprint) model.Values { + s.RLock() series, ok := s.fingerprintToSeries[*f] + s.RUnlock() if !ok { - return samples + return nil } - iterator := series.values.Seek(skipListTime(t)) - if iterator == nil { - // If the iterator is nil, it means we seeked past the end of the series, - // so we seek to the last value instead. Due to the reverse ordering - // defined on skipListTime, this corresponds to the sample with the - // earliest timestamp. - iterator = series.values.SeekToLast() - if iterator == nil { - // The list is empty. - return samples - } + return series.clone() +} + +func (s *memorySeriesStorage) GetValueAtTime(f *model.Fingerprint, t time.Time) model.Values { + s.RLock() + series, ok := s.fingerprintToSeries[*f] + s.RUnlock() + if !ok { + return nil } - defer iterator.Close() - - if iterator.Key() == nil || iterator.Value() == nil { - return samples - } - - foundTime := time.Time(iterator.Key().(skipListTime)) - samples = append(samples, model.SamplePair{ - Timestamp: foundTime, - Value: iterator.Value().(value).get(), - }) - - if foundTime.Before(t) && iterator.Previous() { - samples = append(samples, model.SamplePair{ - Timestamp: time.Time(iterator.Key().(skipListTime)), - Value: iterator.Value().(value).get(), - }) - } - - return samples + return series.getValueAtTime(t) } func (s *memorySeriesStorage) GetBoundaryValues(f *model.Fingerprint, i model.Interval) (model.Values, model.Values) { - return s.GetValueAtTime(f, i.OldestInclusive), s.GetValueAtTime(f, i.NewestInclusive) + s.RLock() + series, ok := s.fingerprintToSeries[*f] + s.RUnlock() + if !ok { + return nil, nil + } + + return series.getBoundaryValues(i) } -func (s *memorySeriesStorage) GetRangeValues(f *model.Fingerprint, i model.Interval) (samples model.Values) { +func (s *memorySeriesStorage) GetRangeValues(f *model.Fingerprint, i model.Interval) model.Values { + s.RLock() series, ok := s.fingerprintToSeries[*f] + s.RUnlock() + if !ok { - return samples + return nil } - iterator := series.values.Seek(skipListTime(i.OldestInclusive)) - if iterator == nil { - // If the iterator is nil, it means we seeked past the end of the series, - // so we seek to the last value instead. Due to the reverse ordering - // defined on skipListTime, this corresponds to the sample with the - // earliest timestamp. - iterator = series.values.SeekToLast() - if iterator == nil { - // The list is empty. - return samples - } - } - - defer iterator.Close() - - for { - timestamp := time.Time(iterator.Key().(skipListTime)) - if timestamp.After(i.NewestInclusive) { - break - } - - if !timestamp.Before(i.OldestInclusive) { - samples = append(samples, model.SamplePair{ - Value: iterator.Value().(value).get(), - Timestamp: timestamp, - }) - } - - if !iterator.Previous() { - break - } - } - - return samples + return series.getRangeValues(i) } func (s *memorySeriesStorage) Close() { @@ -314,16 +319,6 @@ func (s *memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (v return } -func (s *memorySeriesStorage) ForEachSample(builder IteratorsForFingerprintBuilder) (err error) { - for _, stream := range s.fingerprintToSeries { - decoder, filter, operator := builder.ForStream(stream) - - stream.forEach(decoder, filter, operator) - } - - return -} - func NewMemorySeriesStorage() *memorySeriesStorage { return &memorySeriesStorage{ fingerprintToSeries: make(map[model.Fingerprint]*stream), diff --git a/storage/metric/rule_integration_test.go b/storage/metric/rule_integration_test.go index 6004a0521..ab01995cd 100644 --- a/storage/metric/rule_integration_test.go +++ b/storage/metric/rule_integration_test.go @@ -339,11 +339,11 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer actual := p.GetValueAtTime(model.NewFingerprintFromMetric(m), time) if len(behavior.output) != len(actual) { - t.Fatalf("%d.%d(%s). Expected %d samples but got: %v\n", i, j, behavior.name, len(behavior.output), actual) + t.Fatalf("%d.%d(%s.%s). Expected %d samples but got: %v\n", i, j, context.name, behavior.name, len(behavior.output), actual) } for k, samplePair := range actual { if samplePair.Value != behavior.output[k] { - t.Fatalf("%d.%d.%d(%s). Expected %s but got %s\n", i, j, k, behavior.name, behavior.output[k], samplePair) + t.Fatalf("%d.%d.%d(%s.%s). Expected %s but got %s\n", i, j, k, context.name, behavior.name, behavior.output[k], samplePair) } } diff --git a/storage/metric/stochastic_test.go b/storage/metric/stochastic_test.go index 2f34703da..666be10cc 100644 --- a/storage/metric/stochastic_test.go +++ b/storage/metric/stochastic_test.go @@ -22,6 +22,7 @@ import ( "github.com/prometheus/prometheus/utility/test" "math" "math/rand" + "sort" "testing" "testing/quick" "time" @@ -223,6 +224,20 @@ func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp *model.Fingerprint, i return } +type timeslice []time.Time + +func (t timeslice) Len() int { + return len(t) +} + +func (t timeslice) Swap(i, j int) { + t[i], t[j] = t[j], t[i] +} + +func (t timeslice) Less(i, j int) bool { + return t[i].Before(t[j]) +} + func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t test.Tester) { stochastic := func(x int) (success bool) { p, closer := persistenceMaker() @@ -266,11 +281,9 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t timestamps := map[int64]bool{} metricTimestamps[metricIndex] = timestamps - var ( - newestSample int64 = math.MinInt64 - oldestSample int64 = math.MaxInt64 - nextTimestamp func() int64 - ) + var newestSample int64 = math.MinInt64 + var oldestSample int64 = math.MaxInt64 + var nextTimestamp func() int64 nextTimestamp = func() int64 { var candidate int64 @@ -294,8 +307,15 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t return candidate } + // BUG(matt): Invariant of the in-memory database assumes this. + sortedTimestamps := timeslice{} for sampleIndex := 0; sampleIndex < numberOfSamples; sampleIndex++ { - sample.Timestamp = time.Unix(nextTimestamp(), 0) + sortedTimestamps = append(sortedTimestamps, time.Unix(nextTimestamp(), 0)) + } + sort.Sort(sortedTimestamps) + + for sampleIndex := 0; sampleIndex < numberOfSamples; sampleIndex++ { + sample.Timestamp = sortedTimestamps[sampleIndex] sample.Value = model.SampleValue(sampleIndex) err := p.AppendSample(sample) diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index ccf16022c..3e5073ae0 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -19,7 +19,6 @@ import ( "github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/model" dto "github.com/prometheus/prometheus/model/generated" - "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/raw/leveldb" "log" "sort" @@ -57,6 +56,7 @@ type TieredStorage struct { // BUG(matt): This introduces a Law of Demeter violation. Ugh. DiskStorage *LevelDBMetricPersistence + // BUG(matt): Replace this with a map? appendToDiskQueue chan model.Samples diskFrontier *diskFrontier @@ -65,13 +65,6 @@ type TieredStorage struct { memoryTTL time.Duration flushMemoryInterval time.Duration - // This mutex manages any concurrent reads/writes of the memoryArena. - memoryMutex sync.RWMutex - // This mutex blocks only deletions from the memoryArena. It is held for a - // potentially long time for an entire renderView() duration, since we depend - // on no samples being removed from memory after grabbing a LevelDB snapshot. - memoryDeleteMutex sync.RWMutex - viewQueue chan viewJob draining chan chan bool @@ -111,9 +104,7 @@ func (t *TieredStorage) AppendSamples(samples model.Samples) (err error) { return fmt.Errorf("Storage is in the process of draining.") } - t.memoryMutex.Lock() t.memoryArena.AppendSamples(samples) - t.memoryMutex.Unlock() return } @@ -130,10 +121,9 @@ func (t *TieredStorage) Drain() { } // Enqueues a ViewRequestBuilder for materialization, subject to a timeout. -func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration) (view View, err error) { +func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration) (View, error) { if len(t.draining) > 0 { - err = fmt.Errorf("Storage is in the process of draining.") - return + return nil, fmt.Errorf("Storage is in the process of draining.") } // The result channel needs a one-element buffer in case we have timed out in @@ -152,16 +142,14 @@ func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat } select { - case value := <-result: - view = value - case err = <-errChan: - return + case view := <-result: + return view, nil + case err := <-errChan: + return nil, err case <-time.After(deadline): abortChan <- true - err = fmt.Errorf("MakeView timed out after %s.", deadline) + return nil, fmt.Errorf("MakeView timed out after %s.", deadline) } - - return } func (t *TieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) { @@ -218,6 +206,39 @@ func (t *TieredStorage) Flush() { t.flushMemory() } +func (t *TieredStorage) flushMemory() { + t.memoryArena.RLock() + defer t.memoryArena.RUnlock() + + cutOff := time.Now().Add(-1 * t.memoryTTL) + + for _, stream := range t.memoryArena.fingerprintToSeries { + finder := func(i int) bool { + return !cutOff.After(stream.values[i].Timestamp) + } + + stream.Lock() + + i := sort.Search(len(stream.values), finder) + toArchive := stream.values[i:] + toKeep := stream.values[:i] + queued := make(model.Samples, 0, len(toArchive)) + + for _, value := range toArchive { + queued = append(queued, model.Sample{ + Metric: stream.metric, + Timestamp: value.Timestamp, + Value: value.Value, + }) + } + + t.appendToDiskQueue <- queued + + stream.values = toKeep + stream.Unlock() + } +} + func (t *TieredStorage) Close() { log.Println("Closing tiered storage...") t.Drain() @@ -229,118 +250,6 @@ func (t *TieredStorage) Close() { log.Println("Done.") } -type memoryToDiskFlusher struct { - toDiskQueue chan model.Samples - disk MetricPersistence - olderThan time.Time - valuesAccepted int - valuesRejected int - memoryDeleteMutex *sync.RWMutex -} - -type memoryToDiskFlusherVisitor struct { - stream *stream - flusher *memoryToDiskFlusher - memoryDeleteMutex *sync.RWMutex -} - -func (f memoryToDiskFlusherVisitor) DecodeKey(in interface{}) (out interface{}, err error) { - out = time.Time(in.(skipListTime)) - return -} - -func (f memoryToDiskFlusherVisitor) DecodeValue(in interface{}) (out interface{}, err error) { - out = in.(value).get() - return -} - -func (f memoryToDiskFlusherVisitor) Filter(key, value interface{}) (filterResult storage.FilterResult) { - var ( - recordTime = key.(time.Time) - ) - - if recordTime.Before(f.flusher.olderThan) { - f.flusher.valuesAccepted++ - - return storage.ACCEPT - } - f.flusher.valuesRejected++ - return storage.STOP -} - -func (f memoryToDiskFlusherVisitor) Operate(key, value interface{}) (err *storage.OperatorError) { - var ( - recordTime = key.(time.Time) - recordValue = value.(model.SampleValue) - ) - - if len(f.flusher.toDiskQueue) == cap(f.flusher.toDiskQueue) { - f.flusher.Flush() - } - - f.flusher.toDiskQueue <- model.Samples{ - model.Sample{ - Metric: f.stream.metric, - Timestamp: recordTime, - Value: recordValue, - }, - } - - f.memoryDeleteMutex.Lock() - f.stream.values.Delete(skipListTime(recordTime)) - f.memoryDeleteMutex.Unlock() - - return -} - -func (f *memoryToDiskFlusher) ForStream(stream *stream) (decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) { - visitor := memoryToDiskFlusherVisitor{ - stream: stream, - flusher: f, - memoryDeleteMutex: f.memoryDeleteMutex, - } - - return visitor, visitor, visitor -} - -func (f *memoryToDiskFlusher) Flush() { - length := len(f.toDiskQueue) - samples := model.Samples{} - for i := 0; i < length; i++ { - samples = append(samples, <-f.toDiskQueue...) - } - f.disk.AppendSamples(samples) -} - -func (f memoryToDiskFlusher) Close() { - f.Flush() -} - -// Persist a whole bunch of samples from memory to the datastore. -func (t *TieredStorage) flushMemory() { - begin := time.Now() - defer func() { - duration := time.Since(begin) - - recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: flushMemory, result: failure}) - }() - - t.memoryMutex.RLock() - defer t.memoryMutex.RUnlock() - - flusher := &memoryToDiskFlusher{ - disk: t.DiskStorage, - olderThan: time.Now().Add(-1 * t.memoryTTL), - toDiskQueue: t.appendToDiskQueue, - memoryDeleteMutex: &t.memoryDeleteMutex, - } - defer flusher.Close() - - t.memoryArena.ForEachSample(flusher) - - return -} - func (t *TieredStorage) renderView(viewJob viewJob) { // Telemetry. var err error @@ -351,10 +260,6 @@ func (t *TieredStorage) renderView(viewJob viewJob) { recordOutcome(duration, err, map[string]string{operation: renderView, result: success}, map[string]string{operation: renderView, result: failure}) }() - // No samples may be deleted from memory while rendering a view. - t.memoryDeleteMutex.RLock() - defer t.memoryDeleteMutex.RUnlock() - scans := viewJob.builder.ScanJobs() view := newView() // Get a single iterator that will be used for all data extraction below. @@ -378,6 +283,8 @@ func (t *TieredStorage) renderView(viewJob viewJob) { } standingOps := scanJob.operations + memValues := t.memoryArena.CloneSamples(scanJob.fingerprint) + for len(standingOps) > 0 { // Abort the view rendering if the caller (MakeView) has timed out. if len(viewJob.abort) > 0 { @@ -388,16 +295,8 @@ func (t *TieredStorage) renderView(viewJob viewJob) { targetTime := *standingOps[0].CurrentTime() currentChunk := chunk{} - t.memoryMutex.RLock() - memValues := t.memoryArena.GetValueAtTime(scanJob.fingerprint, targetTime) - t.memoryMutex.RUnlock() // If we aimed before the oldest value in memory, load more data from disk. if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && seriesFrontier != nil { - // XXX: For earnest performance gains analagous to the benchmarking we - // performed, chunk should only be reloaded if it no longer contains - // the values we're looking for. - // - // To better understand this, look at https://github.com/prometheus/prometheus/blob/benchmark/leveldb/iterator-seek-characteristics/leveldb.go#L239 and note the behavior around retrievedValue. diskValues := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, targetTime) // If we aimed past the newest value on disk, combine it with the next value from memory.