@ -14,6 +14,7 @@
package scrape
import (
"bytes"
"context"
"fmt"
"net/http"
@ -30,11 +31,14 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"
"gopkg.in/yaml.v2"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
_ "github.com/prometheus/prometheus/discovery/file"
@ -719,143 +723,195 @@ scrape_configs:
require . ElementsMatch ( t , [ ] string { "job1" , "job3" } , scrapeManager . ScrapePools ( ) )
}
// TestManagerCTZeroIngestion tests scrape manager for CT cases.
func TestManagerCTZeroIngestion ( t * testing . T ) {
const mName = "expected_counter"
for _ , tc := range [ ] struct {
name string
counterSample * dto . Counter
enableCTZeroIngestion bool
} {
{
name : "disabled with CT on counter" ,
counterSample : & dto . Counter {
Value : proto . Float64 ( 1.0 ) ,
// Timestamp does not matter as long as it exists in this test.
CreatedTimestamp : timestamppb . Now ( ) ,
} ,
} ,
{
name : "enabled with CT on counter" ,
counterSample : & dto . Counter {
Value : proto . Float64 ( 1.0 ) ,
// Timestamp does not matter as long as it exists in this test.
CreatedTimestamp : timestamppb . Now ( ) ,
} ,
enableCTZeroIngestion : true ,
func setupScrapeManager ( t * testing . T , honorTimestamps , enableCTZeroIngestion bool ) ( * collectResultAppender , * Manager ) {
app := & collectResultAppender { }
scrapeManager , err := NewManager (
& Options {
EnableCreatedTimestampZeroIngestion : enableCTZeroIngestion ,
skipOffsetting : true ,
} ,
{
name : "enabled without CT on counter" ,
counterSample : & dto . Counter {
Value : proto . Float64 ( 1.0 ) ,
} ,
enableCTZeroIngestion : true ,
log . NewLogfmtLogger ( os . Stderr ) ,
nil ,
& collectResultAppendable { app } ,
prometheus . NewRegistry ( ) ,
)
require . NoError ( t , err )
require . NoError ( t , scrapeManager . ApplyConfig ( & config . Config {
GlobalConfig : config . GlobalConfig {
// Disable regular scrapes.
ScrapeInterval : model . Duration ( 9999 * time . Minute ) ,
ScrapeTimeout : model . Duration ( 5 * time . Second ) ,
ScrapeProtocols : [ ] config . ScrapeProtocol { config . OpenMetricsText1_0_0 , config . PrometheusProto } ,
} ,
} {
t . Run ( tc . name , func ( t * testing . T ) {
app := & collectResultAppender { }
scrapeManager , err := NewManager (
& Options {
EnableCreatedTimestampZeroIngestion : tc . enableCTZeroIngestion ,
skipOffsetting : true ,
} ,
log . NewLogfmtLogger ( os . Stderr ) ,
nil ,
& collectResultAppendable { app } ,
prometheus . NewRegistry ( ) ,
)
require . NoError ( t , err )
ScrapeConfigs : [ ] * config . ScrapeConfig { { JobName : "test" , HonorTimestamps : honorTimestamps } } ,
} ) )
require . NoError ( t , scrapeManager . ApplyConfig ( & config . Config {
GlobalConfig : config . GlobalConfig {
// Disable regular scrapes.
ScrapeInterval : model . Duration ( 9999 * time . Minute ) ,
ScrapeTimeout : model . Duration ( 5 * time . Second ) ,
// Ensure the proto is chosen. We need proto as it's the only protocol
// with the CT parsing support.
ScrapeProtocols : [ ] config . ScrapeProtocol { config . PrometheusProto } ,
} ,
ScrapeConfigs : [ ] * config . ScrapeConfig { { JobName : "test" } } ,
} ) )
return app , scrapeManager
}
once := sync . Once { }
// Start fake HTTP target to that allow one scrape only.
server := httptest . NewServer (
http . HandlerFunc ( func ( w http . ResponseWriter , r * http . Request ) {
fail := true
once . Do ( func ( ) {
fail = false
w . Header ( ) . Set ( "Content-Type" , ` application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited ` )
func setupTestServer ( t * testing . T , typ string , toWrite [ ] byte ) * httptest . Server {
once := sync . Once { }
ctrType := dto . MetricType_COUNTER
w . Write ( protoMarshalDelimited ( t , & dto . MetricFamily {
Name : proto . String ( mName ) ,
Type : & ctrType ,
Metric : [ ] * dto . Metric { { Counter : tc . counterSample } } ,
} ) )
} )
server := httptest . NewServer (
http . HandlerFunc ( func ( w http . ResponseWriter , r * http . Request ) {
fail := true
once . Do ( func ( ) {
fail = false
w . Header ( ) . Set ( "Content-Type" , typ )
w . Write ( toWrite )
} )
if fail {
w . WriteHeader ( http . StatusInternalServerError )
}
} ) ,
)
defer server . Close ( )
if fail {
w . WriteHeader ( http . StatusInternalServerError )
}
} ) ,
)
serverURL , err := url . Parse ( server . URL )
require . NoError ( t , err )
t . Cleanup ( func ( ) { server . Close ( ) } )
// Add fake target directly into tsets + reload. Normally users would use
// Manager.Run and wait for minimum 5s refresh interval.
scrapeManager . updateTsets ( map [ string ] [ ] * targetgroup . Group {
"test" : { {
Targets : [ ] model . LabelSet { {
model . SchemeLabel : model . LabelValue ( serverURL . Scheme ) ,
model . AddressLabel : model . LabelValue ( serverURL . Host ) ,
} } ,
} } ,
} )
scrapeManager . reload ( )
return server
}
var got [ ] float64
// Wait for one scrape.
ctx , cancel := context . WithTimeout ( context . Background ( ) , 1 * time . Minute )
defer cancel ( )
require . NoError ( t , runutil . Retry ( 100 * time . Millisecond , ctx . Done ( ) , func ( ) error {
app . mtx . Lock ( )
defer app . mtx . Unlock ( )
// TestManagerCTZeroIngestion tests scrape manager for various CT cases.
func TestManagerCTZeroIngestion ( t * testing . T ) {
const (
// _total suffix is required, otherwise expfmt with OMText will mark metric as "unknown"
expectedMetricName = "expected_metric_total"
expectedCreatedMetricName = "expected_metric_created"
expectedSampleValue = 17.0
)
// Check if scrape happened and grab the relevant samples, they have to be there - or it's a bug
// and it's not worth waiting.
for _ , f := range app . resultFloats {
if f . metric . Get ( model . MetricNameLabel ) == mName {
got = append ( got , f . f )
for _ , testFormat := range [ ] config . ScrapeProtocol { config . PrometheusProto , config . OpenMetricsText1_0_0 } {
t . Run ( fmt . Sprintf ( "format=%s" , testFormat ) , func ( t * testing . T ) {
for _ , testWithCT := range [ ] bool { false , true } {
t . Run ( fmt . Sprintf ( "withCT=%v" , testWithCT ) , func ( t * testing . T ) {
for _ , testCTZeroIngest := range [ ] bool { false , true } {
t . Run ( fmt . Sprintf ( "ctZeroIngest=%v" , testCTZeroIngest ) , func ( t * testing . T ) {
sampleTs := time . Now ( )
ctTs := time . Time { }
if testWithCT {
ctTs = sampleTs . Add ( - 2 * time . Minute )
}
// TODO(bwplotka): Add more types than just counter?
encoded := prepareTestEncodedCounter ( t , testFormat , expectedMetricName , expectedSampleValue , sampleTs , ctTs )
app , scrapeManager := setupScrapeManager ( t , true , testCTZeroIngest )
// Perform the test.
doOneScrape ( t , scrapeManager , app , setupTestServer ( t , config . ScrapeProtocolsHeaders [ testFormat ] , encoded ) )
// Verify results.
// Verify what we got vs expectations around CT injection.
samples := findSamplesForMetric ( app . resultFloats , expectedMetricName )
if testWithCT && testCTZeroIngest {
require . Len ( t , samples , 2 )
require . Equal ( t , 0.0 , samples [ 0 ] . f )
require . Equal ( t , timestamp . FromTime ( ctTs ) , samples [ 0 ] . t )
require . Equal ( t , expectedSampleValue , samples [ 1 ] . f )
require . Equal ( t , timestamp . FromTime ( sampleTs ) , samples [ 1 ] . t )
} else {
require . Len ( t , samples , 1 )
require . Equal ( t , expectedSampleValue , samples [ 0 ] . f )
require . Equal ( t , timestamp . FromTime ( sampleTs ) , samples [ 0 ] . t )
}
// Verify what we got vs expectations around additional _created series for OM text.
// enableCTZeroInjection also kills that _created line.
createdSeriesSamples := findSamplesForMetric ( app . resultFloats , expectedCreatedMetricName )
if testFormat == config . OpenMetricsText1_0_0 && testWithCT && ! testCTZeroIngest {
// For OM Text, when counter has CT, and feature flag disabled we should see _created lines.
require . Len ( t , createdSeriesSamples , 1 )
// Conversion taken from common/expfmt.writeOpenMetricsFloat.
// We don't check the ct timestamp as explicit ts was not implemented in expfmt.Encoder,
// but exists in OM https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#:~:text=An%20example%20with%20a%20Metric%20with%20no%20labels%2C%20and%20a%20MetricPoint%20with%20a%20timestamp%20and%20a%20created
// We can implement this, but we want to potentially get rid of OM 1.0 CT lines
require . Equal ( t , float64 ( timestamppb . New ( ctTs ) . AsTime ( ) . UnixNano ( ) ) / 1e9 , createdSeriesSamples [ 0 ] . f )
} else {
require . Empty ( t , createdSeriesSamples )
}
} )
}
}
if len ( app . resultFloats ) > 0 {
return nil
}
return fmt . Errorf ( "expected some samples, got none" )
} ) , "after 1 minute" )
scrapeManager . Stop ( )
// Check for zero samples, assuming we only injected always one sample.
// Did it contain CT to inject? If yes, was CT zero enabled?
if tc . counterSample . CreatedTimestamp . IsValid ( ) && tc . enableCTZeroIngestion {
require . Len ( t , got , 2 )
require . Equal ( t , 0.0 , got [ 0 ] )
require . Equal ( t , tc . counterSample . GetValue ( ) , got [ 1 ] )
return
} )
}
// Expect only one, valid sample.
require . Len ( t , got , 1 )
require . Equal ( t , tc . counterSample . GetValue ( ) , got [ 0 ] )
} )
}
}
func prepareTestEncodedCounter ( t * testing . T , format config . ScrapeProtocol , mName string , v float64 , ts , ct time . Time ) ( encoded [ ] byte ) {
t . Helper ( )
counter := & dto . Counter { Value : proto . Float64 ( v ) }
if ! ct . IsZero ( ) {
counter . CreatedTimestamp = timestamppb . New ( ct )
}
ctrType := dto . MetricType_COUNTER
inputMetric := & dto . MetricFamily {
Name : proto . String ( mName ) ,
Type : & ctrType ,
Metric : [ ] * dto . Metric { {
TimestampMs : proto . Int64 ( timestamp . FromTime ( ts ) ) ,
Counter : counter ,
} } ,
}
switch format {
case config . PrometheusProto :
return protoMarshalDelimited ( t , inputMetric )
case config . OpenMetricsText1_0_0 :
buf := & bytes . Buffer { }
require . NoError ( t , expfmt . NewEncoder ( buf , expfmt . NewFormat ( expfmt . TypeOpenMetrics ) , expfmt . WithCreatedLines ( ) , expfmt . WithUnit ( ) ) . Encode ( inputMetric ) )
_ , _ = buf . WriteString ( "# EOF" )
t . Log ( "produced OM text to expose:" , buf . String ( ) )
return buf . Bytes ( )
default :
t . Fatalf ( "not implemented format: %v" , format )
return nil
}
}
func doOneScrape ( t * testing . T , manager * Manager , appender * collectResultAppender , server * httptest . Server ) {
t . Helper ( )
serverURL , err := url . Parse ( server . URL )
require . NoError ( t , err )
// Add fake target directly into tsets + reload
manager . updateTsets ( map [ string ] [ ] * targetgroup . Group {
"test" : { {
Targets : [ ] model . LabelSet { {
model . SchemeLabel : model . LabelValue ( serverURL . Scheme ) ,
model . AddressLabel : model . LabelValue ( serverURL . Host ) ,
} } ,
} } ,
} )
manager . reload ( )
// Wait for one scrape.
ctx , cancel := context . WithTimeout ( context . Background ( ) , 1 * time . Minute )
defer cancel ( )
require . NoError ( t , runutil . Retry ( 100 * time . Millisecond , ctx . Done ( ) , func ( ) error {
appender . mtx . Lock ( )
defer appender . mtx . Unlock ( )
// Check if scrape happened and grab the relevant samples.
if len ( appender . resultFloats ) > 0 {
return nil
}
return fmt . Errorf ( "expected some float samples, got none" )
} ) , "after 1 minute" )
manager . Stop ( )
}
func findSamplesForMetric ( floats [ ] floatSample , metricName string ) ( ret [ ] floatSample ) {
for _ , f := range floats {
if f . metric . Get ( model . MetricNameLabel ) == metricName {
ret = append ( ret , f )
}
}
return ret
}
// generateTestHistogram generates the same thing as tsdbutil.GenerateTestHistogram,
// but in the form of dto.Histogram.
func generateTestHistogram ( i int ) * dto . Histogram {