Merge pull request #305 from prometheus/feature/cache-fp-metric-lookups

Cache fingerprint->metric lookups and drop empty series from memory on flush
pull/306/head
juliusv 2013-06-19 05:21:06 -07:00
commit 26200766bd
4 changed files with 89 additions and 37 deletions

View File

@ -53,7 +53,7 @@ documentation: search_index
godoc -http=:6060 -index -index_files='search_index' godoc -http=:6060 -index -index_files='search_index'
format: format:
find . -iname '*.go' | egrep -v "generated|\.(l|y)\.go" | xargs -n1 $(GOFMT) -w -s=true find . -iname '*.go' | egrep -v "^./build/|generated|\.(l|y)\.go" | xargs -n1 $(GOFMT) -w -s=true
model: dependencies preparation model: dependencies preparation
$(MAKE) -C model $(MAKE) -C model

View File

@ -38,7 +38,7 @@ GOCC = $(GOROOT)/bin/go
TMPDIR = /tmp TMPDIR = /tmp
GOENV = TMPDIR=$(TMPDIR) GOROOT=$(GOROOT) GOPATH=$(GOPATH) GOENV = TMPDIR=$(TMPDIR) GOROOT=$(GOROOT) GOPATH=$(GOPATH)
GO = $(GOENV) $(GOCC) GO = $(GOENV) $(GOCC)
GOFMT = $(GOENV) $(GOROOT)/bin/gofmt GOFMT = $(GOROOT)/bin/gofmt
LEVELDB_VERSION := 1.9.0 LEVELDB_VERSION := 1.9.0
PROTOCOL_BUFFERS_VERSION := 2.5.0 PROTOCOL_BUFFERS_VERSION := 2.5.0

View File

@ -141,6 +141,10 @@ func (s *stream) getRangeValues(in model.Interval) model.Values {
return result return result
} }
func (s *stream) empty() bool {
return len(s.values) == 0
}
func newStream(metric model.Metric) *stream { func newStream(metric model.Metric) *stream {
return &stream{ return &stream{
metric: metric, metric: metric,
@ -175,14 +179,28 @@ func (s *memorySeriesStorage) AppendSample(sample model.Sample) error {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
metric := sample.Metric fingerprint := model.NewFingerprintFromMetric(sample.Metric)
fingerprint := model.NewFingerprintFromMetric(metric) series := s.getOrCreateSeries(sample.Metric, fingerprint)
series, ok := s.fingerprintToSeries[*fingerprint] series.add(sample.Timestamp, sample.Value)
if s.wmCache != nil { if s.wmCache != nil {
s.wmCache.Set(fingerprint, &Watermarks{High: sample.Timestamp}) s.wmCache.Set(fingerprint, &Watermarks{High: sample.Timestamp})
} }
return nil
}
func (s *memorySeriesStorage) CreateEmptySeries(metric model.Metric) {
s.Lock()
defer s.Unlock()
fingerprint := model.NewFingerprintFromMetric(metric)
s.getOrCreateSeries(metric, fingerprint)
}
func (s *memorySeriesStorage) getOrCreateSeries(metric model.Metric, fingerprint *model.Fingerprint) *stream {
series, ok := s.fingerprintToSeries[*fingerprint]
if !ok { if !ok {
series = newStream(metric) series = newStream(metric)
s.fingerprintToSeries[*fingerprint] = series s.fingerprintToSeries[*fingerprint] = series
@ -201,10 +219,71 @@ func (s *memorySeriesStorage) AppendSample(sample model.Sample) error {
s.labelNameToFingerprints[k] = labelNameValues s.labelNameToFingerprints[k] = labelNameValues
} }
} }
return series
}
series.add(sample.Timestamp, sample.Value) func (s *memorySeriesStorage) Flush(flushOlderThan time.Time, queue chan<- model.Samples) {
emptySeries := []model.Fingerprint{}
return nil s.RLock()
for fingerprint, stream := range s.fingerprintToSeries {
finder := func(i int) bool {
return stream.values[i].Timestamp.After(flushOlderThan)
}
stream.Lock()
i := sort.Search(len(stream.values), finder)
toArchive := stream.values[:i]
toKeep := stream.values[i:]
queued := make(model.Samples, 0, len(toArchive))
for _, value := range toArchive {
queued = append(queued, model.Sample{
Metric: stream.metric,
Timestamp: value.Timestamp,
Value: value.Value,
})
}
// BUG(all): this can deadlock if the queue is full, as we only ever clear
// the queue after calling this method:
// https://github.com/prometheus/prometheus/issues/275
queue <- queued
stream.values = toKeep
if len(toKeep) == 0 {
emptySeries = append(emptySeries, fingerprint)
}
stream.Unlock()
}
s.RUnlock()
s.Lock()
for _, fingerprint := range emptySeries {
if s.fingerprintToSeries[fingerprint].empty() {
s.dropSeries(&fingerprint)
}
}
s.Unlock()
}
// Drop all references to a series, including any samples.
func (s *memorySeriesStorage) dropSeries(fingerprint *model.Fingerprint) {
series, ok := s.fingerprintToSeries[*fingerprint]
if !ok {
return
}
for k, v := range series.metric {
labelPair := model.LabelPair{
Name: k,
Value: v,
}
delete(s.labelPairToFingerprints, labelPair)
delete(s.labelNameToFingerprints, k)
}
delete(s.fingerprintToSeries, *fingerprint)
} }
// Append raw samples, bypassing indexing. Only used to add data to views, // Append raw samples, bypassing indexing. Only used to add data to views,

View File

@ -262,38 +262,10 @@ func (t *TieredStorage) Flush() {
} }
func (t *TieredStorage) flushMemory(ttl time.Duration) { func (t *TieredStorage) flushMemory(ttl time.Duration) {
t.memoryArena.RLock() flushOlderThan := time.Now().Add(-1 * ttl)
defer t.memoryArena.RUnlock()
cutOff := time.Now().Add(-1 * ttl)
log.Println("Flushing...") log.Println("Flushing...")
t.memoryArena.Flush(flushOlderThan, t.appendToDiskQueue)
for _, stream := range t.memoryArena.fingerprintToSeries {
finder := func(i int) bool {
return stream.values[i].Timestamp.After(cutOff)
}
stream.Lock()
i := sort.Search(len(stream.values), finder)
toArchive := stream.values[:i]
toKeep := stream.values[i:]
queued := make(model.Samples, 0, len(toArchive))
for _, value := range toArchive {
queued = append(queued, model.Sample{
Metric: stream.metric,
Timestamp: value.Timestamp,
Value: value.Value,
})
}
t.appendToDiskQueue <- queued
stream.values = toKeep
stream.Unlock()
}
queueLength := len(t.appendToDiskQueue) queueLength := len(t.appendToDiskQueue)
if queueLength > 0 { if queueLength > 0 {
@ -666,6 +638,7 @@ func (t *TieredStorage) GetMetricForFingerprint(f *model.Fingerprint) (model.Met
} }
if m == nil { if m == nil {
m, err = t.DiskStorage.GetMetricForFingerprint(f) m, err = t.DiskStorage.GetMetricForFingerprint(f)
t.memoryArena.CreateEmptySeries(m)
} }
return m, err return m, err
} }