diff --git a/Makefile b/Makefile index 590eb0f91..eb89acc59 100644 --- a/Makefile +++ b/Makefile @@ -53,7 +53,7 @@ documentation: search_index godoc -http=:6060 -index -index_files='search_index' format: - find . -iname '*.go' | egrep -v "generated|\.(l|y)\.go" | xargs -n1 $(GOFMT) -w -s=true + find . -iname '*.go' | egrep -v "^./build/|generated|\.(l|y)\.go" | xargs -n1 $(GOFMT) -w -s=true model: dependencies preparation $(MAKE) -C model diff --git a/Makefile.INCLUDE b/Makefile.INCLUDE index b8e253afd..db73a729b 100644 --- a/Makefile.INCLUDE +++ b/Makefile.INCLUDE @@ -38,7 +38,7 @@ GOCC = $(GOROOT)/bin/go TMPDIR = /tmp GOENV = TMPDIR=$(TMPDIR) GOROOT=$(GOROOT) GOPATH=$(GOPATH) GO = $(GOENV) $(GOCC) -GOFMT = $(GOENV) $(GOROOT)/bin/gofmt +GOFMT = $(GOROOT)/bin/gofmt LEVELDB_VERSION := 1.9.0 PROTOCOL_BUFFERS_VERSION := 2.5.0 diff --git a/storage/metric/memory.go b/storage/metric/memory.go index 1a4af813d..252d1ac60 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -141,6 +141,10 @@ func (s *stream) getRangeValues(in model.Interval) model.Values { return result } +func (s *stream) empty() bool { + return len(s.values) == 0 +} + func newStream(metric model.Metric) *stream { return &stream{ metric: metric, @@ -175,14 +179,28 @@ func (s *memorySeriesStorage) AppendSample(sample model.Sample) error { s.Lock() defer s.Unlock() - metric := sample.Metric - fingerprint := model.NewFingerprintFromMetric(metric) - series, ok := s.fingerprintToSeries[*fingerprint] + fingerprint := model.NewFingerprintFromMetric(sample.Metric) + series := s.getOrCreateSeries(sample.Metric, fingerprint) + series.add(sample.Timestamp, sample.Value) if s.wmCache != nil { s.wmCache.Set(fingerprint, &Watermarks{High: sample.Timestamp}) } + return nil +} + +func (s *memorySeriesStorage) CreateEmptySeries(metric model.Metric) { + s.Lock() + defer s.Unlock() + + fingerprint := model.NewFingerprintFromMetric(metric) + s.getOrCreateSeries(metric, fingerprint) +} + +func (s *memorySeriesStorage) getOrCreateSeries(metric model.Metric, fingerprint *model.Fingerprint) *stream { + series, ok := s.fingerprintToSeries[*fingerprint] + if !ok { series = newStream(metric) s.fingerprintToSeries[*fingerprint] = series @@ -201,10 +219,71 @@ func (s *memorySeriesStorage) AppendSample(sample model.Sample) error { s.labelNameToFingerprints[k] = labelNameValues } } + return series +} - series.add(sample.Timestamp, sample.Value) +func (s *memorySeriesStorage) Flush(flushOlderThan time.Time, queue chan<- model.Samples) { + emptySeries := []model.Fingerprint{} - return nil + s.RLock() + for fingerprint, stream := range s.fingerprintToSeries { + finder := func(i int) bool { + return stream.values[i].Timestamp.After(flushOlderThan) + } + + stream.Lock() + + i := sort.Search(len(stream.values), finder) + toArchive := stream.values[:i] + toKeep := stream.values[i:] + queued := make(model.Samples, 0, len(toArchive)) + + for _, value := range toArchive { + queued = append(queued, model.Sample{ + Metric: stream.metric, + Timestamp: value.Timestamp, + Value: value.Value, + }) + } + + // BUG(all): this can deadlock if the queue is full, as we only ever clear + // the queue after calling this method: + // https://github.com/prometheus/prometheus/issues/275 + queue <- queued + + stream.values = toKeep + + if len(toKeep) == 0 { + emptySeries = append(emptySeries, fingerprint) + } + stream.Unlock() + } + s.RUnlock() + + s.Lock() + for _, fingerprint := range emptySeries { + if s.fingerprintToSeries[fingerprint].empty() { + s.dropSeries(&fingerprint) + } + } + s.Unlock() +} + +// Drop all references to a series, including any samples. +func (s *memorySeriesStorage) dropSeries(fingerprint *model.Fingerprint) { + series, ok := s.fingerprintToSeries[*fingerprint] + if !ok { + return + } + for k, v := range series.metric { + labelPair := model.LabelPair{ + Name: k, + Value: v, + } + delete(s.labelPairToFingerprints, labelPair) + delete(s.labelNameToFingerprints, k) + } + delete(s.fingerprintToSeries, *fingerprint) } // Append raw samples, bypassing indexing. Only used to add data to views, diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index bee4ce5f1..b352612cb 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -262,38 +262,10 @@ func (t *TieredStorage) Flush() { } func (t *TieredStorage) flushMemory(ttl time.Duration) { - t.memoryArena.RLock() - defer t.memoryArena.RUnlock() - - cutOff := time.Now().Add(-1 * ttl) + flushOlderThan := time.Now().Add(-1 * ttl) log.Println("Flushing...") - - for _, stream := range t.memoryArena.fingerprintToSeries { - finder := func(i int) bool { - return stream.values[i].Timestamp.After(cutOff) - } - - stream.Lock() - - i := sort.Search(len(stream.values), finder) - toArchive := stream.values[:i] - toKeep := stream.values[i:] - queued := make(model.Samples, 0, len(toArchive)) - - for _, value := range toArchive { - queued = append(queued, model.Sample{ - Metric: stream.metric, - Timestamp: value.Timestamp, - Value: value.Value, - }) - } - - t.appendToDiskQueue <- queued - - stream.values = toKeep - stream.Unlock() - } + t.memoryArena.Flush(flushOlderThan, t.appendToDiskQueue) queueLength := len(t.appendToDiskQueue) if queueLength > 0 { @@ -666,6 +638,7 @@ func (t *TieredStorage) GetMetricForFingerprint(f *model.Fingerprint) (model.Met } if m == nil { m, err = t.DiskStorage.GetMetricForFingerprint(f) + t.memoryArena.CreateEmptySeries(m) } return m, err }