From 39a82549633c3d1b499697cdac1ee758b54b6dbb Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 22 Jun 2015 22:46:55 +0200 Subject: [PATCH] web: add basic federation support. This commit adds a federation handler on /federate. It accepts `match[]` query parameters containing vector selectors. Their intersection determines the in-memory metrics that are returned in the same way as the /metrics endpoint does (modulo sorting). --- web/federate.go | 108 ++++++++++++++++++++++++++++++++++++++++++++++++ web/web.go | 9 +++- 2 files changed, 115 insertions(+), 2 deletions(-) create mode 100644 web/federate.go diff --git a/web/federate.go b/web/federate.go new file mode 100644 index 000000000..b2db6aeae --- /dev/null +++ b/web/federate.go @@ -0,0 +1,108 @@ +package web + +import ( + "io" + "net/http" + + "bitbucket.org/ww/goautoneg" + "github.com/golang/protobuf/proto" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/text" + + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage/local" + + clientmodel "github.com/prometheus/client_golang/model" + dto "github.com/prometheus/client_model/go" +) + +type Federation struct { + Storage local.Storage +} + +func (fed *Federation) ServeHTTP(w http.ResponseWriter, req *http.Request) { + req.ParseForm() + + metrics := map[clientmodel.Fingerprint]clientmodel.COWMetric{} + + for _, s := range req.Form["match[]"] { + matchers, err := promql.ParseMetricSelector(s) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + for fp, met := range fed.Storage.MetricsForLabelMatchers(matchers...) { + metrics[fp] = met + } + } + + enc, contentType := chooseEncoder(req) + w.Header().Set("Content-Type", contentType) + + protMetric := &dto.Metric{ + Label: []*dto.LabelPair{}, + Untyped: &dto.Untyped{}, + } + protMetricFam := &dto.MetricFamily{ + Metric: []*dto.Metric{protMetric}, + Type: dto.MetricType_UNTYPED.Enum(), + } + + for fp, met := range metrics { + sp := fed.Storage.LastSamplePairForFingerprint(fp) + if sp == nil { + continue + } + + // Reset label slice. + protMetric.Label = protMetric.Label[:0] + + for ln, lv := range met.Metric { + if ln == clientmodel.MetricNameLabel { + protMetricFam.Name = proto.String(string(lv)) + continue + } + protMetric.Label = append(protMetric.Label, &dto.LabelPair{ + Name: proto.String(string(ln)), + Value: proto.String(string(lv)), + }) + } + protMetric.TimestampMs = (*int64)(&sp.Timestamp) + protMetric.Untyped.Value = (*float64)(&sp.Value) + + if _, err := enc(w, protMetricFam); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} + +type encoder func(w io.Writer, p *dto.MetricFamily) (int, error) + +func chooseEncoder(req *http.Request) (encoder, string) { + accepts := goautoneg.ParseAccept(req.Header.Get("Accept")) + for _, accept := range accepts { + switch { + case accept.Type == "application" && + accept.SubType == "vnd.google.protobuf" && + accept.Params["proto"] == "io.prometheus.client.MetricFamily": + switch accept.Params["encoding"] { + case "delimited": + return text.WriteProtoDelimited, prometheus.DelimitedTelemetryContentType + case "text": + return text.WriteProtoText, prometheus.ProtoTextTelemetryContentType + case "compact-text": + return text.WriteProtoCompactText, prometheus.ProtoCompactTextTelemetryContentType + default: + continue + } + case accept.Type == "text" && + accept.SubType == "plain" && + (accept.Params["version"] == "0.0.4" || accept.Params["version"] == ""): + return text.MetricFamilyToText, prometheus.TextTelemetryContentType + default: + continue + } + } + return text.MetricFamilyToText, prometheus.TextTelemetryContentType +} diff --git a/web/web.go b/web/web.go index 5a88be937..70136544a 100644 --- a/web/web.go +++ b/web/web.go @@ -54,8 +54,9 @@ type Handler struct { ruleManager *rules.Manager queryEngine *promql.Engine - apiV1 *v1.API - apiLegacy *legacy.API + apiV1 *v1.API + apiLegacy *legacy.API + federation *Federation router *route.Router quitCh chan struct{} @@ -125,6 +126,9 @@ func New(st local.Storage, qe *promql.Engine, rm *rules.Manager, status *Prometh Storage: st, Now: clientmodel.Now, }, + federation: &Federation{ + Storage: st, + }, } if o.PathPrefix != "" { @@ -145,6 +149,7 @@ func New(st local.Storage, qe *promql.Engine, rm *rules.Manager, status *Prometh router.Get("/heap", instrf("heap", dumpHeap)) + router.Get("/federate", instrh("federate", h.federation)) router.Get(o.MetricsPath, prometheus.Handler().ServeHTTP) h.apiLegacy.Register(router.WithPrefix("/api"))