diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index e6fd678ee..9be251c9a 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -206,7 +206,6 @@ func reloadConfig(filename string, rls ...Reloadable) (success bool) { conf, err := config.LoadFile(filename) if err != nil { log.Errorf("Couldn't load configuration (-config.file=%s): %v", filename, err) - log.Errorf("Note: The configuration format has changed with version 0.14. Please see the documentation (http://prometheus.io/docs/operating/configuration/) and the provided configuration migration tool (https://github.com/prometheus/migrate).") return false } success = true diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go index a495895fe..6fbe0ac80 100644 --- a/retrieval/discovery/kubernetes/discovery.go +++ b/retrieval/discovery/kubernetes/discovery.go @@ -115,6 +115,7 @@ func (kd *Discovery) Sources() []string { log.Errorf("Unable to list Kubernetes nodes: %s", err) return []string{} } + defer res.Body.Close() if res.StatusCode != http.StatusOK { log.Errorf("Unable to list Kubernetes nodes. Unexpected response: %d %s", res.StatusCode, res.Status) return []string{} @@ -143,6 +144,7 @@ func (kd *Discovery) Sources() []string { log.Errorf("Unable to list Kubernetes services: %s", err) return []string{} } + defer res.Body.Close() if res.StatusCode != http.StatusOK { log.Errorf("Unable to list Kubernetes services. Unexpected response: %d %s", res.StatusCode, res.Status) return []string{} @@ -351,6 +353,7 @@ func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, r log.Errorf("Failed to watch nodes: %s", err) return } + defer res.Body.Close() if res.StatusCode != http.StatusOK { log.Errorf("Failed to watch nodes: %d", res.StatusCode) return @@ -392,6 +395,7 @@ func (kd *Discovery) watchServices(events chan interface{}, done <-chan struct{} log.Errorf("Failed to watch services: %s", err) return } + defer res.Body.Close() if res.StatusCode != http.StatusOK { log.Errorf("Failed to watch services: %d", res.StatusCode) return @@ -462,6 +466,7 @@ func (kd *Discovery) addService(service *Service) *config.TargetGroup { log.Errorf("Error getting service endpoints: %s", err) return nil } + defer res.Body.Close() if res.StatusCode != http.StatusOK { log.Errorf("Failed to get service endpoints: %d", res.StatusCode) return nil @@ -534,6 +539,7 @@ func (kd *Discovery) watchServiceEndpoints(events chan interface{}, done <-chan log.Errorf("Failed to watch service endpoints: %s", err) return } + defer res.Body.Close() if res.StatusCode != http.StatusOK { log.Errorf("Failed to watch service endpoints: %d", res.StatusCode) return diff --git a/retrieval/target.go b/retrieval/target.go index b9d9abe2e..c37ce2f66 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -437,6 +437,8 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) { if err != nil { return err } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { return fmt.Errorf("server returned HTTP status %s", resp.Status) } @@ -445,7 +447,6 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) { if err != nil { return err } - defer resp.Body.Close() sdec := expfmt.SampleDecoder{ Dec: dec, diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 2dda63a77..5eaf0b13d 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -1059,7 +1059,7 @@ func (p *persistence) indexMetric(fp model.Fingerprint, m model.Metric) { // indexes used for fingerprintsForLabelPair, labelValuesForLabelName, and // fingerprintsModifiedBefore. The index of fingerprints to archived metrics is // not affected by this removal. (In fact, never call this method for an -// archived metric. To purge an archived metric, call purgeArchivedFingerprint.) +// archived metric. To purge an archived metric, call purgeArchivedMetric.) // If the queue is full, this method blocks until the metric can be queued. This // method is goroutine-safe. func (p *persistence) unindexMetric(fp model.Fingerprint, m model.Metric) { diff --git a/storage/local/storage.go b/storage/local/storage.go index 11581af49..41c55be13 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -518,12 +518,13 @@ func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprin s.fpToSeries.del(fp) s.numSeries.Dec() s.persistence.unindexMetric(fp, series.metric) - if _, err := s.persistence.deleteSeriesFile(fp); err != nil { - log.Errorf("Error deleting series file for %v: %v", fp, err) - } } else if err := s.persistence.purgeArchivedMetric(fp); err != nil { log.Errorf("Error purging metric with fingerprint %v: %v", fp, err) } + // Attempt to delete series file in any case. + if _, err := s.persistence.deleteSeriesFile(fp); err != nil { + log.Errorf("Error deleting series file for %v: %v", fp, err) + } s.fpLocker.Unlock(fp) } diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index a3f11de65..a8ea0a7c3 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -17,6 +17,7 @@ import ( "fmt" "hash/fnv" "math/rand" + "os" "reflect" "testing" "testing/quick" @@ -438,16 +439,29 @@ func TestDropMetrics(t *testing.T) { s, closer := NewTestStorage(t, 1) defer closer.Close() + chunkFileExists := func(fp model.Fingerprint) (bool, error) { + f, err := s.persistence.openChunkFileForReading(fp) + if err == nil { + f.Close() + return true, nil + } + if os.IsNotExist(err) { + return false, nil + } + return false, err + } + m1 := model.Metric{model.MetricNameLabel: "test", "n1": "v1"} m2 := model.Metric{model.MetricNameLabel: "test", "n1": "v2"} + m3 := model.Metric{model.MetricNameLabel: "test", "n1": "v3"} N := 120000 - for j, m := range []model.Metric{m1, m2} { + for j, m := range []model.Metric{m1, m2, m3} { for i := 0; i < N; i++ { smpl := &model.Sample{ Metric: m, - Timestamp: insertStart.Add(time.Duration(i) * time.Millisecond), // 1 minute intervals. + Timestamp: insertStart.Add(time.Duration(i) * time.Millisecond), // 1 millisecond intervals. Value: model.SampleValue(j), } s.Append(smpl) @@ -455,37 +469,49 @@ func TestDropMetrics(t *testing.T) { } s.WaitForIndexing() - fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"}) - if len(fps) != 2 { - t.Fatalf("unexpected number of fingerprints: %d", len(fps)) + // Archive m3, but first maintain it so that at least something is written to disk. + fpToBeArchived := m3.FastFingerprint() + s.maintainMemorySeries(fpToBeArchived, 0) + s.fpLocker.Lock(fpToBeArchived) + s.fpToSeries.del(fpToBeArchived) + if err := s.persistence.archiveMetric( + fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond), + ); err != nil { + t.Error(err) } + s.fpLocker.Unlock(fpToBeArchived) - var fpList model.Fingerprints - for fp := range fps { - it := s.NewIterator(fp) - if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { - t.Fatalf("unexpected number of samples: %d", len(vals)) - } - fpList = append(fpList, fp) + fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"}) + if len(fps) != 3 { + t.Errorf("unexpected number of fingerprints: %d", len(fps)) } + fpList := model.Fingerprints{m1.FastFingerprint(), m2.FastFingerprint(), fpToBeArchived} + s.DropMetricsForFingerprints(fpList[0]) s.WaitForIndexing() fps2 := s.fingerprintsForLabelPairs(model.LabelPair{ Name: model.MetricNameLabel, Value: "test", }) - if len(fps2) != 1 { - t.Fatalf("unexpected number of fingerprints: %d", len(fps2)) + if len(fps2) != 2 { + t.Errorf("unexpected number of fingerprints: %d", len(fps2)) } it := s.NewIterator(fpList[0]) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { - t.Fatalf("unexpected number of samples: %d", len(vals)) + t.Errorf("unexpected number of samples: %d", len(vals)) } it = s.NewIterator(fpList[1]) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { - t.Fatalf("unexpected number of samples: %d", len(vals)) + t.Errorf("unexpected number of samples: %d", len(vals)) + } + exists, err := chunkFileExists(fpList[2]) + if err != nil { + t.Fatal(err) + } + if !exists { + t.Errorf("chunk file does not exist for fp=%v", fpList[2]) } s.DropMetricsForFingerprints(fpList...) @@ -495,16 +521,23 @@ func TestDropMetrics(t *testing.T) { Name: model.MetricNameLabel, Value: "test", }) if len(fps3) != 0 { - t.Fatalf("unexpected number of fingerprints: %d", len(fps3)) + t.Errorf("unexpected number of fingerprints: %d", len(fps3)) } it = s.NewIterator(fpList[0]) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { - t.Fatalf("unexpected number of samples: %d", len(vals)) + t.Errorf("unexpected number of samples: %d", len(vals)) } it = s.NewIterator(fpList[1]) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { - t.Fatalf("unexpected number of samples: %d", len(vals)) + t.Errorf("unexpected number of samples: %d", len(vals)) + } + exists, err = chunkFileExists(fpList[2]) + if err != nil { + t.Fatal(err) + } + if exists { + t.Errorf("chunk file still exists for fp=%v", fpList[2]) } } @@ -533,7 +566,7 @@ func TestLoop(t *testing.T) { } storage := NewMemorySeriesStorage(o) if err := storage.Start(); err != nil { - t.Fatalf("Error starting storage: %s", err) + t.Errorf("Error starting storage: %s", err) } for _, s := range samples { storage.Append(s)