From 9fb65a91af5ce5f0a17931530112c023dbdfd406 Mon Sep 17 00:00:00 2001 From: Anders Daljord Morken Date: Thu, 10 Sep 2015 22:32:40 +0200 Subject: [PATCH 1/4] Close HTTP connections on HTTP errors too. Move defer resp.Body.Close() up to make sure it's called even when the HTTP request returns something other than 200 or Decoder construction fails. This avoids leaking and eventually running out of file descriptors. --- retrieval/target.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/retrieval/target.go b/retrieval/target.go index b9d9abe2e..c37ce2f66 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -437,6 +437,8 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) { if err != nil { return err } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { return fmt.Errorf("server returned HTTP status %s", resp.Status) } @@ -445,7 +447,6 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) { if err != nil { return err } - defer resp.Body.Close() sdec := expfmt.SampleDecoder{ Dec: dec, From 7ef9399920e7bef68f4299dc75fc923d629ab727 Mon Sep 17 00:00:00 2001 From: Jimmi Dyson Date: Fri, 11 Sep 2015 11:44:28 +0100 Subject: [PATCH 2/4] Clean up kubernetes http response bodies --- retrieval/discovery/kubernetes/discovery.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go index a495895fe..6fbe0ac80 100644 --- a/retrieval/discovery/kubernetes/discovery.go +++ b/retrieval/discovery/kubernetes/discovery.go @@ -115,6 +115,7 @@ func (kd *Discovery) Sources() []string { log.Errorf("Unable to list Kubernetes nodes: %s", err) return []string{} } + defer res.Body.Close() if res.StatusCode != http.StatusOK { log.Errorf("Unable to list Kubernetes nodes. Unexpected response: %d %s", res.StatusCode, res.Status) return []string{} @@ -143,6 +144,7 @@ func (kd *Discovery) Sources() []string { log.Errorf("Unable to list Kubernetes services: %s", err) return []string{} } + defer res.Body.Close() if res.StatusCode != http.StatusOK { log.Errorf("Unable to list Kubernetes services. Unexpected response: %d %s", res.StatusCode, res.Status) return []string{} @@ -351,6 +353,7 @@ func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, r log.Errorf("Failed to watch nodes: %s", err) return } + defer res.Body.Close() if res.StatusCode != http.StatusOK { log.Errorf("Failed to watch nodes: %d", res.StatusCode) return @@ -392,6 +395,7 @@ func (kd *Discovery) watchServices(events chan interface{}, done <-chan struct{} log.Errorf("Failed to watch services: %s", err) return } + defer res.Body.Close() if res.StatusCode != http.StatusOK { log.Errorf("Failed to watch services: %d", res.StatusCode) return @@ -462,6 +466,7 @@ func (kd *Discovery) addService(service *Service) *config.TargetGroup { log.Errorf("Error getting service endpoints: %s", err) return nil } + defer res.Body.Close() if res.StatusCode != http.StatusOK { log.Errorf("Failed to get service endpoints: %d", res.StatusCode) return nil @@ -534,6 +539,7 @@ func (kd *Discovery) watchServiceEndpoints(events chan interface{}, done <-chan log.Errorf("Failed to watch service endpoints: %s", err) return } + defer res.Body.Close() if res.StatusCode != http.StatusOK { log.Errorf("Failed to watch service endpoints: %d", res.StatusCode) return From daeccdd0e94f4265a31be0c9576bb449de475b6c Mon Sep 17 00:00:00 2001 From: beorn7 Date: Fri, 11 Sep 2015 15:47:23 +0200 Subject: [PATCH 3/4] Fix DropMetricsForFingerprints It now deletes the series file also for archived series. Also, fix a naming error in a doc comment. --- storage/local/persistence.go | 2 +- storage/local/storage.go | 7 ++-- storage/local/storage_test.go | 73 +++++++++++++++++++++++++---------- 3 files changed, 58 insertions(+), 24 deletions(-) diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 2dda63a77..5eaf0b13d 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -1059,7 +1059,7 @@ func (p *persistence) indexMetric(fp model.Fingerprint, m model.Metric) { // indexes used for fingerprintsForLabelPair, labelValuesForLabelName, and // fingerprintsModifiedBefore. The index of fingerprints to archived metrics is // not affected by this removal. (In fact, never call this method for an -// archived metric. To purge an archived metric, call purgeArchivedFingerprint.) +// archived metric. To purge an archived metric, call purgeArchivedMetric.) // If the queue is full, this method blocks until the metric can be queued. This // method is goroutine-safe. func (p *persistence) unindexMetric(fp model.Fingerprint, m model.Metric) { diff --git a/storage/local/storage.go b/storage/local/storage.go index 11581af49..41c55be13 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -518,12 +518,13 @@ func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprin s.fpToSeries.del(fp) s.numSeries.Dec() s.persistence.unindexMetric(fp, series.metric) - if _, err := s.persistence.deleteSeriesFile(fp); err != nil { - log.Errorf("Error deleting series file for %v: %v", fp, err) - } } else if err := s.persistence.purgeArchivedMetric(fp); err != nil { log.Errorf("Error purging metric with fingerprint %v: %v", fp, err) } + // Attempt to delete series file in any case. + if _, err := s.persistence.deleteSeriesFile(fp); err != nil { + log.Errorf("Error deleting series file for %v: %v", fp, err) + } s.fpLocker.Unlock(fp) } diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index a3f11de65..a8ea0a7c3 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -17,6 +17,7 @@ import ( "fmt" "hash/fnv" "math/rand" + "os" "reflect" "testing" "testing/quick" @@ -438,16 +439,29 @@ func TestDropMetrics(t *testing.T) { s, closer := NewTestStorage(t, 1) defer closer.Close() + chunkFileExists := func(fp model.Fingerprint) (bool, error) { + f, err := s.persistence.openChunkFileForReading(fp) + if err == nil { + f.Close() + return true, nil + } + if os.IsNotExist(err) { + return false, nil + } + return false, err + } + m1 := model.Metric{model.MetricNameLabel: "test", "n1": "v1"} m2 := model.Metric{model.MetricNameLabel: "test", "n1": "v2"} + m3 := model.Metric{model.MetricNameLabel: "test", "n1": "v3"} N := 120000 - for j, m := range []model.Metric{m1, m2} { + for j, m := range []model.Metric{m1, m2, m3} { for i := 0; i < N; i++ { smpl := &model.Sample{ Metric: m, - Timestamp: insertStart.Add(time.Duration(i) * time.Millisecond), // 1 minute intervals. + Timestamp: insertStart.Add(time.Duration(i) * time.Millisecond), // 1 millisecond intervals. Value: model.SampleValue(j), } s.Append(smpl) @@ -455,37 +469,49 @@ func TestDropMetrics(t *testing.T) { } s.WaitForIndexing() - fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"}) - if len(fps) != 2 { - t.Fatalf("unexpected number of fingerprints: %d", len(fps)) + // Archive m3, but first maintain it so that at least something is written to disk. + fpToBeArchived := m3.FastFingerprint() + s.maintainMemorySeries(fpToBeArchived, 0) + s.fpLocker.Lock(fpToBeArchived) + s.fpToSeries.del(fpToBeArchived) + if err := s.persistence.archiveMetric( + fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond), + ); err != nil { + t.Error(err) } + s.fpLocker.Unlock(fpToBeArchived) - var fpList model.Fingerprints - for fp := range fps { - it := s.NewIterator(fp) - if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { - t.Fatalf("unexpected number of samples: %d", len(vals)) - } - fpList = append(fpList, fp) + fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"}) + if len(fps) != 3 { + t.Errorf("unexpected number of fingerprints: %d", len(fps)) } + fpList := model.Fingerprints{m1.FastFingerprint(), m2.FastFingerprint(), fpToBeArchived} + s.DropMetricsForFingerprints(fpList[0]) s.WaitForIndexing() fps2 := s.fingerprintsForLabelPairs(model.LabelPair{ Name: model.MetricNameLabel, Value: "test", }) - if len(fps2) != 1 { - t.Fatalf("unexpected number of fingerprints: %d", len(fps2)) + if len(fps2) != 2 { + t.Errorf("unexpected number of fingerprints: %d", len(fps2)) } it := s.NewIterator(fpList[0]) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { - t.Fatalf("unexpected number of samples: %d", len(vals)) + t.Errorf("unexpected number of samples: %d", len(vals)) } it = s.NewIterator(fpList[1]) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { - t.Fatalf("unexpected number of samples: %d", len(vals)) + t.Errorf("unexpected number of samples: %d", len(vals)) + } + exists, err := chunkFileExists(fpList[2]) + if err != nil { + t.Fatal(err) + } + if !exists { + t.Errorf("chunk file does not exist for fp=%v", fpList[2]) } s.DropMetricsForFingerprints(fpList...) @@ -495,16 +521,23 @@ func TestDropMetrics(t *testing.T) { Name: model.MetricNameLabel, Value: "test", }) if len(fps3) != 0 { - t.Fatalf("unexpected number of fingerprints: %d", len(fps3)) + t.Errorf("unexpected number of fingerprints: %d", len(fps3)) } it = s.NewIterator(fpList[0]) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { - t.Fatalf("unexpected number of samples: %d", len(vals)) + t.Errorf("unexpected number of samples: %d", len(vals)) } it = s.NewIterator(fpList[1]) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { - t.Fatalf("unexpected number of samples: %d", len(vals)) + t.Errorf("unexpected number of samples: %d", len(vals)) + } + exists, err = chunkFileExists(fpList[2]) + if err != nil { + t.Fatal(err) + } + if exists { + t.Errorf("chunk file still exists for fp=%v", fpList[2]) } } @@ -533,7 +566,7 @@ func TestLoop(t *testing.T) { } storage := NewMemorySeriesStorage(o) if err := storage.Start(); err != nil { - t.Fatalf("Error starting storage: %s", err) + t.Errorf("Error starting storage: %s", err) } for _, s := range samples { storage.Append(s) From d73c8a4f0b23bb4178a4520beba0ba5ed1f236f5 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Fri, 11 Sep 2015 16:43:04 +0200 Subject: [PATCH 4/4] Remove notice about 0.14.x config file format change. --- cmd/prometheus/main.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index e6fd678ee..9be251c9a 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -206,7 +206,6 @@ func reloadConfig(filename string, rls ...Reloadable) (success bool) { conf, err := config.LoadFile(filename) if err != nil { log.Errorf("Couldn't load configuration (-config.file=%s): %v", filename, err) - log.Errorf("Note: The configuration format has changed with version 0.14. Please see the documentation (http://prometheus.io/docs/operating/configuration/) and the provided configuration migration tool (https://github.com/prometheus/migrate).") return false } success = true