From f1c57a95ed7df70176934a38799354f692ac8963 Mon Sep 17 00:00:00 2001 From: Manik Rana Date: Wed, 2 Oct 2024 16:22:03 +0530 Subject: [PATCH] change: No longer ingest OM _created as timeseries if feature-flag 'enable-ct-zero-ingestion' is enabled; fixed OM text CT conversion bug (#14738) * chore: revert TypeRequiresCT to private Signed-off-by: Manik Rana * feat: init NewOpenMetricsParser with skipCT true Signed-off-by: Manik Rana * refac: allow opt-in to OM CT ingestion Signed-off-by: Manik Rana * chore: lint Signed-off-by: Manik Rana * chore: use textparse interface to set om options Signed-off-by: Manik Rana * fix: set skipOMSeries in test Signed-off-by: Manik Rana * chore: gofumpt Signed-off-by: Manik Rana * wip: add tests for OM CR parse Signed-off-by: Manik Rana * chore: merge ct tests Signed-off-by: Manik Rana * tests: add cases for OM text Signed-off-by: Manik Rana * fix: check correct test cases Signed-off-by: Manik Rana * chore: use both scrape protocols in config Signed-off-by: Manik Rana * fix: fix inputs and output tests for OM Signed-off-by: Manik Rana * chore: cleanup Signed-off-by: Manik Rana * refac: rename skipOMSeries to skipOMCTSeries Co-authored-by: Arthur Silva Sens Signed-off-by: Manik Rana * fix: finish refac Signed-off-by: Manik Rana * refac: move setup code outside test Signed-off-by: Manik Rana * tests: verify _created lines create new metric in certain cases Signed-off-by: Manik Rana * fix: post merge fixes Signed-off-by: Manik Rana * chore: lint Signed-off-by: Manik Rana * manager: Fixed CT OMText conversion bug; Refactored tests. Signed-off-by: bwplotka * chore: lint Signed-off-by: Manik Rana * chore: gofumpt Signed-off-by: Manik Rana * chore: imports Signed-off-by: Manik Rana --------- Signed-off-by: Manik Rana Signed-off-by: Manik Rana Signed-off-by: bwplotka Co-authored-by: Arthur Silva Sens Co-authored-by: bwplotka --- model/textparse/interface.go | 6 +- model/textparse/interface_test.go | 2 +- model/textparse/openmetricsparse.go | 5 +- model/textparse/openmetricsparse_test.go | 70 +++--- promql/fuzz.go | 2 +- scrape/manager_test.go | 298 ++++++++++++++--------- scrape/scrape.go | 2 +- scrape/scrape_test.go | 2 +- 8 files changed, 224 insertions(+), 163 deletions(-) diff --git a/model/textparse/interface.go b/model/textparse/interface.go index 0b5d9281e..7de88a486 100644 --- a/model/textparse/interface.go +++ b/model/textparse/interface.go @@ -80,7 +80,7 @@ type Parser interface { // // This function always returns a valid parser, but might additionally // return an error if the content type cannot be parsed. -func New(b []byte, contentType string, parseClassicHistograms bool, st *labels.SymbolTable) (Parser, error) { +func New(b []byte, contentType string, parseClassicHistograms, skipOMCTSeries bool, st *labels.SymbolTable) (Parser, error) { if contentType == "" { return NewPromParser(b, st), nil } @@ -91,7 +91,9 @@ func New(b []byte, contentType string, parseClassicHistograms bool, st *labels.S } switch mediaType { case "application/openmetrics-text": - return NewOpenMetricsParser(b, st), nil + return NewOpenMetricsParser(b, st, func(o *openMetricsParserOptions) { + o.SkipCTSeries = skipOMCTSeries + }), nil case "application/vnd.google.protobuf": return NewProtobufParser(b, parseClassicHistograms, st), nil default: diff --git a/model/textparse/interface_test.go b/model/textparse/interface_test.go index c64456562..970b96706 100644 --- a/model/textparse/interface_test.go +++ b/model/textparse/interface_test.go @@ -93,7 +93,7 @@ func TestNewParser(t *testing.T) { tt := tt // Copy to local variable before going parallel. t.Parallel() - p, err := New([]byte{}, tt.contentType, false, labels.NewSymbolTable()) + p, err := New([]byte{}, tt.contentType, false, false, labels.NewSymbolTable()) tt.validateParser(t, p) if tt.err == "" { require.NoError(t, err) diff --git a/model/textparse/openmetricsparse.go b/model/textparse/openmetricsparse.go index ea7607c3a..8ec1b62ff 100644 --- a/model/textparse/openmetricsparse.go +++ b/model/textparse/openmetricsparse.go @@ -297,7 +297,10 @@ func (p *OpenMetricsParser) CreatedTimestamp() *int64 { // CT line for a different series, for our series no CT. return nil } - ct := int64(peek.val) + + // All timestamps in OpenMetrics are Unix Epoch in seconds. Convert to milliseconds. + // https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#timestamps + ct := int64(peek.val * 1000.0) return &ct } } diff --git a/model/textparse/openmetricsparse_test.go b/model/textparse/openmetricsparse_test.go index ce1261f5c..93033380b 100644 --- a/model/textparse/openmetricsparse_test.go +++ b/model/textparse/openmetricsparse_test.go @@ -70,23 +70,23 @@ testmetric{label="\"bar\""} 1 # HELP foo Counter with and without labels to certify CT is parsed for both cases # TYPE foo counter foo_total 17.0 1520879607.789 # {id="counter-test"} 5 -foo_created 1000 +foo_created 1520872607.123 foo_total{a="b"} 17.0 1520879607.789 # {id="counter-test"} 5 -foo_created{a="b"} 1000 +foo_created{a="b"} 1520872607.123 # HELP bar Summary with CT at the end, making sure we find CT even if it's multiple lines a far # TYPE bar summary bar_count 17.0 bar_sum 324789.3 bar{quantile="0.95"} 123.7 bar{quantile="0.99"} 150.0 -bar_created 1520430000 +bar_created 1520872607.123 # HELP baz Histogram with the same objective as above's summary # TYPE baz histogram baz_bucket{le="0.0"} 0 baz_bucket{le="+Inf"} 17 baz_count 17 baz_sum 324789.3 -baz_created 1520430000 +baz_created 1520872607.123 # HELP fizz_created Gauge which shouldn't be parsed as CT # TYPE fizz_created gauge fizz_created 17.0` @@ -251,14 +251,14 @@ fizz_created 17.0` lset: labels.FromStrings("__name__", "foo_total"), t: int64p(1520879607789), e: &exemplar.Exemplar{Labels: labels.FromStrings("id", "counter-test"), Value: 5}, - ct: int64p(1000), + ct: int64p(1520872607123), }, { m: `foo_total{a="b"}`, v: 17.0, lset: labels.FromStrings("__name__", "foo_total", "a", "b"), t: int64p(1520879607789), e: &exemplar.Exemplar{Labels: labels.FromStrings("id", "counter-test"), Value: 5}, - ct: int64p(1000), + ct: int64p(1520872607123), }, { m: "bar", help: "Summary with CT at the end, making sure we find CT even if it's multiple lines a far", @@ -269,22 +269,22 @@ fizz_created 17.0` m: "bar_count", v: 17.0, lset: labels.FromStrings("__name__", "bar_count"), - ct: int64p(1520430000), + ct: int64p(1520872607123), }, { m: "bar_sum", v: 324789.3, lset: labels.FromStrings("__name__", "bar_sum"), - ct: int64p(1520430000), + ct: int64p(1520872607123), }, { m: `bar{quantile="0.95"}`, v: 123.7, lset: labels.FromStrings("__name__", "bar", "quantile", "0.95"), - ct: int64p(1520430000), + ct: int64p(1520872607123), }, { m: `bar{quantile="0.99"}`, v: 150.0, lset: labels.FromStrings("__name__", "bar", "quantile", "0.99"), - ct: int64p(1520430000), + ct: int64p(1520872607123), }, { m: "baz", help: "Histogram with the same objective as above's summary", @@ -295,22 +295,22 @@ fizz_created 17.0` m: `baz_bucket{le="0.0"}`, v: 0, lset: labels.FromStrings("__name__", "baz_bucket", "le", "0.0"), - ct: int64p(1520430000), + ct: int64p(1520872607123), }, { m: `baz_bucket{le="+Inf"}`, v: 17, lset: labels.FromStrings("__name__", "baz_bucket", "le", "+Inf"), - ct: int64p(1520430000), + ct: int64p(1520872607123), }, { m: `baz_count`, v: 17, lset: labels.FromStrings("__name__", "baz_count"), - ct: int64p(1520430000), + ct: int64p(1520872607123), }, { m: `baz_sum`, v: 324789.3, lset: labels.FromStrings("__name__", "baz_sum"), - ct: int64p(1520430000), + ct: int64p(1520872607123), }, { m: "fizz_created", help: "Gauge which shouldn't be parsed as CT", @@ -347,7 +347,7 @@ func TestUTF8OpenMetricsParse(t *testing.T) { # UNIT "go.gc_duration_seconds" seconds {"go.gc_duration_seconds",quantile="0"} 4.9351e-05 {"go.gc_duration_seconds",quantile="0.25"} 7.424100000000001e-05 -{"go.gc_duration_seconds_created"} 12313 +{"go.gc_duration_seconds_created"} 1520872607.123 {"go.gc_duration_seconds",quantile="0.5",a="b"} 8.3835e-05 {"http.status",q="0.9",a="b"} 8.3835e-05 {"http.status",q="0.9",a="b"} 8.3835e-05 @@ -371,12 +371,12 @@ func TestUTF8OpenMetricsParse(t *testing.T) { m: `{"go.gc_duration_seconds",quantile="0"}`, v: 4.9351e-05, lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0"), - ct: int64p(12313), + ct: int64p(1520872607123), }, { m: `{"go.gc_duration_seconds",quantile="0.25"}`, v: 7.424100000000001e-05, lset: labels.FromStrings("__name__", "go.gc_duration_seconds", "quantile", "0.25"), - ct: int64p(12313), + ct: int64p(1520872607123), }, { m: `{"go.gc_duration_seconds",quantile="0.5",a="b"}`, v: 8.3835e-05, @@ -700,7 +700,7 @@ func TestOpenMetricsParseErrors(t *testing.T) { } for i, c := range cases { - p := NewOpenMetricsParser([]byte(c.input), labels.NewSymbolTable()) + p := NewOpenMetricsParser([]byte(c.input), labels.NewSymbolTable(), WithOMParserCTSeriesSkipped()) var err error for err == nil { _, err = p.Next() @@ -765,7 +765,7 @@ func TestOMNullByteHandling(t *testing.T) { } for i, c := range cases { - p := NewOpenMetricsParser([]byte(c.input), labels.NewSymbolTable()) + p := NewOpenMetricsParser([]byte(c.input), labels.NewSymbolTable(), WithOMParserCTSeriesSkipped()) var err error for err == nil { _, err = p.Next() @@ -788,12 +788,12 @@ func TestCTParseFailures(t *testing.T) { # TYPE something histogram something_count 17 something_sum 324789.3 -something_created 1520430001 +something_created 1520872607.123 something_bucket{le="0.0"} 0 something_bucket{le="+Inf"} 17 # HELP thing Histogram with _created as first line # TYPE thing histogram -thing_created 1520430002 +thing_created 1520872607.123 thing_count 17 thing_sum 324789.3 thing_bucket{le="0.0"} 0 @@ -802,12 +802,12 @@ thing_bucket{le="+Inf"} 17 # TYPE yum summary yum_count 17.0 yum_sum 324789.3 -yum_created 1520430003 +yum_created 1520872607.123 yum{quantile="0.95"} 123.7 yum{quantile="0.99"} 150.0 # HELP foobar Summary with _created as the first line # TYPE foobar summary -foobar_created 1520430004 +foobar_created 1520872607.123 foobar_count 17.0 foobar_sum 324789.3 foobar{quantile="0.95"} 123.7 @@ -836,19 +836,19 @@ foobar{quantile="0.99"} 150.0` isErr: false, }, { m: `something_count`, - ct: int64p(1520430001), + ct: int64p(1520872607123), isErr: false, }, { m: `something_sum`, - ct: int64p(1520430001), + ct: int64p(1520872607123), isErr: false, }, { m: `something_bucket{le="0.0"}`, - ct: int64p(1520430001), + ct: int64p(1520872607123), isErr: true, }, { m: `something_bucket{le="+Inf"}`, - ct: int64p(1520430001), + ct: int64p(1520872607123), isErr: true, }, { m: "thing", @@ -860,19 +860,19 @@ foobar{quantile="0.99"} 150.0` isErr: false, }, { m: `thing_count`, - ct: int64p(1520430002), + ct: int64p(1520872607123), isErr: true, }, { m: `thing_sum`, - ct: int64p(1520430002), + ct: int64p(1520872607123), isErr: true, }, { m: `thing_bucket{le="0.0"}`, - ct: int64p(1520430002), + ct: int64p(1520872607123), isErr: true, }, { m: `thing_bucket{le="+Inf"}`, - ct: int64p(1520430002), + ct: int64p(1520872607123), isErr: true, }, { m: "yum", @@ -884,19 +884,19 @@ foobar{quantile="0.99"} 150.0` isErr: false, }, { m: "yum_count", - ct: int64p(1520430003), + ct: int64p(1520872607123), isErr: false, }, { m: "yum_sum", - ct: int64p(1520430003), + ct: int64p(1520872607123), isErr: false, }, { m: `yum{quantile="0.95"}`, - ct: int64p(1520430003), + ct: int64p(1520872607123), isErr: true, }, { m: `yum{quantile="0.99"}`, - ct: int64p(1520430003), + ct: int64p(1520872607123), isErr: true, }, { m: "foobar", diff --git a/promql/fuzz.go b/promql/fuzz.go index 3fd50b949..57fd1166a 100644 --- a/promql/fuzz.go +++ b/promql/fuzz.go @@ -61,7 +61,7 @@ const ( var symbolTable = labels.NewSymbolTable() func fuzzParseMetricWithContentType(in []byte, contentType string) int { - p, warning := textparse.New(in, contentType, false, symbolTable) + p, warning := textparse.New(in, contentType, false, false, symbolTable) if warning != nil { // An invalid content type is being passed, which should not happen // in this context. diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 7e01238cc..8d2c3c968 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -14,6 +14,7 @@ package scrape import ( + "bytes" "context" "fmt" "net/http" @@ -30,11 +31,14 @@ import ( "github.com/gogo/protobuf/proto" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "google.golang.org/protobuf/types/known/timestamppb" "gopkg.in/yaml.v2" + "github.com/prometheus/prometheus/model/timestamp" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" _ "github.com/prometheus/prometheus/discovery/file" @@ -719,143 +723,195 @@ scrape_configs: require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools()) } -// TestManagerCTZeroIngestion tests scrape manager for CT cases. -func TestManagerCTZeroIngestion(t *testing.T) { - const mName = "expected_counter" - - for _, tc := range []struct { - name string - counterSample *dto.Counter - enableCTZeroIngestion bool - }{ - { - name: "disabled with CT on counter", - counterSample: &dto.Counter{ - Value: proto.Float64(1.0), - // Timestamp does not matter as long as it exists in this test. - CreatedTimestamp: timestamppb.Now(), - }, - }, - { - name: "enabled with CT on counter", - counterSample: &dto.Counter{ - Value: proto.Float64(1.0), - // Timestamp does not matter as long as it exists in this test. - CreatedTimestamp: timestamppb.Now(), - }, - enableCTZeroIngestion: true, +func setupScrapeManager(t *testing.T, honorTimestamps, enableCTZeroIngestion bool) (*collectResultAppender, *Manager) { + app := &collectResultAppender{} + scrapeManager, err := NewManager( + &Options{ + EnableCreatedTimestampZeroIngestion: enableCTZeroIngestion, + skipOffsetting: true, }, - { - name: "enabled without CT on counter", - counterSample: &dto.Counter{ - Value: proto.Float64(1.0), - }, - enableCTZeroIngestion: true, + log.NewLogfmtLogger(os.Stderr), + nil, + &collectResultAppendable{app}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + + require.NoError(t, scrapeManager.ApplyConfig(&config.Config{ + GlobalConfig: config.GlobalConfig{ + // Disable regular scrapes. + ScrapeInterval: model.Duration(9999 * time.Minute), + ScrapeTimeout: model.Duration(5 * time.Second), + ScrapeProtocols: []config.ScrapeProtocol{config.OpenMetricsText1_0_0, config.PrometheusProto}, }, - } { - t.Run(tc.name, func(t *testing.T) { - app := &collectResultAppender{} - scrapeManager, err := NewManager( - &Options{ - EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion, - skipOffsetting: true, - }, - log.NewLogfmtLogger(os.Stderr), - nil, - &collectResultAppendable{app}, - prometheus.NewRegistry(), - ) - require.NoError(t, err) + ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test", HonorTimestamps: honorTimestamps}}, + })) - require.NoError(t, scrapeManager.ApplyConfig(&config.Config{ - GlobalConfig: config.GlobalConfig{ - // Disable regular scrapes. - ScrapeInterval: model.Duration(9999 * time.Minute), - ScrapeTimeout: model.Duration(5 * time.Second), - // Ensure the proto is chosen. We need proto as it's the only protocol - // with the CT parsing support. - ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto}, - }, - ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, - })) + return app, scrapeManager +} - once := sync.Once{} - // Start fake HTTP target to that allow one scrape only. - server := httptest.NewServer( - http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fail := true - once.Do(func() { - fail = false - w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`) +func setupTestServer(t *testing.T, typ string, toWrite []byte) *httptest.Server { + once := sync.Once{} - ctrType := dto.MetricType_COUNTER - w.Write(protoMarshalDelimited(t, &dto.MetricFamily{ - Name: proto.String(mName), - Type: &ctrType, - Metric: []*dto.Metric{{Counter: tc.counterSample}}, - })) - }) + server := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fail := true + once.Do(func() { + fail = false + w.Header().Set("Content-Type", typ) + w.Write(toWrite) + }) - if fail { - w.WriteHeader(http.StatusInternalServerError) - } - }), - ) - defer server.Close() + if fail { + w.WriteHeader(http.StatusInternalServerError) + } + }), + ) - serverURL, err := url.Parse(server.URL) - require.NoError(t, err) + t.Cleanup(func() { server.Close() }) - // Add fake target directly into tsets + reload. Normally users would use - // Manager.Run and wait for minimum 5s refresh interval. - scrapeManager.updateTsets(map[string][]*targetgroup.Group{ - "test": {{ - Targets: []model.LabelSet{{ - model.SchemeLabel: model.LabelValue(serverURL.Scheme), - model.AddressLabel: model.LabelValue(serverURL.Host), - }}, - }}, - }) - scrapeManager.reload() + return server +} - var got []float64 - // Wait for one scrape. - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - defer cancel() - require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error { - app.mtx.Lock() - defer app.mtx.Unlock() +// TestManagerCTZeroIngestion tests scrape manager for various CT cases. +func TestManagerCTZeroIngestion(t *testing.T) { + const ( + // _total suffix is required, otherwise expfmt with OMText will mark metric as "unknown" + expectedMetricName = "expected_metric_total" + expectedCreatedMetricName = "expected_metric_created" + expectedSampleValue = 17.0 + ) - // Check if scrape happened and grab the relevant samples, they have to be there - or it's a bug - // and it's not worth waiting. - for _, f := range app.resultFloats { - if f.metric.Get(model.MetricNameLabel) == mName { - got = append(got, f.f) + for _, testFormat := range []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0} { + t.Run(fmt.Sprintf("format=%s", testFormat), func(t *testing.T) { + for _, testWithCT := range []bool{false, true} { + t.Run(fmt.Sprintf("withCT=%v", testWithCT), func(t *testing.T) { + for _, testCTZeroIngest := range []bool{false, true} { + t.Run(fmt.Sprintf("ctZeroIngest=%v", testCTZeroIngest), func(t *testing.T) { + sampleTs := time.Now() + ctTs := time.Time{} + if testWithCT { + ctTs = sampleTs.Add(-2 * time.Minute) + } + + // TODO(bwplotka): Add more types than just counter? + encoded := prepareTestEncodedCounter(t, testFormat, expectedMetricName, expectedSampleValue, sampleTs, ctTs) + app, scrapeManager := setupScrapeManager(t, true, testCTZeroIngest) + + // Perform the test. + doOneScrape(t, scrapeManager, app, setupTestServer(t, config.ScrapeProtocolsHeaders[testFormat], encoded)) + + // Verify results. + // Verify what we got vs expectations around CT injection. + samples := findSamplesForMetric(app.resultFloats, expectedMetricName) + if testWithCT && testCTZeroIngest { + require.Len(t, samples, 2) + require.Equal(t, 0.0, samples[0].f) + require.Equal(t, timestamp.FromTime(ctTs), samples[0].t) + require.Equal(t, expectedSampleValue, samples[1].f) + require.Equal(t, timestamp.FromTime(sampleTs), samples[1].t) + } else { + require.Len(t, samples, 1) + require.Equal(t, expectedSampleValue, samples[0].f) + require.Equal(t, timestamp.FromTime(sampleTs), samples[0].t) + } + + // Verify what we got vs expectations around additional _created series for OM text. + // enableCTZeroInjection also kills that _created line. + createdSeriesSamples := findSamplesForMetric(app.resultFloats, expectedCreatedMetricName) + if testFormat == config.OpenMetricsText1_0_0 && testWithCT && !testCTZeroIngest { + // For OM Text, when counter has CT, and feature flag disabled we should see _created lines. + require.Len(t, createdSeriesSamples, 1) + // Conversion taken from common/expfmt.writeOpenMetricsFloat. + // We don't check the ct timestamp as explicit ts was not implemented in expfmt.Encoder, + // but exists in OM https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#:~:text=An%20example%20with%20a%20Metric%20with%20no%20labels%2C%20and%20a%20MetricPoint%20with%20a%20timestamp%20and%20a%20created + // We can implement this, but we want to potentially get rid of OM 1.0 CT lines + require.Equal(t, float64(timestamppb.New(ctTs).AsTime().UnixNano())/1e9, createdSeriesSamples[0].f) + } else { + require.Empty(t, createdSeriesSamples) + } + }) } - } - if len(app.resultFloats) > 0 { - return nil - } - return fmt.Errorf("expected some samples, got none") - }), "after 1 minute") - scrapeManager.Stop() - - // Check for zero samples, assuming we only injected always one sample. - // Did it contain CT to inject? If yes, was CT zero enabled? - if tc.counterSample.CreatedTimestamp.IsValid() && tc.enableCTZeroIngestion { - require.Len(t, got, 2) - require.Equal(t, 0.0, got[0]) - require.Equal(t, tc.counterSample.GetValue(), got[1]) - return + }) } - - // Expect only one, valid sample. - require.Len(t, got, 1) - require.Equal(t, tc.counterSample.GetValue(), got[0]) }) } } +func prepareTestEncodedCounter(t *testing.T, format config.ScrapeProtocol, mName string, v float64, ts, ct time.Time) (encoded []byte) { + t.Helper() + + counter := &dto.Counter{Value: proto.Float64(v)} + if !ct.IsZero() { + counter.CreatedTimestamp = timestamppb.New(ct) + } + ctrType := dto.MetricType_COUNTER + inputMetric := &dto.MetricFamily{ + Name: proto.String(mName), + Type: &ctrType, + Metric: []*dto.Metric{{ + TimestampMs: proto.Int64(timestamp.FromTime(ts)), + Counter: counter, + }}, + } + switch format { + case config.PrometheusProto: + return protoMarshalDelimited(t, inputMetric) + case config.OpenMetricsText1_0_0: + buf := &bytes.Buffer{} + require.NoError(t, expfmt.NewEncoder(buf, expfmt.NewFormat(expfmt.TypeOpenMetrics), expfmt.WithCreatedLines(), expfmt.WithUnit()).Encode(inputMetric)) + _, _ = buf.WriteString("# EOF") + + t.Log("produced OM text to expose:", buf.String()) + return buf.Bytes() + default: + t.Fatalf("not implemented format: %v", format) + return nil + } +} + +func doOneScrape(t *testing.T, manager *Manager, appender *collectResultAppender, server *httptest.Server) { + t.Helper() + + serverURL, err := url.Parse(server.URL) + require.NoError(t, err) + + // Add fake target directly into tsets + reload + manager.updateTsets(map[string][]*targetgroup.Group{ + "test": {{ + Targets: []model.LabelSet{{ + model.SchemeLabel: model.LabelValue(serverURL.Scheme), + model.AddressLabel: model.LabelValue(serverURL.Host), + }}, + }}, + }) + manager.reload() + + // Wait for one scrape. + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error { + appender.mtx.Lock() + defer appender.mtx.Unlock() + + // Check if scrape happened and grab the relevant samples. + if len(appender.resultFloats) > 0 { + return nil + } + return fmt.Errorf("expected some float samples, got none") + }), "after 1 minute") + manager.Stop() +} + +func findSamplesForMetric(floats []floatSample, metricName string) (ret []floatSample) { + for _, f := range floats { + if f.metric.Get(model.MetricNameLabel) == metricName { + ret = append(ret, f) + } + } + return ret +} + // generateTestHistogram generates the same thing as tsdbutil.GenerateTestHistogram, // but in the form of dto.Histogram. func generateTestHistogram(i int) *dto.Histogram { diff --git a/scrape/scrape.go b/scrape/scrape.go index 071edfca5..c66f203dd 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -1536,7 +1536,7 @@ type appendErrors struct { } func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) { - p, err := textparse.New(b, contentType, sl.scrapeClassicHistograms, sl.symbolTable) + p, err := textparse.New(b, contentType, sl.scrapeClassicHistograms, sl.enableCTZeroIngestion, sl.symbolTable) if err != nil { level.Debug(sl.l).Log( "msg", "Invalid content type on scrape, using prometheus parser as fallback.", diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 04fd53601..57c51b2e9 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -1525,7 +1525,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) { fakeRef := storage.SeriesRef(1) expValue := float64(1) metric := []byte(`metric{n="1"} 1`) - p, warning := textparse.New(metric, "", false, labels.NewSymbolTable()) + p, warning := textparse.New(metric, "", false, false, labels.NewSymbolTable()) require.NoError(t, warning) var lset labels.Labels