Browse Source

feat: naive fixes and optimzations for `CreatedTimestamp` function (#14965)

* enhance: wip ct parse optimizations

Signed-off-by: Manik Rana <manikrana54@gmail.com>

* feat: further work on optimization

Signed-off-by: Manik Rana <manikrana54@gmail.com>

* feat: further improvements and remove unused code

Signed-off-by: Manik Rana <manikrana54@gmail.com>

* feat: improve optimizations and fix some CT parse errors

Signed-off-by: Manik Rana <manikrana54@gmail.com>

* fix: check for LsetHash along with name

Signed-off-by: Manik Rana <manikrana54@gmail.com>

* chore: cleanup and documentation

Signed-off-by: Manik Rana <manikrana54@gmail.com>

* enhance: improve comments and add cleaner functions

Signed-off-by: Manik Rana <manikrana54@gmail.com>

* feat: improve comments and add cleaner functions

Signed-off-by: Manik Rana <manikrana54@gmail.com>

* chore: rename to resetCTParseValues

Signed-off-by: Manik Rana <manikrana54@gmail.com>

* fix: post-merge fixes

Signed-off-by: Manik Rana <manikrana54@gmail.com>

* fix: add all possible reserved suffixes

Signed-off-by: Manik Rana <manikrana54@gmail.com>

* test: separate CT values for each metric

Signed-off-by: Manik Rana <manikrana54@gmail.com>

---------

Signed-off-by: Manik Rana <manikrana54@gmail.com>
Signed-off-by: Manik Rana <Manikrana54@gmail.com>
pull/14750/merge
Manik Rana 2 months ago committed by GitHub
parent
commit
47aeca9663
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 106
      model/textparse/openmetricsparse.go
  2. 259
      model/textparse/openmetricsparse_test.go

106
model/textparse/openmetricsparse.go

@ -95,6 +95,12 @@ type OpenMetricsParser struct {
exemplarTs int64 exemplarTs int64
hasExemplarTs bool hasExemplarTs bool
// Created timestamp parsing state.
ct int64
ctHashSet uint64
// visitedName is the metric name of the last visited metric when peeking ahead
// for _created series during the execution of the CreatedTimestamp method.
visitedName string
skipCTSeries bool skipCTSeries bool
} }
@ -254,6 +260,9 @@ func (p *OpenMetricsParser) Exemplar(e *exemplar.Exemplar) bool {
func (p *OpenMetricsParser) CreatedTimestamp() *int64 { func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
if !typeRequiresCT(p.mtype) { if !typeRequiresCT(p.mtype) {
// Not a CT supported metric type, fast path. // Not a CT supported metric type, fast path.
p.ct = 0
p.visitedName = ""
p.ctHashSet = 0
return nil return nil
} }
@ -264,27 +273,44 @@ func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
) )
p.Metric(&currLset) p.Metric(&currLset)
currFamilyLsetHash, buf := currLset.HashWithoutLabels(buf, labels.MetricName, "le", "quantile") currFamilyLsetHash, buf := currLset.HashWithoutLabels(buf, labels.MetricName, "le", "quantile")
// Search for the _created line for the currFamilyLsetHash using ephemeral parser until currName := currLset.Get(model.MetricNameLabel)
// we see EOF or new metric family. We have to do it as we don't know where (and if) currName = findBaseMetricName(currName)
// that CT line is.
// TODO(bwplotka): Make sure OM 1.1/2.0 pass CT via metadata or exemplar-like to avoid this. // make sure we're on a new metric before returning
peek := deepCopy(p) if currName == p.visitedName && currFamilyLsetHash == p.ctHashSet && p.visitedName != "" && p.ctHashSet > 0 && p.ct > 0 {
// CT is already known, fast path.
return &p.ct
}
// Create a new lexer to reset the parser once this function is done executing.
resetLexer := &openMetricsLexer{
b: p.l.b,
i: p.l.i,
start: p.l.start,
err: p.l.err,
state: p.l.state,
}
p.skipCTSeries = false
for { for {
eType, err := peek.Next() eType, err := p.Next()
if err != nil { if err != nil {
// This means peek will give error too later on, so def no CT line found. // This means p.Next() will give error too later on, so def no CT line found.
// This might result in partial scrape with wrong/missing CT, but only // This might result in partial scrape with wrong/missing CT, but only
// spec improvement would help. // spec improvement would help.
// TODO(bwplotka): Make sure OM 1.1/2.0 pass CT via metadata or exemplar-like to avoid this. // TODO: Make sure OM 1.1/2.0 pass CT via metadata or exemplar-like to avoid this.
p.resetCTParseValues(resetLexer)
return nil return nil
} }
if eType != EntrySeries { if eType != EntrySeries {
// Assume we hit different family, no CT line found. // Assume we hit different family, no CT line found.
p.resetCTParseValues(resetLexer)
return nil return nil
} }
var peekedLset labels.Labels var peekedLset labels.Labels
peek.Metric(&peekedLset) p.Metric(&peekedLset)
peekedName := peekedLset.Get(model.MetricNameLabel) peekedName := peekedLset.Get(model.MetricNameLabel)
if !strings.HasSuffix(peekedName, "_created") { if !strings.HasSuffix(peekedName, "_created") {
// Not a CT line, search more. // Not a CT line, search more.
@ -294,17 +320,52 @@ func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
// We got a CT line here, but let's search if CT line is actually for our series, edge case. // We got a CT line here, but let's search if CT line is actually for our series, edge case.
peekWithoutNameLsetHash, _ = peekedLset.HashWithoutLabels(buf, labels.MetricName, "le", "quantile") peekWithoutNameLsetHash, _ = peekedLset.HashWithoutLabels(buf, labels.MetricName, "le", "quantile")
if peekWithoutNameLsetHash != currFamilyLsetHash { if peekWithoutNameLsetHash != currFamilyLsetHash {
// CT line for a different series, for our series no CT. // Found CT line for a different series, for our series no CT.
p.resetCTParseValues(resetLexer)
return nil return nil
} }
// All timestamps in OpenMetrics are Unix Epoch in seconds. Convert to milliseconds. // All timestamps in OpenMetrics are Unix Epoch in seconds. Convert to milliseconds.
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#timestamps // https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#timestamps
ct := int64(peek.val * 1000.0) ct := int64(p.val * 1000.0)
p.setCTParseValues(ct, currFamilyLsetHash, currName, true, resetLexer)
return &ct return &ct
} }
} }
// setCTParseValues sets the parser to the state after CreatedTimestamp method was called and CT was found.
// This is useful to prevent re-parsing the same series again and early return the CT value.
func (p *OpenMetricsParser) setCTParseValues(ct int64, ctHashSet uint64, visitedName string, skipCTSeries bool, resetLexer *openMetricsLexer) {
p.ct = ct
p.l = resetLexer
p.ctHashSet = ctHashSet
p.visitedName = visitedName
p.skipCTSeries = skipCTSeries
}
// resetCtParseValues resets the parser to the state before CreatedTimestamp method was called.
func (p *OpenMetricsParser) resetCTParseValues(resetLexer *openMetricsLexer) {
p.l = resetLexer
p.ct = 0
p.ctHashSet = 0
p.visitedName = ""
p.skipCTSeries = true
}
// findBaseMetricName returns the metric name without reserved suffixes such as "_created",
// "_sum", etc. based on the OpenMetrics specification found at
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md.
// If no suffix is found, the original name is returned.
func findBaseMetricName(name string) string {
suffixes := []string{"_created", "_count", "_sum", "_bucket", "_total", "_gcount", "_gsum", "_info"}
for _, suffix := range suffixes {
if strings.HasSuffix(name, suffix) {
return strings.TrimSuffix(name, suffix)
}
}
return name
}
// typeRequiresCT returns true if the metric type requires a _created timestamp. // typeRequiresCT returns true if the metric type requires a _created timestamp.
func typeRequiresCT(t model.MetricType) bool { func typeRequiresCT(t model.MetricType) bool {
switch t { switch t {
@ -315,29 +376,6 @@ func typeRequiresCT(t model.MetricType) bool {
} }
} }
// deepCopy creates a copy of a parser without re-using the slices' original memory addresses.
func deepCopy(p *OpenMetricsParser) OpenMetricsParser {
newB := make([]byte, len(p.l.b))
copy(newB, p.l.b)
newLexer := &openMetricsLexer{
b: newB,
i: p.l.i,
start: p.l.start,
err: p.l.err,
state: p.l.state,
}
newParser := OpenMetricsParser{
l: newLexer,
builder: p.builder,
mtype: p.mtype,
val: p.val,
skipCTSeries: false,
}
return newParser
}
// nextToken returns the next token from the openMetricsLexer. // nextToken returns the next token from the openMetricsLexer.
func (p *OpenMetricsParser) nextToken() token { func (p *OpenMetricsParser) nextToken() token {
tok := p.l.Lex() tok := p.l.Lex()

259
model/textparse/openmetricsparse_test.go

@ -79,17 +79,38 @@ bar_count 17.0
bar_sum 324789.3 bar_sum 324789.3
bar{quantile="0.95"} 123.7 bar{quantile="0.95"} 123.7
bar{quantile="0.99"} 150.0 bar{quantile="0.99"} 150.0
bar_created 1520872607.123 bar_created 1520872608.124
# HELP baz Histogram with the same objective as above's summary # HELP baz Histogram with the same objective as above's summary
# TYPE baz histogram # TYPE baz histogram
baz_bucket{le="0.0"} 0 baz_bucket{le="0.0"} 0
baz_bucket{le="+Inf"} 17 baz_bucket{le="+Inf"} 17
baz_count 17 baz_count 17
baz_sum 324789.3 baz_sum 324789.3
baz_created 1520872607.123 baz_created 1520872609.125
# HELP fizz_created Gauge which shouldn't be parsed as CT # HELP fizz_created Gauge which shouldn't be parsed as CT
# TYPE fizz_created gauge # TYPE fizz_created gauge
fizz_created 17.0` fizz_created 17.0
# HELP something Histogram with _created between buckets and summary
# TYPE something histogram
something_count 18
something_sum 324789.4
something_created 1520430001
something_bucket{le="0.0"} 1
something_bucket{le="+Inf"} 18
# HELP yum Summary with _created between sum and quantiles
# TYPE yum summary
yum_count 20
yum_sum 324789.5
yum_created 1520430003
yum{quantile="0.95"} 123.7
yum{quantile="0.99"} 150.0
# HELP foobar Summary with _created as the first line
# TYPE foobar summary
foobar_count 21
foobar_created 1520430004
foobar_sum 324789.6
foobar{quantile="0.95"} 123.8
foobar{quantile="0.99"} 150.1`
input += "\n# HELP metric foo\x00bar" input += "\n# HELP metric foo\x00bar"
input += "\nnull_byte_metric{a=\"abc\x00\"} 1" input += "\nnull_byte_metric{a=\"abc\x00\"} 1"
@ -269,22 +290,22 @@ fizz_created 17.0`
m: "bar_count", m: "bar_count",
v: 17.0, v: 17.0,
lset: labels.FromStrings("__name__", "bar_count"), lset: labels.FromStrings("__name__", "bar_count"),
ct: int64p(1520872607123), ct: int64p(1520872608124),
}, { }, {
m: "bar_sum", m: "bar_sum",
v: 324789.3, v: 324789.3,
lset: labels.FromStrings("__name__", "bar_sum"), lset: labels.FromStrings("__name__", "bar_sum"),
ct: int64p(1520872607123), ct: int64p(1520872608124),
}, { }, {
m: `bar{quantile="0.95"}`, m: `bar{quantile="0.95"}`,
v: 123.7, v: 123.7,
lset: labels.FromStrings("__name__", "bar", "quantile", "0.95"), lset: labels.FromStrings("__name__", "bar", "quantile", "0.95"),
ct: int64p(1520872607123), ct: int64p(1520872608124),
}, { }, {
m: `bar{quantile="0.99"}`, m: `bar{quantile="0.99"}`,
v: 150.0, v: 150.0,
lset: labels.FromStrings("__name__", "bar", "quantile", "0.99"), lset: labels.FromStrings("__name__", "bar", "quantile", "0.99"),
ct: int64p(1520872607123), ct: int64p(1520872608124),
}, { }, {
m: "baz", m: "baz",
help: "Histogram with the same objective as above's summary", help: "Histogram with the same objective as above's summary",
@ -295,22 +316,22 @@ fizz_created 17.0`
m: `baz_bucket{le="0.0"}`, m: `baz_bucket{le="0.0"}`,
v: 0, v: 0,
lset: labels.FromStrings("__name__", "baz_bucket", "le", "0.0"), lset: labels.FromStrings("__name__", "baz_bucket", "le", "0.0"),
ct: int64p(1520872607123), ct: int64p(1520872609125),
}, { }, {
m: `baz_bucket{le="+Inf"}`, m: `baz_bucket{le="+Inf"}`,
v: 17, v: 17,
lset: labels.FromStrings("__name__", "baz_bucket", "le", "+Inf"), lset: labels.FromStrings("__name__", "baz_bucket", "le", "+Inf"),
ct: int64p(1520872607123), ct: int64p(1520872609125),
}, { }, {
m: `baz_count`, m: `baz_count`,
v: 17, v: 17,
lset: labels.FromStrings("__name__", "baz_count"), lset: labels.FromStrings("__name__", "baz_count"),
ct: int64p(1520872607123), ct: int64p(1520872609125),
}, { }, {
m: `baz_sum`, m: `baz_sum`,
v: 324789.3, v: 324789.3,
lset: labels.FromStrings("__name__", "baz_sum"), lset: labels.FromStrings("__name__", "baz_sum"),
ct: int64p(1520872607123), ct: int64p(1520872609125),
}, { }, {
m: "fizz_created", m: "fizz_created",
help: "Gauge which shouldn't be parsed as CT", help: "Gauge which shouldn't be parsed as CT",
@ -321,6 +342,84 @@ fizz_created 17.0`
m: `fizz_created`, m: `fizz_created`,
v: 17, v: 17,
lset: labels.FromStrings("__name__", "fizz_created"), lset: labels.FromStrings("__name__", "fizz_created"),
}, {
m: "something",
help: "Histogram with _created between buckets and summary",
}, {
m: "something",
typ: model.MetricTypeHistogram,
}, {
m: `something_count`,
v: 18,
lset: labels.FromStrings("__name__", "something_count"),
ct: int64p(1520430001000),
}, {
m: `something_sum`,
v: 324789.4,
lset: labels.FromStrings("__name__", "something_sum"),
ct: int64p(1520430001000),
}, {
m: `something_bucket{le="0.0"}`,
v: 1,
lset: labels.FromStrings("__name__", "something_bucket", "le", "0.0"),
ct: int64p(1520430001000),
}, {
m: `something_bucket{le="+Inf"}`,
v: 18,
lset: labels.FromStrings("__name__", "something_bucket", "le", "+Inf"),
ct: int64p(1520430001000),
}, {
m: "yum",
help: "Summary with _created between sum and quantiles",
}, {
m: "yum",
typ: model.MetricTypeSummary,
}, {
m: `yum_count`,
v: 20,
lset: labels.FromStrings("__name__", "yum_count"),
ct: int64p(1520430003000),
}, {
m: `yum_sum`,
v: 324789.5,
lset: labels.FromStrings("__name__", "yum_sum"),
ct: int64p(1520430003000),
}, {
m: `yum{quantile="0.95"}`,
v: 123.7,
lset: labels.FromStrings("__name__", "yum", "quantile", "0.95"),
ct: int64p(1520430003000),
}, {
m: `yum{quantile="0.99"}`,
v: 150.0,
lset: labels.FromStrings("__name__", "yum", "quantile", "0.99"),
ct: int64p(1520430003000),
}, {
m: "foobar",
help: "Summary with _created as the first line",
}, {
m: "foobar",
typ: model.MetricTypeSummary,
}, {
m: `foobar_count`,
v: 21,
lset: labels.FromStrings("__name__", "foobar_count"),
ct: int64p(1520430004000),
}, {
m: `foobar_sum`,
v: 324789.6,
lset: labels.FromStrings("__name__", "foobar_sum"),
ct: int64p(1520430004000),
}, {
m: `foobar{quantile="0.95"}`,
v: 123.8,
lset: labels.FromStrings("__name__", "foobar", "quantile", "0.95"),
ct: int64p(1520430004000),
}, {
m: `foobar{quantile="0.99"}`,
v: 150.1,
lset: labels.FromStrings("__name__", "foobar", "quantile", "0.99"),
ct: int64p(1520430004000),
}, { }, {
m: "metric", m: "metric",
help: "foo\x00bar", help: "foo\x00bar",
@ -784,34 +883,13 @@ func TestOMNullByteHandling(t *testing.T) {
// these tests show them. // these tests show them.
// TODO(maniktherana): Make sure OM 1.1/2.0 pass CT via metadata or exemplar-like to avoid this. // TODO(maniktherana): Make sure OM 1.1/2.0 pass CT via metadata or exemplar-like to avoid this.
func TestCTParseFailures(t *testing.T) { func TestCTParseFailures(t *testing.T) {
input := `# HELP something Histogram with _created between buckets and summary input := `# HELP thing Histogram with _created as first line
# TYPE something histogram
something_count 17
something_sum 324789.3
something_created 1520872607.123
something_bucket{le="0.0"} 0
something_bucket{le="+Inf"} 17
# HELP thing Histogram with _created as first line
# TYPE thing histogram # TYPE thing histogram
thing_created 1520872607.123 thing_created 1520872607.123
thing_count 17 thing_count 17
thing_sum 324789.3 thing_sum 324789.3
thing_bucket{le="0.0"} 0 thing_bucket{le="0.0"} 0
thing_bucket{le="+Inf"} 17 thing_bucket{le="+Inf"} 17`
# HELP yum Summary with _created between sum and quantiles
# TYPE yum summary
yum_count 17.0
yum_sum 324789.3
yum_created 1520872607.123
yum{quantile="0.95"} 123.7
yum{quantile="0.99"} 150.0
# HELP foobar Summary with _created as the first line
# TYPE foobar summary
foobar_created 1520872607.123
foobar_count 17.0
foobar_sum 324789.3
foobar{quantile="0.95"} 123.7
foobar{quantile="0.99"} 150.0`
input += "\n# EOF\n" input += "\n# EOF\n"
@ -827,30 +905,6 @@ foobar{quantile="0.99"} 150.0`
exp := []expectCT{ exp := []expectCT{
{ {
m: "something",
help: "Histogram with _created between buckets and summary",
isErr: false,
}, {
m: "something",
typ: model.MetricTypeHistogram,
isErr: false,
}, {
m: `something_count`,
ct: int64p(1520872607123),
isErr: false,
}, {
m: `something_sum`,
ct: int64p(1520872607123),
isErr: false,
}, {
m: `something_bucket{le="0.0"}`,
ct: int64p(1520872607123),
isErr: true,
}, {
m: `something_bucket{le="+Inf"}`,
ct: int64p(1520872607123),
isErr: true,
}, {
m: "thing", m: "thing",
help: "Histogram with _created as first line", help: "Histogram with _created as first line",
isErr: false, isErr: false,
@ -874,54 +928,6 @@ foobar{quantile="0.99"} 150.0`
m: `thing_bucket{le="+Inf"}`, m: `thing_bucket{le="+Inf"}`,
ct: int64p(1520872607123), ct: int64p(1520872607123),
isErr: true, isErr: true,
}, {
m: "yum",
help: "Summary with _created between summary and quantiles",
isErr: false,
}, {
m: "yum",
typ: model.MetricTypeSummary,
isErr: false,
}, {
m: "yum_count",
ct: int64p(1520872607123),
isErr: false,
}, {
m: "yum_sum",
ct: int64p(1520872607123),
isErr: false,
}, {
m: `yum{quantile="0.95"}`,
ct: int64p(1520872607123),
isErr: true,
}, {
m: `yum{quantile="0.99"}`,
ct: int64p(1520872607123),
isErr: true,
}, {
m: "foobar",
help: "Summary with _created as the first line",
isErr: false,
}, {
m: "foobar",
typ: model.MetricTypeSummary,
isErr: false,
}, {
m: "foobar_count",
ct: int64p(1520430004),
isErr: true,
}, {
m: "foobar_sum",
ct: int64p(1520430004),
isErr: true,
}, {
m: `foobar{quantile="0.95"}`,
ct: int64p(1520430004),
isErr: true,
}, {
m: `foobar{quantile="0.99"}`,
ct: int64p(1520430004),
isErr: true,
}, },
} }
@ -953,47 +959,6 @@ foobar{quantile="0.99"} 150.0`
} }
} }
func TestDeepCopy(t *testing.T) {
input := []byte(`# HELP go_goroutines A gauge goroutines.
# TYPE go_goroutines gauge
go_goroutines 33 123.123
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds
go_gc_duration_seconds_created`)
st := labels.NewSymbolTable()
parser := NewOpenMetricsParser(input, st, WithOMParserCTSeriesSkipped()).(*OpenMetricsParser)
// Modify the original parser state
_, err := parser.Next()
require.NoError(t, err)
require.Equal(t, "go_goroutines", string(parser.l.b[parser.offsets[0]:parser.offsets[1]]))
require.True(t, parser.skipCTSeries)
// Create a deep copy of the parser
copyParser := deepCopy(parser)
etype, err := copyParser.Next()
require.NoError(t, err)
require.Equal(t, EntryType, etype)
require.True(t, parser.skipCTSeries)
require.False(t, copyParser.skipCTSeries)
// Modify the original parser further
parser.Next()
parser.Next()
parser.Next()
require.Equal(t, "go_gc_duration_seconds", string(parser.l.b[parser.offsets[0]:parser.offsets[1]]))
require.Equal(t, "summary", string(parser.mtype))
require.False(t, copyParser.skipCTSeries)
require.True(t, parser.skipCTSeries)
// Ensure the copy remains unchanged
copyParser.Next()
copyParser.Next()
require.Equal(t, "go_gc_duration_seconds", string(copyParser.l.b[copyParser.offsets[0]:copyParser.offsets[1]]))
require.False(t, copyParser.skipCTSeries)
}
func BenchmarkOMParseCreatedTimestamp(b *testing.B) { func BenchmarkOMParseCreatedTimestamp(b *testing.B) {
for parserName, parser := range map[string]func([]byte, *labels.SymbolTable) Parser{ for parserName, parser := range map[string]func([]byte, *labels.SymbolTable) Parser{
"openmetrics": func(b []byte, st *labels.SymbolTable) Parser { "openmetrics": func(b []byte, st *labels.SymbolTable) Parser {

Loading…
Cancel
Save