From 47aeca96630344464a139b0963dcc85ff463f84d Mon Sep 17 00:00:00 2001 From: Manik Rana Date: Fri, 4 Oct 2024 17:41:02 +0530 Subject: [PATCH] feat: naive fixes and optimzations for `CreatedTimestamp` function (#14965) * enhance: wip ct parse optimizations Signed-off-by: Manik Rana * feat: further work on optimization Signed-off-by: Manik Rana * feat: further improvements and remove unused code Signed-off-by: Manik Rana * feat: improve optimizations and fix some CT parse errors Signed-off-by: Manik Rana * fix: check for LsetHash along with name Signed-off-by: Manik Rana * chore: cleanup and documentation Signed-off-by: Manik Rana * enhance: improve comments and add cleaner functions Signed-off-by: Manik Rana * feat: improve comments and add cleaner functions Signed-off-by: Manik Rana * chore: rename to resetCTParseValues Signed-off-by: Manik Rana * fix: post-merge fixes Signed-off-by: Manik Rana * fix: add all possible reserved suffixes Signed-off-by: Manik Rana * test: separate CT values for each metric Signed-off-by: Manik Rana --------- Signed-off-by: Manik Rana Signed-off-by: Manik Rana --- model/textparse/openmetricsparse.go | 106 +++++++--- model/textparse/openmetricsparse_test.go | 259 ++++++++++------------- 2 files changed, 184 insertions(+), 181 deletions(-) diff --git a/model/textparse/openmetricsparse.go b/model/textparse/openmetricsparse.go index 8ec1b62ff..0e82dc9f5 100644 --- a/model/textparse/openmetricsparse.go +++ b/model/textparse/openmetricsparse.go @@ -95,6 +95,12 @@ type OpenMetricsParser struct { exemplarTs int64 hasExemplarTs bool + // Created timestamp parsing state. + ct int64 + ctHashSet uint64 + // visitedName is the metric name of the last visited metric when peeking ahead + // for _created series during the execution of the CreatedTimestamp method. + visitedName string skipCTSeries bool } @@ -254,6 +260,9 @@ func (p *OpenMetricsParser) Exemplar(e *exemplar.Exemplar) bool { func (p *OpenMetricsParser) CreatedTimestamp() *int64 { if !typeRequiresCT(p.mtype) { // Not a CT supported metric type, fast path. + p.ct = 0 + p.visitedName = "" + p.ctHashSet = 0 return nil } @@ -264,27 +273,44 @@ func (p *OpenMetricsParser) CreatedTimestamp() *int64 { ) p.Metric(&currLset) currFamilyLsetHash, buf := currLset.HashWithoutLabels(buf, labels.MetricName, "le", "quantile") - // Search for the _created line for the currFamilyLsetHash using ephemeral parser until - // we see EOF or new metric family. We have to do it as we don't know where (and if) - // that CT line is. - // TODO(bwplotka): Make sure OM 1.1/2.0 pass CT via metadata or exemplar-like to avoid this. - peek := deepCopy(p) + currName := currLset.Get(model.MetricNameLabel) + currName = findBaseMetricName(currName) + + // make sure we're on a new metric before returning + if currName == p.visitedName && currFamilyLsetHash == p.ctHashSet && p.visitedName != "" && p.ctHashSet > 0 && p.ct > 0 { + // CT is already known, fast path. + return &p.ct + } + + // Create a new lexer to reset the parser once this function is done executing. + resetLexer := &openMetricsLexer{ + b: p.l.b, + i: p.l.i, + start: p.l.start, + err: p.l.err, + state: p.l.state, + } + + p.skipCTSeries = false + for { - eType, err := peek.Next() + eType, err := p.Next() if err != nil { - // This means peek will give error too later on, so def no CT line found. + // This means p.Next() will give error too later on, so def no CT line found. // This might result in partial scrape with wrong/missing CT, but only // spec improvement would help. - // TODO(bwplotka): Make sure OM 1.1/2.0 pass CT via metadata or exemplar-like to avoid this. + // TODO: Make sure OM 1.1/2.0 pass CT via metadata or exemplar-like to avoid this. + p.resetCTParseValues(resetLexer) return nil } if eType != EntrySeries { // Assume we hit different family, no CT line found. + p.resetCTParseValues(resetLexer) return nil } var peekedLset labels.Labels - peek.Metric(&peekedLset) + p.Metric(&peekedLset) peekedName := peekedLset.Get(model.MetricNameLabel) if !strings.HasSuffix(peekedName, "_created") { // Not a CT line, search more. @@ -294,17 +320,52 @@ func (p *OpenMetricsParser) CreatedTimestamp() *int64 { // We got a CT line here, but let's search if CT line is actually for our series, edge case. peekWithoutNameLsetHash, _ = peekedLset.HashWithoutLabels(buf, labels.MetricName, "le", "quantile") if peekWithoutNameLsetHash != currFamilyLsetHash { - // CT line for a different series, for our series no CT. + // Found CT line for a different series, for our series no CT. + p.resetCTParseValues(resetLexer) return nil } // All timestamps in OpenMetrics are Unix Epoch in seconds. Convert to milliseconds. // https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#timestamps - ct := int64(peek.val * 1000.0) + ct := int64(p.val * 1000.0) + p.setCTParseValues(ct, currFamilyLsetHash, currName, true, resetLexer) return &ct } } +// setCTParseValues sets the parser to the state after CreatedTimestamp method was called and CT was found. +// This is useful to prevent re-parsing the same series again and early return the CT value. +func (p *OpenMetricsParser) setCTParseValues(ct int64, ctHashSet uint64, visitedName string, skipCTSeries bool, resetLexer *openMetricsLexer) { + p.ct = ct + p.l = resetLexer + p.ctHashSet = ctHashSet + p.visitedName = visitedName + p.skipCTSeries = skipCTSeries +} + +// resetCtParseValues resets the parser to the state before CreatedTimestamp method was called. +func (p *OpenMetricsParser) resetCTParseValues(resetLexer *openMetricsLexer) { + p.l = resetLexer + p.ct = 0 + p.ctHashSet = 0 + p.visitedName = "" + p.skipCTSeries = true +} + +// findBaseMetricName returns the metric name without reserved suffixes such as "_created", +// "_sum", etc. based on the OpenMetrics specification found at +// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md. +// If no suffix is found, the original name is returned. +func findBaseMetricName(name string) string { + suffixes := []string{"_created", "_count", "_sum", "_bucket", "_total", "_gcount", "_gsum", "_info"} + for _, suffix := range suffixes { + if strings.HasSuffix(name, suffix) { + return strings.TrimSuffix(name, suffix) + } + } + return name +} + // typeRequiresCT returns true if the metric type requires a _created timestamp. func typeRequiresCT(t model.MetricType) bool { switch t { @@ -315,29 +376,6 @@ func typeRequiresCT(t model.MetricType) bool { } } -// deepCopy creates a copy of a parser without re-using the slices' original memory addresses. -func deepCopy(p *OpenMetricsParser) OpenMetricsParser { - newB := make([]byte, len(p.l.b)) - copy(newB, p.l.b) - - newLexer := &openMetricsLexer{ - b: newB, - i: p.l.i, - start: p.l.start, - err: p.l.err, - state: p.l.state, - } - - newParser := OpenMetricsParser{ - l: newLexer, - builder: p.builder, - mtype: p.mtype, - val: p.val, - skipCTSeries: false, - } - return newParser -} - // nextToken returns the next token from the openMetricsLexer. func (p *OpenMetricsParser) nextToken() token { tok := p.l.Lex() diff --git a/model/textparse/openmetricsparse_test.go b/model/textparse/openmetricsparse_test.go index 93033380b..bbb7c0730 100644 --- a/model/textparse/openmetricsparse_test.go +++ b/model/textparse/openmetricsparse_test.go @@ -79,17 +79,38 @@ bar_count 17.0 bar_sum 324789.3 bar{quantile="0.95"} 123.7 bar{quantile="0.99"} 150.0 -bar_created 1520872607.123 +bar_created 1520872608.124 # HELP baz Histogram with the same objective as above's summary # TYPE baz histogram baz_bucket{le="0.0"} 0 baz_bucket{le="+Inf"} 17 baz_count 17 baz_sum 324789.3 -baz_created 1520872607.123 +baz_created 1520872609.125 # HELP fizz_created Gauge which shouldn't be parsed as CT # TYPE fizz_created gauge -fizz_created 17.0` +fizz_created 17.0 +# HELP something Histogram with _created between buckets and summary +# TYPE something histogram +something_count 18 +something_sum 324789.4 +something_created 1520430001 +something_bucket{le="0.0"} 1 +something_bucket{le="+Inf"} 18 +# HELP yum Summary with _created between sum and quantiles +# TYPE yum summary +yum_count 20 +yum_sum 324789.5 +yum_created 1520430003 +yum{quantile="0.95"} 123.7 +yum{quantile="0.99"} 150.0 +# HELP foobar Summary with _created as the first line +# TYPE foobar summary +foobar_count 21 +foobar_created 1520430004 +foobar_sum 324789.6 +foobar{quantile="0.95"} 123.8 +foobar{quantile="0.99"} 150.1` input += "\n# HELP metric foo\x00bar" input += "\nnull_byte_metric{a=\"abc\x00\"} 1" @@ -269,22 +290,22 @@ fizz_created 17.0` m: "bar_count", v: 17.0, lset: labels.FromStrings("__name__", "bar_count"), - ct: int64p(1520872607123), + ct: int64p(1520872608124), }, { m: "bar_sum", v: 324789.3, lset: labels.FromStrings("__name__", "bar_sum"), - ct: int64p(1520872607123), + ct: int64p(1520872608124), }, { m: `bar{quantile="0.95"}`, v: 123.7, lset: labels.FromStrings("__name__", "bar", "quantile", "0.95"), - ct: int64p(1520872607123), + ct: int64p(1520872608124), }, { m: `bar{quantile="0.99"}`, v: 150.0, lset: labels.FromStrings("__name__", "bar", "quantile", "0.99"), - ct: int64p(1520872607123), + ct: int64p(1520872608124), }, { m: "baz", help: "Histogram with the same objective as above's summary", @@ -295,22 +316,22 @@ fizz_created 17.0` m: `baz_bucket{le="0.0"}`, v: 0, lset: labels.FromStrings("__name__", "baz_bucket", "le", "0.0"), - ct: int64p(1520872607123), + ct: int64p(1520872609125), }, { m: `baz_bucket{le="+Inf"}`, v: 17, lset: labels.FromStrings("__name__", "baz_bucket", "le", "+Inf"), - ct: int64p(1520872607123), + ct: int64p(1520872609125), }, { m: `baz_count`, v: 17, lset: labels.FromStrings("__name__", "baz_count"), - ct: int64p(1520872607123), + ct: int64p(1520872609125), }, { m: `baz_sum`, v: 324789.3, lset: labels.FromStrings("__name__", "baz_sum"), - ct: int64p(1520872607123), + ct: int64p(1520872609125), }, { m: "fizz_created", help: "Gauge which shouldn't be parsed as CT", @@ -321,6 +342,84 @@ fizz_created 17.0` m: `fizz_created`, v: 17, lset: labels.FromStrings("__name__", "fizz_created"), + }, { + m: "something", + help: "Histogram with _created between buckets and summary", + }, { + m: "something", + typ: model.MetricTypeHistogram, + }, { + m: `something_count`, + v: 18, + lset: labels.FromStrings("__name__", "something_count"), + ct: int64p(1520430001000), + }, { + m: `something_sum`, + v: 324789.4, + lset: labels.FromStrings("__name__", "something_sum"), + ct: int64p(1520430001000), + }, { + m: `something_bucket{le="0.0"}`, + v: 1, + lset: labels.FromStrings("__name__", "something_bucket", "le", "0.0"), + ct: int64p(1520430001000), + }, { + m: `something_bucket{le="+Inf"}`, + v: 18, + lset: labels.FromStrings("__name__", "something_bucket", "le", "+Inf"), + ct: int64p(1520430001000), + }, { + m: "yum", + help: "Summary with _created between sum and quantiles", + }, { + m: "yum", + typ: model.MetricTypeSummary, + }, { + m: `yum_count`, + v: 20, + lset: labels.FromStrings("__name__", "yum_count"), + ct: int64p(1520430003000), + }, { + m: `yum_sum`, + v: 324789.5, + lset: labels.FromStrings("__name__", "yum_sum"), + ct: int64p(1520430003000), + }, { + m: `yum{quantile="0.95"}`, + v: 123.7, + lset: labels.FromStrings("__name__", "yum", "quantile", "0.95"), + ct: int64p(1520430003000), + }, { + m: `yum{quantile="0.99"}`, + v: 150.0, + lset: labels.FromStrings("__name__", "yum", "quantile", "0.99"), + ct: int64p(1520430003000), + }, { + m: "foobar", + help: "Summary with _created as the first line", + }, { + m: "foobar", + typ: model.MetricTypeSummary, + }, { + m: `foobar_count`, + v: 21, + lset: labels.FromStrings("__name__", "foobar_count"), + ct: int64p(1520430004000), + }, { + m: `foobar_sum`, + v: 324789.6, + lset: labels.FromStrings("__name__", "foobar_sum"), + ct: int64p(1520430004000), + }, { + m: `foobar{quantile="0.95"}`, + v: 123.8, + lset: labels.FromStrings("__name__", "foobar", "quantile", "0.95"), + ct: int64p(1520430004000), + }, { + m: `foobar{quantile="0.99"}`, + v: 150.1, + lset: labels.FromStrings("__name__", "foobar", "quantile", "0.99"), + ct: int64p(1520430004000), }, { m: "metric", help: "foo\x00bar", @@ -784,34 +883,13 @@ func TestOMNullByteHandling(t *testing.T) { // these tests show them. // TODO(maniktherana): Make sure OM 1.1/2.0 pass CT via metadata or exemplar-like to avoid this. func TestCTParseFailures(t *testing.T) { - input := `# HELP something Histogram with _created between buckets and summary -# TYPE something histogram -something_count 17 -something_sum 324789.3 -something_created 1520872607.123 -something_bucket{le="0.0"} 0 -something_bucket{le="+Inf"} 17 -# HELP thing Histogram with _created as first line + input := `# HELP thing Histogram with _created as first line # TYPE thing histogram thing_created 1520872607.123 thing_count 17 thing_sum 324789.3 thing_bucket{le="0.0"} 0 -thing_bucket{le="+Inf"} 17 -# HELP yum Summary with _created between sum and quantiles -# TYPE yum summary -yum_count 17.0 -yum_sum 324789.3 -yum_created 1520872607.123 -yum{quantile="0.95"} 123.7 -yum{quantile="0.99"} 150.0 -# HELP foobar Summary with _created as the first line -# TYPE foobar summary -foobar_created 1520872607.123 -foobar_count 17.0 -foobar_sum 324789.3 -foobar{quantile="0.95"} 123.7 -foobar{quantile="0.99"} 150.0` +thing_bucket{le="+Inf"} 17` input += "\n# EOF\n" @@ -827,30 +905,6 @@ foobar{quantile="0.99"} 150.0` exp := []expectCT{ { - m: "something", - help: "Histogram with _created between buckets and summary", - isErr: false, - }, { - m: "something", - typ: model.MetricTypeHistogram, - isErr: false, - }, { - m: `something_count`, - ct: int64p(1520872607123), - isErr: false, - }, { - m: `something_sum`, - ct: int64p(1520872607123), - isErr: false, - }, { - m: `something_bucket{le="0.0"}`, - ct: int64p(1520872607123), - isErr: true, - }, { - m: `something_bucket{le="+Inf"}`, - ct: int64p(1520872607123), - isErr: true, - }, { m: "thing", help: "Histogram with _created as first line", isErr: false, @@ -874,54 +928,6 @@ foobar{quantile="0.99"} 150.0` m: `thing_bucket{le="+Inf"}`, ct: int64p(1520872607123), isErr: true, - }, { - m: "yum", - help: "Summary with _created between summary and quantiles", - isErr: false, - }, { - m: "yum", - typ: model.MetricTypeSummary, - isErr: false, - }, { - m: "yum_count", - ct: int64p(1520872607123), - isErr: false, - }, { - m: "yum_sum", - ct: int64p(1520872607123), - isErr: false, - }, { - m: `yum{quantile="0.95"}`, - ct: int64p(1520872607123), - isErr: true, - }, { - m: `yum{quantile="0.99"}`, - ct: int64p(1520872607123), - isErr: true, - }, { - m: "foobar", - help: "Summary with _created as the first line", - isErr: false, - }, { - m: "foobar", - typ: model.MetricTypeSummary, - isErr: false, - }, { - m: "foobar_count", - ct: int64p(1520430004), - isErr: true, - }, { - m: "foobar_sum", - ct: int64p(1520430004), - isErr: true, - }, { - m: `foobar{quantile="0.95"}`, - ct: int64p(1520430004), - isErr: true, - }, { - m: `foobar{quantile="0.99"}`, - ct: int64p(1520430004), - isErr: true, }, } @@ -953,47 +959,6 @@ foobar{quantile="0.99"} 150.0` } } -func TestDeepCopy(t *testing.T) { - input := []byte(`# HELP go_goroutines A gauge goroutines. -# TYPE go_goroutines gauge -go_goroutines 33 123.123 -# TYPE go_gc_duration_seconds summary -go_gc_duration_seconds -go_gc_duration_seconds_created`) - - st := labels.NewSymbolTable() - parser := NewOpenMetricsParser(input, st, WithOMParserCTSeriesSkipped()).(*OpenMetricsParser) - - // Modify the original parser state - _, err := parser.Next() - require.NoError(t, err) - require.Equal(t, "go_goroutines", string(parser.l.b[parser.offsets[0]:parser.offsets[1]])) - require.True(t, parser.skipCTSeries) - - // Create a deep copy of the parser - copyParser := deepCopy(parser) - etype, err := copyParser.Next() - require.NoError(t, err) - require.Equal(t, EntryType, etype) - require.True(t, parser.skipCTSeries) - require.False(t, copyParser.skipCTSeries) - - // Modify the original parser further - parser.Next() - parser.Next() - parser.Next() - require.Equal(t, "go_gc_duration_seconds", string(parser.l.b[parser.offsets[0]:parser.offsets[1]])) - require.Equal(t, "summary", string(parser.mtype)) - require.False(t, copyParser.skipCTSeries) - require.True(t, parser.skipCTSeries) - - // Ensure the copy remains unchanged - copyParser.Next() - copyParser.Next() - require.Equal(t, "go_gc_duration_seconds", string(copyParser.l.b[copyParser.offsets[0]:copyParser.offsets[1]])) - require.False(t, copyParser.skipCTSeries) -} - func BenchmarkOMParseCreatedTimestamp(b *testing.B) { for parserName, parser := range map[string]func([]byte, *labels.SymbolTable) Parser{ "openmetrics": func(b []byte, st *labels.SymbolTable) Parser {