[PERF] textparse: further optimizations for OM `CreatedTimestamps` (#15097)

* feat: Added more tests; some changes/optimizations when pair-programming with Bartek.

Signed-off-by: Manik Rana <manikrana54@gmail.com>

* chore: imports

Signed-off-by: Manik Rana <manikrana54@gmail.com>

* chore: gofumpt

Signed-off-by: Manik Rana <manikrana54@gmail.com>

* feat: use an efficient replacement to p.Metric

Signed-off-by: Manik Rana <manikrana54@gmail.com>

* feat: reduce mem allocs + comments

Signed-off-by: Manik Rana <manikrana54@gmail.com>

* chore: gofumpt

Signed-off-by: Manik Rana <manikrana54@gmail.com>

* chore: use single quotes

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: Manik Rana <Manikrana54@gmail.com>

* refac: rename

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: Manik Rana <Manikrana54@gmail.com>

* refac: rename to seriesHash

Signed-off-by: Manik Rana <manikrana54@gmail.com>

* refac: switch condition order

Co-authored-by: George Krajcsovits <krajorama@users.noreply.github.com>
Signed-off-by: Manik Rana <Manikrana54@gmail.com>

* refac: switch condition order

Co-authored-by: George Krajcsovits <krajorama@users.noreply.github.com>
Signed-off-by: Manik Rana <Manikrana54@gmail.com>

* feat: stronger checking

Co-authored-by: George Krajcsovits <krajorama@users.noreply.github.com>
Signed-off-by: Manik Rana <Manikrana54@gmail.com>

* chore: fmt

Signed-off-by: Manik Rana <manikrana54@gmail.com>

* refac: pass pointer of buf into seriesHash()

Signed-off-by: Manik Rana <manikrana54@gmail.com>

---------

Signed-off-by: Manik Rana <manikrana54@gmail.com>
Signed-off-by: Manik Rana <Manikrana54@gmail.com>
Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Co-authored-by: George Krajcsovits <krajorama@users.noreply.github.com>
pull/15315/head
Manik Rana 2024-10-10 16:31:13 +05:30 committed by GitHub
parent 8b545bab2f
commit 032ca9ef96
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 165 additions and 82 deletions

View File

@ -17,6 +17,7 @@
package textparse package textparse
import ( import (
"bytes"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -24,6 +25,7 @@ import (
"strings" "strings"
"unicode/utf8" "unicode/utf8"
"github.com/cespare/xxhash/v2"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/exemplar"
@ -75,6 +77,7 @@ type OpenMetricsParser struct {
l *openMetricsLexer l *openMetricsLexer
builder labels.ScratchBuilder builder labels.ScratchBuilder
series []byte series []byte
mfNameLen int // length of metric family name to get from series.
text []byte text []byte
mtype model.MetricType mtype model.MetricType
val float64 val float64
@ -98,9 +101,9 @@ type OpenMetricsParser struct {
// Created timestamp parsing state. // Created timestamp parsing state.
ct int64 ct int64
ctHashSet uint64 ctHashSet uint64
// visitedName is the metric name of the last visited metric when peeking ahead // visitedMFName is the metric family name of the last visited metric when peeking ahead
// for _created series during the execution of the CreatedTimestamp method. // for _created series during the execution of the CreatedTimestamp method.
visitedName string visitedMFName []byte
skipCTSeries bool skipCTSeries bool
} }
@ -260,25 +263,24 @@ func (p *OpenMetricsParser) Exemplar(e *exemplar.Exemplar) bool {
func (p *OpenMetricsParser) CreatedTimestamp() *int64 { func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
if !typeRequiresCT(p.mtype) { if !typeRequiresCT(p.mtype) {
// Not a CT supported metric type, fast path. // Not a CT supported metric type, fast path.
p.ct = 0 p.ctHashSet = 0 // Use ctHashSet as a single way of telling "empty cache"
p.visitedName = ""
p.ctHashSet = 0
return nil return nil
} }
var ( var (
currLset labels.Labels
buf []byte buf []byte
peekWithoutNameLsetHash uint64 currName []byte
) )
p.Metric(&currLset) if len(p.series) > 1 && p.series[0] == '{' && p.series[1] == '"' {
currFamilyLsetHash, buf := currLset.HashWithoutLabels(buf, labels.MetricName, "le", "quantile") // special case for UTF-8 encoded metric family names.
currName := currLset.Get(model.MetricNameLabel) currName = p.series[p.offsets[0]-p.start : p.mfNameLen+2]
currName = findBaseMetricName(currName) } else {
currName = p.series[p.offsets[0]-p.start : p.mfNameLen]
}
// make sure we're on a new metric before returning currHash := p.seriesHash(&buf, currName)
if currName == p.visitedName && currFamilyLsetHash == p.ctHashSet && p.visitedName != "" && p.ctHashSet > 0 && p.ct > 0 { // Check cache, perhaps we fetched something already.
// CT is already known, fast path. if currHash == p.ctHashSet && p.ct > 0 {
return &p.ct return &p.ct
} }
@ -309,17 +311,15 @@ func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
return nil return nil
} }
var peekedLset labels.Labels peekedName := p.series[p.offsets[0]-p.start : p.offsets[1]-p.start]
p.Metric(&peekedLset) if len(peekedName) < 8 || string(peekedName[len(peekedName)-8:]) != "_created" {
peekedName := peekedLset.Get(model.MetricNameLabel)
if !strings.HasSuffix(peekedName, "_created") {
// Not a CT line, search more. // Not a CT line, search more.
continue continue
} }
// We got a CT line here, but let's search if CT line is actually for our series, edge case. // Remove _created suffix.
peekWithoutNameLsetHash, _ = peekedLset.HashWithoutLabels(buf, labels.MetricName, "le", "quantile") peekedHash := p.seriesHash(&buf, peekedName[:len(peekedName)-8])
if peekWithoutNameLsetHash != currFamilyLsetHash { if peekedHash != currHash {
// Found CT line for a different series, for our series no CT. // Found CT line for a different series, for our series no CT.
p.resetCTParseValues(resetLexer) p.resetCTParseValues(resetLexer)
return nil return nil
@ -328,44 +328,63 @@ func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
// All timestamps in OpenMetrics are Unix Epoch in seconds. Convert to milliseconds. // All timestamps in OpenMetrics are Unix Epoch in seconds. Convert to milliseconds.
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#timestamps // https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#timestamps
ct := int64(p.val * 1000.0) ct := int64(p.val * 1000.0)
p.setCTParseValues(ct, currFamilyLsetHash, currName, true, resetLexer) p.setCTParseValues(ct, currHash, currName, true, resetLexer)
return &ct return &ct
} }
} }
var (
leBytes = []byte{108, 101}
quantileBytes = []byte{113, 117, 97, 110, 116, 105, 108, 101}
)
// seriesHash generates a hash based on the metric family name and the offsets
// of label names and values from the parsed OpenMetrics data. It skips quantile
// and le labels for summaries and histograms respectively.
func (p *OpenMetricsParser) seriesHash(offsetsArr *[]byte, metricFamilyName []byte) uint64 {
// Iterate through p.offsets to find the label names and values.
for i := 2; i < len(p.offsets); i += 4 {
lStart := p.offsets[i] - p.start
lEnd := p.offsets[i+1] - p.start
label := p.series[lStart:lEnd]
// Skip quantile and le labels for summaries and histograms.
if p.mtype == model.MetricTypeSummary && bytes.Equal(label, quantileBytes) {
continue
}
if p.mtype == model.MetricTypeHistogram && bytes.Equal(label, leBytes) {
continue
}
*offsetsArr = append(*offsetsArr, p.series[lStart:lEnd]...)
vStart := p.offsets[i+2] - p.start
vEnd := p.offsets[i+3] - p.start
*offsetsArr = append(*offsetsArr, p.series[vStart:vEnd]...)
}
*offsetsArr = append(*offsetsArr, metricFamilyName...)
hashedOffsets := xxhash.Sum64(*offsetsArr)
// Reset the offsets array for later reuse.
*offsetsArr = (*offsetsArr)[:0]
return hashedOffsets
}
// setCTParseValues sets the parser to the state after CreatedTimestamp method was called and CT was found. // setCTParseValues sets the parser to the state after CreatedTimestamp method was called and CT was found.
// This is useful to prevent re-parsing the same series again and early return the CT value. // This is useful to prevent re-parsing the same series again and early return the CT value.
func (p *OpenMetricsParser) setCTParseValues(ct int64, ctHashSet uint64, visitedName string, skipCTSeries bool, resetLexer *openMetricsLexer) { func (p *OpenMetricsParser) setCTParseValues(ct int64, ctHashSet uint64, mfName []byte, skipCTSeries bool, resetLexer *openMetricsLexer) {
p.ct = ct p.ct = ct
p.l = resetLexer p.l = resetLexer
p.ctHashSet = ctHashSet p.ctHashSet = ctHashSet
p.visitedName = visitedName p.visitedMFName = mfName
p.skipCTSeries = skipCTSeries p.skipCTSeries = skipCTSeries // Do we need to set it?
} }
// resetCtParseValues resets the parser to the state before CreatedTimestamp method was called. // resetCtParseValues resets the parser to the state before CreatedTimestamp method was called.
func (p *OpenMetricsParser) resetCTParseValues(resetLexer *openMetricsLexer) { func (p *OpenMetricsParser) resetCTParseValues(resetLexer *openMetricsLexer) {
p.l = resetLexer p.l = resetLexer
p.ct = 0
p.ctHashSet = 0 p.ctHashSet = 0
p.visitedName = ""
p.skipCTSeries = true p.skipCTSeries = true
} }
// findBaseMetricName returns the metric name without reserved suffixes such as "_created",
// "_sum", etc. based on the OpenMetrics specification found at
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md.
// If no suffix is found, the original name is returned.
func findBaseMetricName(name string) string {
suffixes := []string{"_created", "_count", "_sum", "_bucket", "_total", "_gcount", "_gsum", "_info"}
for _, suffix := range suffixes {
if strings.HasSuffix(name, suffix) {
return strings.TrimSuffix(name, suffix)
}
}
return name
}
// typeRequiresCT returns true if the metric type requires a _created timestamp. // typeRequiresCT returns true if the metric type requires a _created timestamp.
func typeRequiresCT(t model.MetricType) bool { func typeRequiresCT(t model.MetricType) bool {
switch t { switch t {
@ -419,6 +438,7 @@ func (p *OpenMetricsParser) Next() (Entry, error) {
mStart++ mStart++
mEnd-- mEnd--
} }
p.mfNameLen = mEnd - mStart
p.offsets = append(p.offsets, mStart, mEnd) p.offsets = append(p.offsets, mStart, mEnd)
default: default:
return EntryInvalid, p.parseError("expected metric name after "+t.String(), t2) return EntryInvalid, p.parseError("expected metric name after "+t.String(), t2)

View File

@ -14,6 +14,7 @@
package textparse package textparse
import ( import (
"fmt"
"io" "io"
"testing" "testing"
@ -71,6 +72,8 @@ foo_total 17.0 1520879607.789 # {id="counter-test"} 5
foo_created 1520872607.123 foo_created 1520872607.123
foo_total{a="b"} 17.0 1520879607.789 # {id="counter-test"} 5 foo_total{a="b"} 17.0 1520879607.789 # {id="counter-test"} 5
foo_created{a="b"} 1520872607.123 foo_created{a="b"} 1520872607.123
foo_total{le="c"} 21.0
foo_created{le="c"} 1520872621.123
# HELP bar Summary with CT at the end, making sure we find CT even if it's multiple lines a far # HELP bar Summary with CT at the end, making sure we find CT even if it's multiple lines a far
# TYPE bar summary # TYPE bar summary
bar_count 17.0 bar_count 17.0
@ -294,6 +297,11 @@ foobar{quantile="0.99"} 150.1`
{Labels: labels.FromStrings("id", "counter-test"), Value: 5}, {Labels: labels.FromStrings("id", "counter-test"), Value: 5},
}, },
ct: int64p(1520872607123), ct: int64p(1520872607123),
}, {
m: `foo_total{le="c"}`,
v: 21.0,
lset: labels.FromStrings("__name__", "foo_total", "le", "c"),
ct: int64p(1520872621123),
}, { }, {
m: "bar", m: "bar",
help: "Summary with CT at the end, making sure we find CT even if it's multiple lines a far", help: "Summary with CT at the end, making sure we find CT even if it's multiple lines a far",
@ -820,7 +828,7 @@ func TestOpenMetricsParseErrors(t *testing.T) {
for err == nil { for err == nil {
_, err = p.Next() _, err = p.Next()
} }
require.EqualError(t, err, c.err, "test %d: %s", i, c.input) require.Equal(t, c.err, err.Error(), "test %d: %s", i, c.input)
} }
} }
@ -899,42 +907,97 @@ func TestOMNullByteHandling(t *testing.T) {
// current OM spec limitations or clients with broken OM format. // current OM spec limitations or clients with broken OM format.
// TODO(maniktherana): Make sure OM 1.1/2.0 pass CT via metadata or exemplar-like to avoid this. // TODO(maniktherana): Make sure OM 1.1/2.0 pass CT via metadata or exemplar-like to avoid this.
func TestCTParseFailures(t *testing.T) { func TestCTParseFailures(t *testing.T) {
input := `# HELP thing Histogram with _created as first line for _, tcase := range []struct {
name string
input string
expected []parsedEntry
}{
{
name: "_created line is a first one",
input: `# HELP thing histogram with _created as first line
# TYPE thing histogram # TYPE thing histogram
thing_created 1520872607.123 thing_created 1520872607.123
thing_count 17 thing_count 17
thing_sum 324789.3 thing_sum 324789.3
thing_bucket{le="0.0"} 0 thing_bucket{le="0.0"} 0
thing_bucket{le="+Inf"} 17` thing_bucket{le="+Inf"} 17
# HELP thing_c counter with _created as first line
input += "\n# EOF\n" # TYPE thing_c counter
thing_c_created 1520872607.123
exp := []parsedEntry{ thing_c_total 14123.232
# EOF
`,
expected: []parsedEntry{
{ {
m: "thing", m: "thing",
help: "Histogram with _created as first line", help: "histogram with _created as first line",
}, { },
{
m: "thing", m: "thing",
typ: model.MetricTypeHistogram, typ: model.MetricTypeHistogram,
}, { },
{
m: `thing_count`, m: `thing_count`,
ct: nil, // Should be int64p(1520872607123). ct: nil, // Should be int64p(1520872607123).
}, { },
{
m: `thing_sum`, m: `thing_sum`,
ct: nil, // Should be int64p(1520872607123). ct: nil, // Should be int64p(1520872607123).
}, { },
{
m: `thing_bucket{le="0.0"}`, m: `thing_bucket{le="0.0"}`,
ct: nil, // Should be int64p(1520872607123). ct: nil, // Should be int64p(1520872607123).
}, { },
{
m: `thing_bucket{le="+Inf"}`, m: `thing_bucket{le="+Inf"}`,
ct: nil, // Should be int64p(1520872607123), ct: nil, // Should be int64p(1520872607123),
}, },
} {
m: "thing_c",
p := NewOpenMetricsParser([]byte(input), labels.NewSymbolTable(), WithOMParserCTSeriesSkipped()) help: "counter with _created as first line",
},
{
m: "thing_c",
typ: model.MetricTypeCounter,
},
{
m: `thing_c_total`,
ct: nil, // Should be int64p(1520872607123).
},
},
},
{
// TODO(bwplotka): Kind of correct behaviour? If yes, let's move to the OK tests above.
name: "maybe counter with no meta",
input: `foo_total 17.0
foo_created 1520872607.123
foo_total{a="b"} 17.0
foo_created{a="b"} 1520872608.123
# EOF
`,
expected: []parsedEntry{
{
m: `foo_total`,
},
{
m: `foo_created`,
},
{
m: `foo_total{a="b"}`,
},
{
m: `foo_created{a="b"}`,
},
},
},
} {
t.Run(fmt.Sprintf("case=%v", tcase.name), func(t *testing.T) {
p := NewOpenMetricsParser([]byte(tcase.input), labels.NewSymbolTable(), WithOMParserCTSeriesSkipped())
got := testParse(t, p) got := testParse(t, p)
resetValAndLset(got) // Keep this test focused on metric, basic entries and CT only. resetValAndLset(got) // Keep this test focused on metric, basic entries and CT only.
requireEntries(t, exp, got) requireEntries(t, tcase.expected, got)
})
}
} }
func resetValAndLset(e []parsedEntry) { func resetValAndLset(e []parsedEntry) {