mirror of https://github.com/prometheus/prometheus
Merge pull request #1965 from prometheus/beorn7/federation
federation: Collapse time series of the same namepull/2001/head
commit
3eef95962b
|
@ -15,10 +15,12 @@ package web
|
|||
|
||||
import (
|
||||
"net/http"
|
||||
"sort"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
"github.com/prometheus/common/expfmt"
|
||||
"github.com/prometheus/common/log"
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
|
@ -42,35 +44,56 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
|
|||
}
|
||||
|
||||
var (
|
||||
minTimestamp = model.Now().Add(-promql.StalenessDelta)
|
||||
minTimestamp = h.now().Add(-promql.StalenessDelta)
|
||||
format = expfmt.Negotiate(req.Header)
|
||||
enc = expfmt.NewEncoder(w, format)
|
||||
)
|
||||
w.Header().Set("Content-Type", string(format))
|
||||
|
||||
protMetric := &dto.Metric{
|
||||
Label: []*dto.LabelPair{},
|
||||
Untyped: &dto.Untyped{},
|
||||
}
|
||||
protMetricFam := &dto.MetricFamily{
|
||||
Metric: []*dto.Metric{protMetric},
|
||||
Type: dto.MetricType_UNTYPED.Enum(),
|
||||
}
|
||||
|
||||
vector, err := h.storage.LastSampleForLabelMatchers(minTimestamp, matcherSets...)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
for _, s := range vector {
|
||||
globalUsed := map[model.LabelName]struct{}{}
|
||||
sort.Sort(byName(vector))
|
||||
|
||||
// Reset label slice.
|
||||
protMetric.Label = protMetric.Label[:0]
|
||||
var (
|
||||
lastMetricName model.LabelValue
|
||||
protMetricFam *dto.MetricFamily
|
||||
)
|
||||
for _, s := range vector {
|
||||
nameSeen := false
|
||||
globalUsed := map[model.LabelName]struct{}{}
|
||||
protMetric := &dto.Metric{
|
||||
Untyped: &dto.Untyped{},
|
||||
}
|
||||
|
||||
for ln, lv := range s.Metric {
|
||||
if lv == "" {
|
||||
// No value means unset. Never consider those labels.
|
||||
// This is also important to protect against nameless metrics.
|
||||
continue
|
||||
}
|
||||
if ln == model.MetricNameLabel {
|
||||
protMetricFam.Name = proto.String(string(lv))
|
||||
nameSeen = true
|
||||
if lv == lastMetricName {
|
||||
// We already have the name in the current MetricFamily,
|
||||
// and we ignore nameless metrics.
|
||||
continue
|
||||
}
|
||||
// Need to start a new MetricFamily. Ship off the old one (if any) before
|
||||
// creating the new one.
|
||||
if protMetricFam != nil {
|
||||
if err := enc.Encode(protMetricFam); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
protMetricFam = &dto.MetricFamily{
|
||||
Type: dto.MetricType_UNTYPED.Enum(),
|
||||
Name: proto.String(string(lv)),
|
||||
}
|
||||
lastMetricName = lv
|
||||
continue
|
||||
}
|
||||
protMetric.Label = append(protMetric.Label, &dto.LabelPair{
|
||||
|
@ -81,7 +104,10 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
|
|||
globalUsed[ln] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
if !nameSeen {
|
||||
log.With("metric", s.Metric).Warn("Ignoring nameless metric during federation.")
|
||||
continue
|
||||
}
|
||||
// Attach global labels if they do not exist yet.
|
||||
for ln, lv := range h.externalLabels {
|
||||
if _, ok := globalUsed[ln]; !ok {
|
||||
|
@ -95,9 +121,24 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
|
|||
protMetric.TimestampMs = proto.Int64(int64(s.Timestamp))
|
||||
protMetric.Untyped.Value = proto.Float64(float64(s.Value))
|
||||
|
||||
protMetricFam.Metric = append(protMetricFam.Metric, protMetric)
|
||||
}
|
||||
// Still have to ship off the last MetricFamily, if any.
|
||||
if protMetricFam != nil {
|
||||
if err := enc.Encode(protMetricFam); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// byName makes a model.Vector sortable by metric name.
|
||||
type byName model.Vector
|
||||
|
||||
func (vec byName) Len() int { return len(vec) }
|
||||
func (vec byName) Swap(i, j int) { vec[i], vec[j] = vec[j], vec[i] }
|
||||
|
||||
func (vec byName) Less(i, j int) bool {
|
||||
ni := vec[i].Metric[model.MetricNameLabel]
|
||||
nj := vec[j].Metric[model.MetricNameLabel]
|
||||
return ni < nj
|
||||
}
|
||||
|
|
|
@ -0,0 +1,200 @@
|
|||
// Copyright 2016 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package web
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
)
|
||||
|
||||
var scenarios = map[string]struct {
|
||||
params string
|
||||
accept string
|
||||
code int
|
||||
body string
|
||||
}{
|
||||
"empty": {
|
||||
params: "",
|
||||
code: 200,
|
||||
body: ``,
|
||||
},
|
||||
"match nothing": {
|
||||
params: "match[]=does_not_match_anything",
|
||||
code: 200,
|
||||
body: ``,
|
||||
},
|
||||
"invalid params from the beginning": {
|
||||
params: "match[]=-not-a-valid-metric-name",
|
||||
code: 400,
|
||||
body: `parse error at char 1: vector selector must contain label matchers or metric name
|
||||
`,
|
||||
},
|
||||
"invalid params somehwere in the middle": {
|
||||
params: "match[]=not-a-valid-metric-name",
|
||||
code: 400,
|
||||
body: `parse error at char 4: could not parse remaining input "-a-valid-metric"...
|
||||
`,
|
||||
},
|
||||
"test_metric1": {
|
||||
params: "match[]=test_metric1",
|
||||
code: 200,
|
||||
body: `# TYPE test_metric1 untyped
|
||||
test_metric1{foo="bar"} 10000 6000000
|
||||
test_metric1{foo="boo"} 1 6000000
|
||||
`,
|
||||
},
|
||||
"test_metric2": {
|
||||
params: "match[]=test_metric2",
|
||||
code: 200,
|
||||
body: `# TYPE test_metric2 untyped
|
||||
test_metric2{foo="boo"} 1 6000000
|
||||
`,
|
||||
},
|
||||
"test_metric_without_labels": {
|
||||
params: "match[]=test_metric_without_labels",
|
||||
code: 200,
|
||||
body: `# TYPE test_metric_without_labels untyped
|
||||
test_metric_without_labels 1001 6000000
|
||||
`,
|
||||
},
|
||||
"{foo='boo'}": {
|
||||
params: "match[]={foo='boo'}",
|
||||
code: 200,
|
||||
body: `# TYPE test_metric1 untyped
|
||||
test_metric1{foo="boo"} 1 6000000
|
||||
# TYPE test_metric2 untyped
|
||||
test_metric2{foo="boo"} 1 6000000
|
||||
`,
|
||||
},
|
||||
"two matchers": {
|
||||
params: "match[]=test_metric1&match[]=test_metric2",
|
||||
code: 200,
|
||||
body: `# TYPE test_metric1 untyped
|
||||
test_metric1{foo="bar"} 10000 6000000
|
||||
test_metric1{foo="boo"} 1 6000000
|
||||
# TYPE test_metric2 untyped
|
||||
test_metric2{foo="boo"} 1 6000000
|
||||
`,
|
||||
},
|
||||
"everything": {
|
||||
params: "match[]={__name__=~'.%2b'}", // '%2b' is an URL-encoded '+'.
|
||||
code: 200,
|
||||
body: `# TYPE test_metric1 untyped
|
||||
test_metric1{foo="bar"} 10000 6000000
|
||||
test_metric1{foo="boo"} 1 6000000
|
||||
# TYPE test_metric2 untyped
|
||||
test_metric2{foo="boo"} 1 6000000
|
||||
# TYPE test_metric_without_labels untyped
|
||||
test_metric_without_labels 1001 6000000
|
||||
`,
|
||||
},
|
||||
"empty label value matches everything that doesn't have that label": {
|
||||
params: "match[]={foo='',__name__=~'.%2b'}",
|
||||
code: 200,
|
||||
body: `# TYPE test_metric_without_labels untyped
|
||||
test_metric_without_labels 1001 6000000
|
||||
`,
|
||||
},
|
||||
"empty label value for a label that doesn't exist at all, matches everything": {
|
||||
params: "match[]={bar='',__name__=~'.%2b'}",
|
||||
code: 200,
|
||||
body: `# TYPE test_metric1 untyped
|
||||
test_metric1{foo="bar"} 10000 6000000
|
||||
test_metric1{foo="boo"} 1 6000000
|
||||
# TYPE test_metric2 untyped
|
||||
test_metric2{foo="boo"} 1 6000000
|
||||
# TYPE test_metric_without_labels untyped
|
||||
test_metric_without_labels 1001 6000000
|
||||
`,
|
||||
},
|
||||
}
|
||||
|
||||
func TestFederation(t *testing.T) {
|
||||
suite, err := promql.NewTest(t, `
|
||||
load 1m
|
||||
test_metric1{foo="bar"} 0+100x100
|
||||
test_metric1{foo="boo"} 1+0x100
|
||||
test_metric2{foo="boo"} 1+0x100
|
||||
test_metric_without_labels 1+10x100
|
||||
`)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer suite.Close()
|
||||
|
||||
if err := suite.Run(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
h := &Handler{
|
||||
storage: suite.Storage(),
|
||||
queryEngine: suite.QueryEngine(),
|
||||
now: func() model.Time { return 101 * 60 * 1000 }, // 101min after epoch.
|
||||
}
|
||||
|
||||
for name, scenario := range scenarios {
|
||||
req, err := http.ReadRequest(bufio.NewReader(strings.NewReader(
|
||||
"GET http://example.org/federate?" + scenario.params + " HTTP/1.0\r\n\r\n",
|
||||
)))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// HTTP/1.0 was used above to avoid needing a Host field. Change it to 1.1 here.
|
||||
req.Proto = "HTTP/1.1"
|
||||
req.ProtoMinor = 1
|
||||
req.Close = false
|
||||
// 192.0.2.0/24 is "TEST-NET" in RFC 5737 for use solely in
|
||||
// documentation and example source code and should not be
|
||||
// used publicly.
|
||||
req.RemoteAddr = "192.0.2.1:1234"
|
||||
// TODO(beorn7): Once we are completely on Go1.7, replace the lines above by the following:
|
||||
// req := httptest.NewRequest("GET", "http://example.org/federate?"+scenario.params, nil)
|
||||
res := httptest.NewRecorder()
|
||||
h.federation(res, req)
|
||||
if got, want := res.Code, scenario.code; got != want {
|
||||
t.Errorf("Scenario %q: got code %d, want %d", name, got, want)
|
||||
}
|
||||
if got, want := normalizeBody(res.Body), scenario.body; got != want {
|
||||
t.Errorf("Scenario %q: got body %q, want %q", name, got, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// normalizeBody sorts the lines within a metric to make it easy to verify the body.
|
||||
// (Federation is not taking care of sorting within a metric family.)
|
||||
func normalizeBody(body *bytes.Buffer) string {
|
||||
var (
|
||||
lines []string
|
||||
lastHash int
|
||||
)
|
||||
for line, err := body.ReadString('\n'); err == nil; line, err = body.ReadString('\n') {
|
||||
if line[0] == '#' && len(lines) > 0 {
|
||||
sort.Strings(lines[lastHash+1:])
|
||||
lastHash = len(lines)
|
||||
}
|
||||
lines = append(lines, line)
|
||||
}
|
||||
if len(lines) > 0 {
|
||||
sort.Strings(lines[lastHash+1:])
|
||||
}
|
||||
return strings.Join(lines, "")
|
||||
}
|
|
@ -71,6 +71,7 @@ type Handler struct {
|
|||
|
||||
externalLabels model.LabelSet
|
||||
mtx sync.RWMutex
|
||||
now func() model.Time
|
||||
}
|
||||
|
||||
// ApplyConfig updates the status state as the new config requires.
|
||||
|
@ -135,6 +136,7 @@ func New(
|
|||
storage: st,
|
||||
|
||||
apiV1: api_v1.NewAPI(qe, st),
|
||||
now: model.Now,
|
||||
}
|
||||
|
||||
if o.RoutePrefix != "/" {
|
||||
|
@ -291,7 +293,7 @@ func (h *Handler) consoles(w http.ResponseWriter, r *http.Request) {
|
|||
Path: strings.TrimLeft(name, "/"),
|
||||
}
|
||||
|
||||
tmpl := template.NewTemplateExpander(string(text), "__console_"+name, data, model.Now(), h.queryEngine, h.options.ExternalURL.Path)
|
||||
tmpl := template.NewTemplateExpander(string(text), "__console_"+name, data, h.now(), h.queryEngine, h.options.ExternalURL.Path)
|
||||
filenames, err := filepath.Glob(h.options.ConsoleLibrariesPath + "/*.lib")
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
|
@ -464,7 +466,7 @@ func (h *Handler) executeTemplate(w http.ResponseWriter, name string, data inter
|
|||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
tmpl := template.NewTemplateExpander(text, name, data, model.Now(), h.queryEngine, h.options.ExternalURL.Path)
|
||||
tmpl := template.NewTemplateExpander(text, name, data, h.now(), h.queryEngine, h.options.ExternalURL.Path)
|
||||
tmpl.Funcs(tmplFuncs(h.consolesPath(), h.options))
|
||||
|
||||
result, err := tmpl.ExpandHTML(nil)
|
||||
|
|
Loading…
Reference in New Issue