diff --git a/pkg/textparse/protobufparse.go b/pkg/textparse/protobufparse.go index 265acdf87..a80ceaddc 100644 --- a/pkg/textparse/protobufparse.go +++ b/pkg/textparse/protobufparse.go @@ -16,8 +16,11 @@ package textparse import ( "bytes" "encoding/binary" + "fmt" "io" + "math" "sort" + "strings" "unicode/utf8" "github.com/gogo/protobuf/proto" @@ -43,17 +46,20 @@ import ( // string, which is not how things are represented in the protobuf format. If // the re-arrangement work is actually causing problems (which has to be seen), // that expectation needs to be changed. -// -// TODO(beorn7): The parser currently ignores summaries and legacy histograms -// (those without sparse buckets) to keep things simple. type ProtobufParser struct { - in []byte // The intput to parse. - inPos int // Position within the input. - state Entry // State is marked by the entry we are - // processing. EntryInvalid implies that we have to - // decode the next MetricFamily. - metricPos int // Position within Metric slice. - mf *dto.MetricFamily + in []byte // The intput to parse. + inPos int // Position within the input. + metricPos int // Position within Metric slice. + // fieldPos is the position within a Summary or (legacy) Histogram. -2 + // is the count. -1 is the sum. Otherwise it is the index within + // quantiles/buckets. + fieldPos int + fieldsDone bool // true if no more fields of a Summary or (legacy) Histogram to be processed. + // state is marked by the entry we are processing. EntryInvalid implies + // that we have to decode the next MetricFamily. + state Entry + + mf *dto.MetricFamily // The following are just shenanigans to satisfy the Parser interface. metricBytes *bytes.Buffer // A somewhat fluid representation of the current metric. @@ -83,6 +89,32 @@ func (p *ProtobufParser) Series() ([]byte, *int64, float64) { v = m.GetGauge().GetValue() case dto.MetricType_UNTYPED: v = m.GetUntyped().GetValue() + case dto.MetricType_SUMMARY: + s := m.GetSummary() + switch p.fieldPos { + case -2: + v = float64(s.GetSampleCount()) + case -1: + v = s.GetSampleSum() + default: + v = s.GetQuantile()[p.fieldPos].GetValue() + } + case dto.MetricType_HISTOGRAM: + // This should only happen for a legacy histogram. + h := m.GetHistogram() + switch p.fieldPos { + case -2: + v = float64(h.GetSampleCount()) + case -1: + v = h.GetSampleSum() + default: + bb := h.GetBucket() + if p.fieldPos >= len(bb) { + v = float64(h.GetSampleCount()) + } else { + v = float64(bb[p.fieldPos].GetCumulativeCount()) + } + } default: panic("encountered unexpected metric type, this is a bug") } @@ -151,6 +183,8 @@ func (p *ProtobufParser) Type() ([]byte, MetricType) { return n, MetricTypeGauge case dto.MetricType_HISTOGRAM: return n, MetricTypeHistogram + case dto.MetricType_SUMMARY: + return n, MetricTypeSummary } return n, MetricTypeUnknown } @@ -172,7 +206,7 @@ func (p *ProtobufParser) Comment() []byte { func (p *ProtobufParser) Metric(l *labels.Labels) string { *l = append(*l, labels.Label{ Name: labels.MetricName, - Value: p.mf.GetName(), + Value: p.getMagicName(), }) for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() { @@ -181,6 +215,9 @@ func (p *ProtobufParser) Metric(l *labels.Labels) string { Value: lp.GetValue(), }) } + if needed, name, value := p.getMagicLabel(); needed { + *l = append(*l, labels.Label{Name: name, Value: value}) + } // Sort labels to maintain the sorted labels invariant. sort.Sort(*l) @@ -189,7 +226,9 @@ func (p *ProtobufParser) Metric(l *labels.Labels) string { } // Exemplar writes the exemplar of the current sample into the passed -// exemplar. It returns if an exemplar exists or not. +// exemplar. It returns if an exemplar exists or not. In case of a sparse +// histogram, the legacy bucket section is still used for exemplars. To ingest +// all examplars, call the Exemplar method repeatedly until it returns false. func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool { m := p.mf.GetMetric()[p.metricPos] var exProto *dto.Exemplar @@ -197,8 +236,23 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool { case dto.MetricType_COUNTER: exProto = m.GetCounter().GetExemplar() case dto.MetricType_HISTOGRAM: - // TODO(beorn7): Have to pick bucket. Forward iterator in case of sparse histogram. - return false + bb := m.GetHistogram().GetBucket() + if p.fieldPos < 0 { + if p.state == EntrySeries { + return false // At _count or _sum. + } + p.fieldPos = 0 // Start at 1st bucket for sparse histograms. + } + for p.fieldPos < len(bb) { + exProto = bb[p.fieldPos].GetExemplar() + if p.state == EntrySeries { + break + } + p.fieldPos++ + if exProto != nil { + break + } + } default: return false } @@ -226,29 +280,15 @@ func (p *ProtobufParser) Next() (Entry, error) { switch p.state { case EntryInvalid: p.metricPos = 0 + p.fieldPos = -2 n, err := readDelimited(p.in[p.inPos:], p.mf) p.inPos += n if err != nil { return p.state, err } - // Skip empty metric families. While checking for emptiness, ignore - // summaries and legacy histograms for now. - metricFound := false - metricType := p.mf.GetType() - for _, m := range p.mf.GetMetric() { - if metricType == dto.MetricType_COUNTER || - metricType == dto.MetricType_GAUGE || - metricType == dto.MetricType_UNTYPED || - (metricType == dto.MetricType_HISTOGRAM && - // A histogram with a non-zero SbZerothreshold - // is a sparse histogram. - m.GetHistogram().GetSbZeroThreshold() != 0) { - metricFound = true - break - } - } - if !metricFound { + // Skip empty metric families. + if len(p.mf.GetMetric()) == 0 { return p.Next() } @@ -268,7 +308,8 @@ func (p *ProtobufParser) Next() (Entry, error) { case EntryHelp: p.state = EntryType case EntryType: - if p.mf.GetType() == dto.MetricType_HISTOGRAM { + if p.mf.GetType() == dto.MetricType_HISTOGRAM && + p.mf.GetMetric()[0].GetHistogram().GetSbZeroThreshold() != 0 { p.state = EntryHistogram } else { p.state = EntrySeries @@ -277,7 +318,14 @@ func (p *ProtobufParser) Next() (Entry, error) { return EntryInvalid, err } case EntryHistogram, EntrySeries: - p.metricPos++ + if p.state == EntrySeries && !p.fieldsDone && + (p.mf.GetType() == dto.MetricType_SUMMARY || p.mf.GetType() == dto.MetricType_HISTOGRAM) { + p.fieldPos++ + } else { + p.metricPos++ + p.fieldPos = -2 + p.fieldsDone = false + } if p.metricPos >= len(p.mf.GetMetric()) { p.state = EntryInvalid return p.Next() @@ -294,7 +342,7 @@ func (p *ProtobufParser) Next() (Entry, error) { func (p *ProtobufParser) updateMetricBytes() error { b := p.metricBytes b.Reset() - b.WriteString(p.mf.GetName()) + b.WriteString(p.getMagicName()) for _, lp := range p.mf.GetMetric()[p.metricPos].GetLabel() { b.WriteByte(model.SeparatorByte) n := lp.GetName() @@ -309,9 +357,60 @@ func (p *ProtobufParser) updateMetricBytes() error { } b.WriteString(v) } + if needed, n, v := p.getMagicLabel(); needed { + b.WriteByte(model.SeparatorByte) + b.WriteString(n) + b.WriteByte(model.SeparatorByte) + b.WriteString(v) + } return nil } +// getMagicName usually just returns p.mf.GetType() but adds a magic suffix +// ("_count", "_sum", "_bucket") if needed according to the current parser +// state. +func (p *ProtobufParser) getMagicName() string { + t := p.mf.GetType() + if p.state == EntryHistogram || (t != dto.MetricType_HISTOGRAM && t != dto.MetricType_SUMMARY) { + return p.mf.GetName() + } + if p.fieldPos == -2 { + return p.mf.GetName() + "_count" + } + if p.fieldPos == -1 { + return p.mf.GetName() + "_sum" + } + if t == dto.MetricType_HISTOGRAM { + return p.mf.GetName() + "_bucket" + } + return p.mf.GetName() +} + +// getMagicLabel returns if a magic label ("quantile" or "le") is needed and, if +// so, its name and value. It also sets p.fieldsDone if applicable. +func (p *ProtobufParser) getMagicLabel() (bool, string, string) { + if p.state == EntryHistogram || p.fieldPos < 0 { + return false, "", "" + } + switch p.mf.GetType() { + case dto.MetricType_SUMMARY: + qq := p.mf.GetMetric()[p.metricPos].GetSummary().GetQuantile() + q := qq[p.fieldPos] + p.fieldsDone = p.fieldPos == len(qq)-1 + return true, model.QuantileLabel, formatOpenMetricsFloat(q.GetQuantile()) + case dto.MetricType_HISTOGRAM: + bb := p.mf.GetMetric()[p.metricPos].GetHistogram().GetBucket() + if p.fieldPos >= len(bb) { + p.fieldsDone = true + return true, model.BucketLabel, "+Inf" + } + b := bb[p.fieldPos] + p.fieldsDone = math.IsInf(b.GetUpperBound(), +1) + return true, model.BucketLabel, formatOpenMetricsFloat(b.GetUpperBound()) + } + return false, "", "" +} + var errInvalidVarint = errors.New("protobufparse: invalid varint encountered") // readDelimited is essentially doing what the function of the same name in @@ -334,3 +433,29 @@ func readDelimited(b []byte, mf *dto.MetricFamily) (n int, err error) { mf.Reset() return totalLength, mf.Unmarshal(b[varIntLength:totalLength]) } + +// formatOpenMetricsFloat works like the usual Go string formatting of a fleat +// but appends ".0" if the resulting number would otherwise contain neither a +// "." nor an "e". +func formatOpenMetricsFloat(f float64) string { + // A few common cases hardcoded. + switch { + case f == 1: + return "1.0" + case f == 0: + return "0.0" + case f == -1: + return "-1.0" + case math.IsNaN(f): + return "NaN" + case math.IsInf(f, +1): + return "+Inf" + case math.IsInf(f, -1): + return "-Inf" + } + s := fmt.Sprint(f) + if strings.ContainsAny(s, "e.") { + return s + } + return s + ".0" +} diff --git a/pkg/textparse/protobufparse_test.go b/pkg/textparse/protobufparse_test.go index 96beba06c..4b8eb6c6b 100644 --- a/pkg/textparse/protobufparse_test.go +++ b/pkg/textparse/protobufparse_test.go @@ -104,7 +104,7 @@ metric: < name: "dummyID" value: "59727" > - value: -0.0003919818421972943 + value: -0.00039 timestamp: < seconds: 1625851155 nanos: 146848499 @@ -119,11 +119,7 @@ metric: < name: "dummyID" value: "5617" > - value: -0.0002956962622126468 - timestamp: < - seconds: 1625851150 - nanos: 233181498 - > + value: -0.00029 > > sb_schema: 3 @@ -164,44 +160,40 @@ metric: < `, `name: "test_histogram2" -help: "Same histogram as before but now without sparse buckets." +help: "Similar histogram as before but now without sparse buckets." type: HISTOGRAM metric: < histogram: < sample_count: 175 - sample_sum: 0.0008280461746287094 + sample_sum: 0.000828 bucket: < cumulative_count: 2 - upper_bound: -0.0004899999999999998 + upper_bound: -0.00048 > bucket: < cumulative_count: 4 - upper_bound: -0.0003899999999999998 + upper_bound: -0.00038 exemplar: < label: < name: "dummyID" value: "59727" > - value: -0.0003919818421972943 + value: -0.00038 timestamp: < - seconds: 1625851155 + seconds: 1625851153 nanos: 146848499 > > > bucket: < cumulative_count: 16 - upper_bound: -0.0002899999999999998 + upper_bound: 1 exemplar: < label: < name: "dummyID" value: "5617" > - value: -0.0002956962622126468 - timestamp: < - seconds: 1625851150 - nanos: 233181498 - > + value: -0.000295 > > sb_schema: 0 @@ -265,7 +257,7 @@ metric: < unit string comment string shs histogram.SparseHistogram - e *exemplar.Exemplar + e []exemplar.Exemplar }{ { m: "go_build_info", @@ -299,7 +291,9 @@ metric: < lset: labels.FromStrings( "__name__", "go_memstats_alloc_bytes_total", ), - e: &exemplar.Exemplar{Labels: labels.FromStrings("dummyID", "42"), Value: 12, HasTs: true, Ts: 1625851151233}, + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "42"), Value: 12, HasTs: true, Ts: 1625851151233}, + }, }, { m: "something_untyped", @@ -348,6 +342,121 @@ metric: < lset: labels.FromStrings( "__name__", "test_histogram", ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146}, + {Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, HasTs: false}, + }, + }, + { + m: "test_histogram2", + help: "Similar histogram as before but now without sparse buckets.", + }, + { + m: "test_histogram2", + typ: MetricTypeHistogram, + }, + { + m: "test_histogram2_count", + v: 175, + lset: labels.FromStrings( + "__name__", "test_histogram2_count", + ), + }, + { + m: "test_histogram2_sum", + v: 0.000828, + lset: labels.FromStrings( + "__name__", "test_histogram2_sum", + ), + }, + { + m: "test_histogram2_bucket\xffle\xff-0.00048", + v: 2, + lset: labels.FromStrings( + "__name__", "test_histogram2_bucket", + "le", "-0.00048", + ), + }, + { + m: "test_histogram2_bucket\xffle\xff-0.00038", + v: 4, + lset: labels.FromStrings( + "__name__", "test_histogram2_bucket", + "le", "-0.00038", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00038, HasTs: true, Ts: 1625851153146}, + }, + }, + { + m: "test_histogram2_bucket\xffle\xff1.0", + v: 16, + lset: labels.FromStrings( + "__name__", "test_histogram2_bucket", + "le", "1.0", + ), + e: []exemplar.Exemplar{ + {Labels: labels.FromStrings("dummyID", "5617"), Value: -0.000295, HasTs: false}, + }, + }, + { + m: "test_histogram2_bucket\xffle\xff+Inf", + v: 175, + lset: labels.FromStrings( + "__name__", "test_histogram2_bucket", + "le", "+Inf", + ), + }, + { + m: "rpc_durations_seconds", + help: "RPC latency distributions.", + }, + { + m: "rpc_durations_seconds", + typ: MetricTypeSummary, + }, + { + m: "rpc_durations_seconds_count\xffservice\xffexponential", + v: 262, + lset: labels.FromStrings( + "__name__", "rpc_durations_seconds_count", + "service", "exponential", + ), + }, + { + m: "rpc_durations_seconds_sum\xffservice\xffexponential", + v: 0.00025551262820703587, + lset: labels.FromStrings( + "__name__", "rpc_durations_seconds_sum", + "service", "exponential", + ), + }, + { + m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.5", + v: 6.442786329648548e-07, + lset: labels.FromStrings( + "__name__", "rpc_durations_seconds", + "quantile", "0.5", + "service", "exponential", + ), + }, + { + m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.9", + v: 1.9435742936658396e-06, + lset: labels.FromStrings( + "__name__", "rpc_durations_seconds", + "quantile", "0.9", + "service", "exponential", + ), + }, + { + m: "rpc_durations_seconds\xffservice\xffexponential\xffquantile\xff0.99", + v: 4.0471608667037015e-06, + lset: labels.FromStrings( + "__name__", "rpc_durations_seconds", + "quantile", "0.99", + "service", "exponential", + ), }, } @@ -378,11 +487,11 @@ metric: < } require.Equal(t, exp[i].v, v) require.Equal(t, exp[i].lset, res) - if exp[i].e == nil { + if len(exp[i].e) == 0 { require.Equal(t, false, found) } else { require.Equal(t, true, found) - require.Equal(t, *exp[i].e, e) + require.Equal(t, exp[i].e[0], e) } res = res[:0] @@ -400,6 +509,12 @@ metric: < res = res[:0] require.Equal(t, exp[i].m, string(m)) require.Equal(t, exp[i].shs, shs) + j := 0 + for e := (exemplar.Exemplar{}); p.Exemplar(&e); j++ { + require.Equal(t, exp[i].e[j], e) + e = exemplar.Exemplar{} + } + require.Equal(t, len(exp[i].e), j, "not enough exemplars found") case EntryType: m, typ := p.Type()