protobufparse: Add support for remaining types

Parser now supports summaries and legacy histograms including
exemplars.

It also adds the option of specifying exemplars together with a sparse
histogram by simply using the legacy bucket section, too. The buckets
will be ignored, but the exemplars will be ingested.

Signed-off-by: beorn7 <beorn@grafana.com>
pull/9086/head
beorn7 2021-07-13 20:01:44 +02:00
parent 88a6229fc4
commit 641c3ae199
2 changed files with 296 additions and 56 deletions

View File

@ -16,8 +16,11 @@ package textparse
import ( import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"fmt"
"io" "io"
"math"
"sort" "sort"
"strings"
"unicode/utf8" "unicode/utf8"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
@ -43,16 +46,19 @@ import (
// string, which is not how things are represented in the protobuf format. If // string, which is not how things are represented in the protobuf format. If
// the re-arrangement work is actually causing problems (which has to be seen), // the re-arrangement work is actually causing problems (which has to be seen),
// that expectation needs to be changed. // that expectation needs to be changed.
//
// TODO(beorn7): The parser currently ignores summaries and legacy histograms
// (those without sparse buckets) to keep things simple.
type ProtobufParser struct { type ProtobufParser struct {
in []byte // The intput to parse. in []byte // The intput to parse.
inPos int // Position within the input. inPos int // Position within the input.
state Entry // State is marked by the entry we are
// processing. EntryInvalid implies that we have to
// decode the next MetricFamily.
metricPos int // Position within Metric slice. metricPos int // Position within Metric slice.
// fieldPos is the position within a Summary or (legacy) Histogram. -2
// is the count. -1 is the sum. Otherwise it is the index within
// quantiles/buckets.
fieldPos int
fieldsDone bool // true if no more fields of a Summary or (legacy) Histogram to be processed.
// state is marked by the entry we are processing. EntryInvalid implies
// that we have to decode the next MetricFamily.
state Entry
mf *dto.MetricFamily mf *dto.MetricFamily
// The following are just shenanigans to satisfy the Parser interface. // The following are just shenanigans to satisfy the Parser interface.
@ -83,6 +89,32 @@ func (p *ProtobufParser) Series() ([]byte, *int64, float64) {
v = m.GetGauge().GetValue() v = m.GetGauge().GetValue()
case dto.MetricType_UNTYPED: case dto.MetricType_UNTYPED:
v = m.GetUntyped().GetValue() v = m.GetUntyped().GetValue()
case dto.MetricType_SUMMARY:
s := m.GetSummary()
switch p.fieldPos {
case -2:
v = float64(s.GetSampleCount())
case -1:
v = s.GetSampleSum()
default:
v = s.GetQuantile()[p.fieldPos].GetValue()
}
case dto.MetricType_HISTOGRAM:
// This should only happen for a legacy histogram.
h := m.GetHistogram()
switch p.fieldPos {
case -2:
v = float64(h.GetSampleCount())
case -1:
v = h.GetSampleSum()
default:
bb := h.GetBucket()
if p.fieldPos >= len(bb) {
v = float64(h.GetSampleCount())
} else {
v = float64(bb[p.fieldPos].GetCumulativeCount())
}
}
default: default:
panic("encountered unexpected metric type, this is a bug") panic("encountered unexpected metric type, this is a bug")
} }
@ -151,6 +183,8 @@ func (p *ProtobufParser) Type() ([]byte, MetricType) {
return n, MetricTypeGauge return n, MetricTypeGauge
case dto.MetricType_HISTOGRAM: case dto.MetricType_HISTOGRAM:
return n, MetricTypeHistogram return n, MetricTypeHistogram
case dto.MetricType_SUMMARY:
return n, MetricTypeSummary
} }
return n, MetricTypeUnknown return n, MetricTypeUnknown
} }
@ -172,7 +206,7 @@ func (p *ProtobufParser) Comment() []byte {
func (p *ProtobufParser) Metric(l *labels.Labels) string { func (p *ProtobufParser) Metric(l *labels.Labels) string {
*l = append(*l, labels.Label{ *l = append(*l, labels.Label{
Name: labels.MetricName, Name: labels.MetricName,
Value: p.mf.GetName(), Value: p.getMagicName(),
}) })
for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() { for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() {
@ -181,6 +215,9 @@ func (p *ProtobufParser) Metric(l *labels.Labels) string {
Value: lp.GetValue(), Value: lp.GetValue(),
}) })
} }
if needed, name, value := p.getMagicLabel(); needed {
*l = append(*l, labels.Label{Name: name, Value: value})
}
// Sort labels to maintain the sorted labels invariant. // Sort labels to maintain the sorted labels invariant.
sort.Sort(*l) sort.Sort(*l)
@ -189,7 +226,9 @@ func (p *ProtobufParser) Metric(l *labels.Labels) string {
} }
// Exemplar writes the exemplar of the current sample into the passed // Exemplar writes the exemplar of the current sample into the passed
// exemplar. It returns if an exemplar exists or not. // exemplar. It returns if an exemplar exists or not. In case of a sparse
// histogram, the legacy bucket section is still used for exemplars. To ingest
// all examplars, call the Exemplar method repeatedly until it returns false.
func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool { func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool {
m := p.mf.GetMetric()[p.metricPos] m := p.mf.GetMetric()[p.metricPos]
var exProto *dto.Exemplar var exProto *dto.Exemplar
@ -197,8 +236,23 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool {
case dto.MetricType_COUNTER: case dto.MetricType_COUNTER:
exProto = m.GetCounter().GetExemplar() exProto = m.GetCounter().GetExemplar()
case dto.MetricType_HISTOGRAM: case dto.MetricType_HISTOGRAM:
// TODO(beorn7): Have to pick bucket. Forward iterator in case of sparse histogram. bb := m.GetHistogram().GetBucket()
return false if p.fieldPos < 0 {
if p.state == EntrySeries {
return false // At _count or _sum.
}
p.fieldPos = 0 // Start at 1st bucket for sparse histograms.
}
for p.fieldPos < len(bb) {
exProto = bb[p.fieldPos].GetExemplar()
if p.state == EntrySeries {
break
}
p.fieldPos++
if exProto != nil {
break
}
}
default: default:
return false return false
} }
@ -226,29 +280,15 @@ func (p *ProtobufParser) Next() (Entry, error) {
switch p.state { switch p.state {
case EntryInvalid: case EntryInvalid:
p.metricPos = 0 p.metricPos = 0
p.fieldPos = -2
n, err := readDelimited(p.in[p.inPos:], p.mf) n, err := readDelimited(p.in[p.inPos:], p.mf)
p.inPos += n p.inPos += n
if err != nil { if err != nil {
return p.state, err return p.state, err
} }
// Skip empty metric families. While checking for emptiness, ignore // Skip empty metric families.
// summaries and legacy histograms for now. if len(p.mf.GetMetric()) == 0 {
metricFound := false
metricType := p.mf.GetType()
for _, m := range p.mf.GetMetric() {
if metricType == dto.MetricType_COUNTER ||
metricType == dto.MetricType_GAUGE ||
metricType == dto.MetricType_UNTYPED ||
(metricType == dto.MetricType_HISTOGRAM &&
// A histogram with a non-zero SbZerothreshold
// is a sparse histogram.
m.GetHistogram().GetSbZeroThreshold() != 0) {
metricFound = true
break
}
}
if !metricFound {
return p.Next() return p.Next()
} }
@ -268,7 +308,8 @@ func (p *ProtobufParser) Next() (Entry, error) {
case EntryHelp: case EntryHelp:
p.state = EntryType p.state = EntryType
case EntryType: case EntryType:
if p.mf.GetType() == dto.MetricType_HISTOGRAM { if p.mf.GetType() == dto.MetricType_HISTOGRAM &&
p.mf.GetMetric()[0].GetHistogram().GetSbZeroThreshold() != 0 {
p.state = EntryHistogram p.state = EntryHistogram
} else { } else {
p.state = EntrySeries p.state = EntrySeries
@ -277,7 +318,14 @@ func (p *ProtobufParser) Next() (Entry, error) {
return EntryInvalid, err return EntryInvalid, err
} }
case EntryHistogram, EntrySeries: case EntryHistogram, EntrySeries:
if p.state == EntrySeries && !p.fieldsDone &&
(p.mf.GetType() == dto.MetricType_SUMMARY || p.mf.GetType() == dto.MetricType_HISTOGRAM) {
p.fieldPos++
} else {
p.metricPos++ p.metricPos++
p.fieldPos = -2
p.fieldsDone = false
}
if p.metricPos >= len(p.mf.GetMetric()) { if p.metricPos >= len(p.mf.GetMetric()) {
p.state = EntryInvalid p.state = EntryInvalid
return p.Next() return p.Next()
@ -294,7 +342,7 @@ func (p *ProtobufParser) Next() (Entry, error) {
func (p *ProtobufParser) updateMetricBytes() error { func (p *ProtobufParser) updateMetricBytes() error {
b := p.metricBytes b := p.metricBytes
b.Reset() b.Reset()
b.WriteString(p.mf.GetName()) b.WriteString(p.getMagicName())
for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() { for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() {
b.WriteByte(model.SeparatorByte) b.WriteByte(model.SeparatorByte)
n := lp.GetName() n := lp.GetName()
@ -309,9 +357,60 @@ func (p *ProtobufParser) updateMetricBytes() error {
} }
b.WriteString(v) b.WriteString(v)
} }
if needed, n, v := p.getMagicLabel(); needed {
b.WriteByte(model.SeparatorByte)
b.WriteString(n)
b.WriteByte(model.SeparatorByte)
b.WriteString(v)
}
return nil return nil
} }
// getMagicName usually just returns p.mf.GetType() but adds a magic suffix
// ("_count", "_sum", "_bucket") if needed according to the current parser
// state.
func (p *ProtobufParser) getMagicName() string {
t := p.mf.GetType()
if p.state == EntryHistogram || (t != dto.MetricType_HISTOGRAM && t != dto.MetricType_SUMMARY) {
return p.mf.GetName()
}
if p.fieldPos == -2 {
return p.mf.GetName() + "_count"
}
if p.fieldPos == -1 {
return p.mf.GetName() + "_sum"
}
if t == dto.MetricType_HISTOGRAM {
return p.mf.GetName() + "_bucket"
}
return p.mf.GetName()
}
// getMagicLabel returns if a magic label ("quantile" or "le") is needed and, if
// so, its name and value. It also sets p.fieldsDone if applicable.
func (p *ProtobufParser) getMagicLabel() (bool, string, string) {
if p.state == EntryHistogram || p.fieldPos < 0 {
return false, "", ""
}
switch p.mf.GetType() {
case dto.MetricType_SUMMARY:
qq := p.mf.GetMetric()[p.metricPos].GetSummary().GetQuantile()
q := qq[p.fieldPos]
p.fieldsDone = p.fieldPos == len(qq)-1
return true, model.QuantileLabel, formatOpenMetricsFloat(q.GetQuantile())
case dto.MetricType_HISTOGRAM:
bb := p.mf.GetMetric()[p.metricPos].GetHistogram().GetBucket()
if p.fieldPos >= len(bb) {
p.fieldsDone = true
return true, model.BucketLabel, "+Inf"
}
b := bb[p.fieldPos]
p.fieldsDone = math.IsInf(b.GetUpperBound(), +1)
return true, model.BucketLabel, formatOpenMetricsFloat(b.GetUpperBound())
}
return false, "", ""
}
var errInvalidVarint = errors.New("protobufparse: invalid varint encountered") var errInvalidVarint = errors.New("protobufparse: invalid varint encountered")
// readDelimited is essentially doing what the function of the same name in // readDelimited is essentially doing what the function of the same name in
@ -334,3 +433,29 @@ func readDelimited(b []byte, mf *dto.MetricFamily) (n int, err error) {
mf.Reset() mf.Reset()
return totalLength, mf.Unmarshal(b[varIntLength:totalLength]) return totalLength, mf.Unmarshal(b[varIntLength:totalLength])
} }
// formatOpenMetricsFloat works like the usual Go string formatting of a fleat
// but appends ".0" if the resulting number would otherwise contain neither a
// "." nor an "e".
func formatOpenMetricsFloat(f float64) string {
// A few common cases hardcoded.
switch {
case f == 1:
return "1.0"
case f == 0:
return "0.0"
case f == -1:
return "-1.0"
case math.IsNaN(f):
return "NaN"
case math.IsInf(f, +1):
return "+Inf"
case math.IsInf(f, -1):
return "-Inf"
}
s := fmt.Sprint(f)
if strings.ContainsAny(s, "e.") {
return s
}
return s + ".0"
}

View File

@ -104,7 +104,7 @@ metric: <
name: "dummyID" name: "dummyID"
value: "59727" value: "59727"
> >
value: -0.0003919818421972943 value: -0.00039
timestamp: < timestamp: <
seconds: 1625851155 seconds: 1625851155
nanos: 146848499 nanos: 146848499
@ -119,11 +119,7 @@ metric: <
name: "dummyID" name: "dummyID"
value: "5617" value: "5617"
> >
value: -0.0002956962622126468 value: -0.00029
timestamp: <
seconds: 1625851150
nanos: 233181498
>
> >
> >
sb_schema: 3 sb_schema: 3
@ -164,44 +160,40 @@ metric: <
`, `,
`name: "test_histogram2" `name: "test_histogram2"
help: "Same histogram as before but now without sparse buckets." help: "Similar histogram as before but now without sparse buckets."
type: HISTOGRAM type: HISTOGRAM
metric: < metric: <
histogram: < histogram: <
sample_count: 175 sample_count: 175
sample_sum: 0.0008280461746287094 sample_sum: 0.000828
bucket: < bucket: <
cumulative_count: 2 cumulative_count: 2
upper_bound: -0.0004899999999999998 upper_bound: -0.00048
> >
bucket: < bucket: <
cumulative_count: 4 cumulative_count: 4
upper_bound: -0.0003899999999999998 upper_bound: -0.00038
exemplar: < exemplar: <
label: < label: <
name: "dummyID" name: "dummyID"
value: "59727" value: "59727"
> >
value: -0.0003919818421972943 value: -0.00038
timestamp: < timestamp: <
seconds: 1625851155 seconds: 1625851153
nanos: 146848499 nanos: 146848499
> >
> >
> >
bucket: < bucket: <
cumulative_count: 16 cumulative_count: 16
upper_bound: -0.0002899999999999998 upper_bound: 1
exemplar: < exemplar: <
label: < label: <
name: "dummyID" name: "dummyID"
value: "5617" value: "5617"
> >
value: -0.0002956962622126468 value: -0.000295
timestamp: <
seconds: 1625851150
nanos: 233181498
>
> >
> >
sb_schema: 0 sb_schema: 0
@ -265,7 +257,7 @@ metric: <
unit string unit string
comment string comment string
shs histogram.SparseHistogram shs histogram.SparseHistogram
e *exemplar.Exemplar e []exemplar.Exemplar
}{ }{
{ {
m: "go_build_info", m: "go_build_info",
@ -299,7 +291,9 @@ metric: <
lset: labels.FromStrings( lset: labels.FromStrings(
"__name__", "go_memstats_alloc_bytes_total", "__name__", "go_memstats_alloc_bytes_total",
), ),
e: &exemplar.Exemplar{Labels: labels.FromStrings("dummyID", "42"), Value: 12, HasTs: true, Ts: 1625851151233}, e: []exemplar.Exemplar{
{Labels: labels.FromStrings("dummyID", "42"), Value: 12, HasTs: true, Ts: 1625851151233},
},
}, },
{ {
m: "something_untyped", m: "something_untyped",
@ -348,6 +342,121 @@ metric: <
lset: labels.FromStrings( lset: labels.FromStrings(
"__name__", "test_histogram", "__name__", "test_histogram",
), ),
e: []exemplar.Exemplar{
{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146},
{Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, HasTs: false},
},
},
{
m: "test_histogram2",
help: "Similar histogram as before but now without sparse buckets.",
},
{
m: "test_histogram2",
typ: MetricTypeHistogram,
},
{
m: "test_histogram2_count",
v: 175,
lset: labels.FromStrings(
"__name__", "test_histogram2_count",
),
},
{
m: "test_histogram2_sum",
v: 0.000828,
lset: labels.FromStrings(
"__name__", "test_histogram2_sum",
),
},
{
m: "test_histogram2_bucket\xffle\xff-0.00048",
v: 2,
lset: labels.FromStrings(
"__name__", "test_histogram2_bucket",
"le", "-0.00048",
),
},
{
m: "test_histogram2_bucket\xffle\xff-0.00038",
v: 4,
lset: labels.FromStrings(
"__name__", "test_histogram2_bucket",
"le", "-0.00038",
),
e: []exemplar.Exemplar{
{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00038, HasTs: true, Ts: 1625851153146},
},
},
{
m: "test_histogram2_bucket\xffle\xff1.0",
v: 16,
lset: labels.FromStrings(
"__name__", "test_histogram2_bucket",
"le", "1.0",
),
e: []exemplar.Exemplar{
{Labels: labels.FromStrings("dummyID", "5617"), Value: -0.000295, HasTs: false},
},
},
{
m: "test_histogram2_bucket\xffle\xff+Inf",
v: 175,
lset: labels.FromStrings(
"__name__", "test_histogram2_bucket",
"le", "+Inf",
),
},
{
m: "rpc_durations_seconds",
help: "RPC latency distributions.",
},
{
m: "rpc_durations_seconds",
typ: MetricTypeSummary,
},
{
m: "rpc_durations_seconds_count\xffservice\xffexponential",
v: 262,
lset: labels.FromStrings(
"__name__", "rpc_durations_seconds_count",
"service", "exponential",
),
},
{
m: "rpc_durations_seconds_sum\xffservice\xffexponential",
v: 0.00025551262820703587,
lset: labels.FromStrings(
"__name__", "rpc_durations_seconds_sum",
"service", "exponential",
),
},
{
m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.5",
v: 6.442786329648548e-07,
lset: labels.FromStrings(
"__name__", "rpc_durations_seconds",
"quantile", "0.5",
"service", "exponential",
),
},
{
m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.9",
v: 1.9435742936658396e-06,
lset: labels.FromStrings(
"__name__", "rpc_durations_seconds",
"quantile", "0.9",
"service", "exponential",
),
},
{
m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.99",
v: 4.0471608667037015e-06,
lset: labels.FromStrings(
"__name__", "rpc_durations_seconds",
"quantile", "0.99",
"service", "exponential",
),
}, },
} }
@ -378,11 +487,11 @@ metric: <
} }
require.Equal(t, exp[i].v, v) require.Equal(t, exp[i].v, v)
require.Equal(t, exp[i].lset, res) require.Equal(t, exp[i].lset, res)
if exp[i].e == nil { if len(exp[i].e) == 0 {
require.Equal(t, false, found) require.Equal(t, false, found)
} else { } else {
require.Equal(t, true, found) require.Equal(t, true, found)
require.Equal(t, *exp[i].e, e) require.Equal(t, exp[i].e[0], e)
} }
res = res[:0] res = res[:0]
@ -400,6 +509,12 @@ metric: <
res = res[:0] res = res[:0]
require.Equal(t, exp[i].m, string(m)) require.Equal(t, exp[i].m, string(m))
require.Equal(t, exp[i].shs, shs) require.Equal(t, exp[i].shs, shs)
j := 0
for e := (exemplar.Exemplar{}); p.Exemplar(&e); j++ {
require.Equal(t, exp[i].e[j], e)
e = exemplar.Exemplar{}
}
require.Equal(t, len(exp[i].e), j, "not enough exemplars found")
case EntryType: case EntryType:
m, typ := p.Type() m, typ := p.Type()