Hacky implementation of protobuf parsing

This "brings back" protobuf parsing, with the only goal to play with
the new sparse histograms.

The Prom-2.x style parser is highly adapted to the structure of the
Prometheus text format (and later OpenMetrics). Some jumping through
hoops is required to feed protobuf into it.

This is not meant to be a model for the final implementation. It
should just enable sparse histogram ingestion at a reasonable
efficiency.

Following known shortcomings and flaws:

- No tests yet.

- Summaries and legacy histograms, i.e. without sparse buckets, are
  ignored.

- Staleness doesn't work (but this could be fixed in the appender, to
  be discussed).

- No tricks have been tried that would be similar to the tricks the
  text parsers do (like direct pointers into the HTTP response
  body). That makes things weird here. Tricky optimizations only make
  sense once the final format is specified, which will almost
  certainly not be the old protobuf format. (Interestingly, I expect
  this implementation to be in fact much more efficient than the
  original protobuf ingestion in Prom-1.x.)

- This is using a proto3 version of metrics.proto (mostly to be
  consistent with the other protobuf uses). However, proto3 sees no
  difference between an unset field. We depend on that to distinguish
  between an unset timestamp and the timestamp 0 (1970-01-01, 00:00:00
  UTC). In this experimental code, we just assume that timestamp is
  never specified and therefore a timestamp of 0 always is interpreted
  as "not set".

Signed-off-by: beorn7 <beorn@grafana.com>
pull/9027/head
beorn7 2021-06-29 23:45:23 +02:00
parent f4d3af73f0
commit 5de2df752f
6 changed files with 380 additions and 25 deletions

View File

@ -17,16 +17,22 @@ import (
"mime"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/histogram"
"github.com/prometheus/prometheus/pkg/labels"
)
// Parser parses samples from a byte slice of samples in the official
// Prometheus and OpenMetrics text exposition formats.
type Parser interface {
// Series returns the bytes of the series, the timestamp if set, and the value
// of the current sample.
// Series returns the bytes of a series with a simple float64 as a
// value, the timestamp if set, and the value of the current sample.
Series() ([]byte, *int64, float64)
// Histogram returns the bytes of a series with a sparse histogram as a
// value, the timestamp if set, and the sparse histogram in the current
// sample.
Histogram() ([]byte, *int64, histogram.SparseHistogram)
// Help returns the metric name and help text in the current entry.
// Must only be called after Next returned a help entry.
// The returned byte slices become invalid after the next call to Next.
@ -63,22 +69,30 @@ type Parser interface {
// New returns a new parser of the byte slice.
func New(b []byte, contentType string) Parser {
mediaType, _, err := mime.ParseMediaType(contentType)
if err == nil && mediaType == "application/openmetrics-text" {
return NewOpenMetricsParser(b)
if err != nil {
return NewPromParser(b)
}
switch mediaType {
case "application/openmetrics-text":
return NewOpenMetricsParser(b)
case "application/vnd.google.protobuf":
return NewProtobufParser(b)
default:
return NewPromParser(b)
}
return NewPromParser(b)
}
// Entry represents the type of a parsed entry.
type Entry int
const (
EntryInvalid Entry = -1
EntryType Entry = 0
EntryHelp Entry = 1
EntrySeries Entry = 2
EntryComment Entry = 3
EntryUnit Entry = 4
EntryInvalid Entry = -1
EntryType Entry = 0
EntryHelp Entry = 1
EntrySeries Entry = 2 // A series with a simple float64 as value.
EntryComment Entry = 3
EntryUnit Entry = 4
EntryHistogram Entry = 5 // A series with a sparse histogram as a value.
)
// MetricType represents metric type values.

View File

@ -28,6 +28,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/histogram"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/value"
)
@ -113,6 +114,12 @@ func (p *OpenMetricsParser) Series() ([]byte, *int64, float64) {
return p.series, nil, p.val
}
// Histogram always returns (nil, nil, SparseHistogram{}) because OpenMetrics
// does not support sparse histograms.
func (p *OpenMetricsParser) Histogram() ([]byte, *int64, histogram.SparseHistogram) {
return nil, nil, histogram.SparseHistogram{}
}
// Help returns the metric name and help text in the current entry.
// Must only be called after Next returned a help entry.
// The returned byte slices become invalid after the next call to Next.

View File

@ -29,6 +29,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/histogram"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/value"
)
@ -168,6 +169,12 @@ func (p *PromParser) Series() ([]byte, *int64, float64) {
return p.series, nil, p.val
}
// Histogram always returns (nil, nil, SparseHistogram{}) because the Prometheus
// text format does not support sparse histograms.
func (p *PromParser) Histogram() ([]byte, *int64, histogram.SparseHistogram) {
return nil, nil, histogram.SparseHistogram{}
}
// Help returns the metric name and help text in the current entry.
// Must only be called after Next returned a help entry.
// The returned byte slices become invalid after the next call to Next.

View File

@ -0,0 +1,311 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package textparse
import (
"bytes"
"encoding/binary"
"io"
"sort"
"unicode/utf8"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/histogram"
"github.com/prometheus/prometheus/pkg/labels"
dto "github.com/prometheus/prometheus/prompb/io/prometheus/client"
)
// ProtobufParser is a very inefficient way of unmarshaling the old Prometheus
// protobuf format and then present it as it if were parsed by a
// Prometheus-2-style text parser. This is only done so that we can easily plug
// in the protobuf format into Prometheus 2. For future use (with the final
// format that will be used for sparse histograms), we have to revisit the
// parsing. A lot of the efficiency tricks of the Prometheus-2-style parsing
// could be used in a similar fashion (byte-slice pointers into the raw
// payload), which requires some hand-coded protobuf handling. But the current
// parsers all expect the full series name (metric name plus label pairs) as one
// string, which is not how things are represented in the protobuf format. If
// the re-arrangement work is actually causing problems (which has to be seen),
// that expectation needs to be changed.
//
// TODO(beorn7): The parser currently ignores summaries and legacy histograms
// (those without sparse buckets) to keep things simple.
type ProtobufParser struct {
in []byte // The intput to parse.
inPos int // Position within the input.
state Entry // State is marked by the entry we are
// processing. EntryInvalid implies that we have to
// decode the next MetricFamily.
metricPos int // Position within Metric slice.
mf *dto.MetricFamily
// The following are just shenanigans to satisfy the Parser interface.
metricBytes *bytes.Buffer // A somewhat fluid representation of the current metric.
}
func NewProtobufParser(b []byte) Parser {
return &ProtobufParser{
in: b,
state: EntryInvalid,
mf: &dto.MetricFamily{},
metricBytes: &bytes.Buffer{},
}
}
// Series returns the bytes of a series with a simple float64 as a
// value, the timestamp if set, and the value of the current sample.
func (p *ProtobufParser) Series() ([]byte, *int64, float64) {
var (
m = p.mf.GetMetric()[p.metricPos]
ts = m.GetTimestampMs()
v float64
)
switch p.mf.GetType() {
case dto.MetricType_COUNTER:
v = m.GetCounter().Value
case dto.MetricType_GAUGE:
v = m.GetGauge().Value
case dto.MetricType_UNTYPED:
v = m.GetUntyped().Value
default:
panic("encountered unexpected metric type, this is a bug")
}
if ts != 0 {
return p.metricBytes.Bytes(), &ts, v
}
// Nasty hack: Assume that ts==0 means no timestamp. That's not true in
// general, but proto3 has no distinction between unset and
// default. Need to avoid in the final format.
return p.metricBytes.Bytes(), nil, v
}
// Histogram returns the bytes of a series with a sparse histogram as a
// value, the timestamp if set, and the sparse histogram in the current
// sample.
func (p *ProtobufParser) Histogram() ([]byte, *int64, histogram.SparseHistogram) {
var (
m = p.mf.GetMetric()[p.metricPos]
ts = m.GetTimestampMs()
h = m.GetHistogram()
)
sh := histogram.SparseHistogram{
Count: h.GetSampleCount(),
Sum: h.GetSampleSum(),
ZeroThreshold: h.GetSbZeroThreshold(),
ZeroCount: h.GetSbZeroCount(),
Schema: h.GetSbSchema(),
PositiveSpans: make([]histogram.Span, len(h.GetSbPositive().GetSpan())),
PositiveBuckets: h.GetSbPositive().GetDelta(),
NegativeSpans: make([]histogram.Span, len(h.GetSbNegative().GetSpan())),
NegativeBuckets: h.GetSbNegative().GetDelta(),
}
for i, span := range h.GetSbPositive().GetSpan() {
sh.PositiveSpans[i].Offset = span.GetOffset()
sh.PositiveSpans[i].Length = span.GetLength()
}
for i, span := range h.GetSbNegative().GetSpan() {
sh.NegativeSpans[i].Offset = span.GetOffset()
sh.NegativeSpans[i].Length = span.GetLength()
}
if ts != 0 {
return p.metricBytes.Bytes(), &ts, sh
}
// Nasty hack: Assume that ts==0 means no timestamp. That's not true in
// general, but proto3 has no distinction between unset and
// default. Need to avoid in the final format.
return p.metricBytes.Bytes(), nil, sh
}
// Help returns the metric name and help text in the current entry.
// Must only be called after Next returned a help entry.
// The returned byte slices become invalid after the next call to Next.
func (p *ProtobufParser) Help() ([]byte, []byte) {
return p.metricBytes.Bytes(), []byte(p.mf.GetHelp())
}
// Type returns the metric name and type in the current entry.
// Must only be called after Next returned a type entry.
// The returned byte slices become invalid after the next call to Next.
func (p *ProtobufParser) Type() ([]byte, MetricType) {
n := p.metricBytes.Bytes()
switch p.mf.GetType() {
case dto.MetricType_COUNTER:
return n, MetricTypeCounter
case dto.MetricType_GAUGE:
return n, MetricTypeGauge
case dto.MetricType_HISTOGRAM:
return n, MetricTypeGaugeHistogram
}
return n, MetricTypeUnknown
}
// Unit always returns (nil, nil) because units aren't supported by the protobuf
// format.
func (p *ProtobufParser) Unit() ([]byte, []byte) {
return nil, nil
}
// Comment always returns nil because comments aren't supported by the protobuf
// format.
func (p *ProtobufParser) Comment() []byte {
return nil
}
// Metric writes the labels of the current sample into the passed labels.
// It returns the string from which the metric was parsed.
func (p *ProtobufParser) Metric(l *labels.Labels) string {
*l = append(*l, labels.Label{
Name: labels.MetricName,
Value: p.mf.GetName(),
})
for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() {
*l = append(*l, labels.Label{
Name: lp.GetName(),
Value: lp.GetValue(),
})
}
// Sort labels to maintain the sorted labels invariant.
sort.Sort(*l)
return p.metricBytes.String()
}
// Exemplar always returns false because exemplars aren't supported yet by the
// protobuf format.
func (p *ProtobufParser) Exemplar(l *exemplar.Exemplar) bool {
return false
}
// Next advances the parser to the next "sample" (emulating the behavior of a
// text format parser). It returns (EntryInvalid, io.EOF) if no samples were
// read.
func (p *ProtobufParser) Next() (Entry, error) {
switch p.state {
case EntryInvalid:
p.metricPos = 0
n, err := readDelimited(p.in[p.inPos:], p.mf)
p.inPos += n
if err != nil {
return p.state, err
}
// Skip empty metric families. While checking for emptiness, ignore
// summaries and legacy histograms for now.
metricFound := false
metricType := p.mf.GetType()
for _, m := range p.mf.GetMetric() {
if metricType == dto.MetricType_COUNTER ||
metricType == dto.MetricType_GAUGE ||
metricType == dto.MetricType_UNTYPED ||
(metricType == dto.MetricType_HISTOGRAM &&
// A histogram with a non-zero SbZerothreshold
// is a sparse histogram.
m.GetHistogram().GetSbZeroThreshold() != 0) {
metricFound = true
break
}
}
if !metricFound {
return p.Next()
}
// We are at the beginning of a metric family. Put only the name
// into metricBytes and validate only name and help for now.
name := p.mf.GetName()
if !model.IsValidMetricName(model.LabelValue(name)) {
return EntryInvalid, errors.Errorf("invalid metric name: %s", name)
}
if help := p.mf.GetHelp(); !utf8.ValidString(help) {
return EntryInvalid, errors.Errorf("invalid help for metric %q: %s", name, help)
}
p.metricBytes.Reset()
p.metricBytes.WriteString(name)
p.state = EntryHelp
case EntryHelp:
p.state = EntryType
case EntryType:
if p.mf.GetType() == dto.MetricType_HISTOGRAM {
p.state = EntryHistogram
} else {
p.state = EntrySeries
}
if err := p.updateMetricBytes(); err != nil {
return EntryInvalid, err
}
case EntryHistogram, EntrySeries:
p.metricPos++
if p.metricPos >= len(p.mf.GetMetric()) {
p.state = EntryInvalid
return p.Next()
}
if err := p.updateMetricBytes(); err != nil {
return EntryInvalid, err
}
default:
return EntryInvalid, errors.Errorf("invalid protobuf parsing state: %d", p.state)
}
return p.state, nil
}
func (p *ProtobufParser) updateMetricBytes() error {
b := p.metricBytes
b.Reset()
b.WriteString(p.mf.GetName())
for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() {
b.WriteByte(model.SeparatorByte)
n := lp.GetName()
if !model.LabelName(n).IsValid() {
return errors.Errorf("invalid label name: %s", n)
}
b.WriteString(n)
b.WriteByte(model.SeparatorByte)
v := lp.GetValue()
if !utf8.ValidString(v) {
return errors.Errorf("invalid label value: %s", v)
}
b.WriteString(v)
}
return nil
}
var errInvalidVarint = errors.New("protobufparse: invalid varint encountered")
// readDelimited is essentially doing what the function of the same name in
// github.com/matttproud/golang_protobuf_extensions/pbutil is doing, but it is
// specific to a MetricFamily, utilizes the more efficient gogo-protobuf
// unmarshaling, and acts on a byte slice directly without any additional
// staging buffers.
func readDelimited(b []byte, mf *dto.MetricFamily) (n int, err error) {
if len(b) == 0 {
return 0, io.EOF
}
messageLength, varIntLength := proto.DecodeVarint(b)
if varIntLength == 0 || varIntLength > binary.MaxVarintLen32 {
return 0, errInvalidVarint
}
totalLength := varIntLength + int(messageLength)
if totalLength > len(b) {
return 0, errors.Errorf("protobufparse: insufficient length of buffer, expected at least %d bytes, got %d bytes", totalLength, len(b))
}
mf.Reset()
return totalLength, mf.Unmarshal(b[varIntLength:totalLength])
}

View File

@ -40,6 +40,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/histogram"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/pool"
"github.com/prometheus/prometheus/pkg/relabel"
@ -706,7 +707,7 @@ type targetScraper struct {
var errBodySizeLimit = errors.New("body size limit exceeded")
const acceptHeader = `application/openmetrics-text; version=0.0.1,text/plain;version=0.0.4;q=0.5,*/*;q=0.1`
const acceptHeader = `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited,application/openmetrics-text; version=0.0.1;q=0.5,text/plain;version=0.0.4;q=0.2,*/*;q=0.1`
var userAgentHeader = fmt.Sprintf("Prometheus/%s", version.Version)
@ -1378,9 +1379,13 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
loop:
for {
var (
et textparse.Entry
sampleAdded bool
e exemplar.Exemplar
et textparse.Entry
sampleAdded, isHistogram bool
met []byte
parsedTimestamp *int64
val float64
his histogram.SparseHistogram
e exemplar.Exemplar
)
if et, err = p.Next(); err != nil {
if err == io.EOF {
@ -1400,17 +1405,23 @@ loop:
continue
case textparse.EntryComment:
continue
case textparse.EntryHistogram:
isHistogram = true
default:
}
total++
t := defTime
met, tp, v := p.Series()
if !sl.honorTimestamps {
tp = nil
if isHistogram {
met, parsedTimestamp, his = p.Histogram()
} else {
met, parsedTimestamp, val = p.Series()
}
if tp != nil {
t = *tp
if !sl.honorTimestamps {
parsedTimestamp = nil
}
if parsedTimestamp != nil {
t = *parsedTimestamp
}
if sl.cache.getDropped(yoloString(met)) {
@ -1453,8 +1464,12 @@ loop:
}
}
ref, err = app.Append(ref, lset, t, v)
sampleAdded, err = sl.checkAddError(ce, met, tp, err, &sampleLimitErr, &appErrs)
if isHistogram {
ref, err = app.AppendHistogram(ref, lset, t, his)
} else {
ref, err = app.Append(ref, lset, t, val)
}
sampleAdded, err = sl.checkAddError(ce, met, parsedTimestamp, err, &sampleLimitErr, &appErrs)
if err != nil {
if err != storage.ErrNotFound {
level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err)
@ -1463,7 +1478,7 @@ loop:
}
if !ok {
if tp == nil {
if parsedTimestamp == nil {
// Bypass staleness logic if there is an explicit timestamp.
sl.cache.trackStaleness(hash, lset)
}
@ -1512,6 +1527,7 @@ loop:
if err == nil {
sl.cache.forEachStale(func(lset labels.Labels) bool {
// Series no longer exposed, mark it stale.
// TODO(beorn7): Appending staleness markers breaks horribly for histograms.
_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
switch errors.Cause(err) {
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:

View File

@ -1831,8 +1831,8 @@ func TestTargetScraperScrapeOK(t *testing.T) {
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
accept := r.Header.Get("Accept")
if !strings.HasPrefix(accept, "application/openmetrics-text;") {
t.Errorf("Expected Accept header to prefer application/openmetrics-text, got %q", accept)
if !strings.HasPrefix(accept, "application/vnd.google.protobuf;") {
t.Errorf("Expected Accept header to prefer application/vnd.google.protobuf, got %q", accept)
}
timeout := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds")