mirror of https://github.com/prometheus/prometheus
Merge pull request #2897 from Gouthamve/oob-metric
Handle scrapes with OutOfBounds metrics better
commit 329992201e
@@ -113,7 +113,8 @@ type scrapePool struct {
 	// Constructor for new scrape loops. This is settable for testing convenience.
 	newLoop func(context.Context, scraper, func() storage.Appender, func() storage.Appender, log.Logger) loop
 
 	logger log.Logger
 
+	maxAheadTime time.Duration
 }
 
@@ -133,14 +134,15 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool {
 	}
 
 	return &scrapePool{
 		appendable: app,
 		config:     cfg,
 		ctx:        ctx,
 		client:     client,
 		targets:    map[uint64]*Target{},
 		loops:      map[uint64]loop{},
 		newLoop:    newLoop,
 		logger:     logger,
+		maxAheadTime: 10 * time.Minute,
 	}
 }
 
@@ -310,6 +312,13 @@ func (sp *scrapePool) sampleAppender(target *Target) storage.Appender {
 		panic(err)
 	}
 
+	if sp.maxAheadTime > 0 {
+		app = &timeLimitAppender{
+			Appender: app,
+			maxTime:  timestamp.FromTime(time.Now().Add(sp.maxAheadTime)),
+		}
+	}
+
 	// The limit is applied after metrics are potentially dropped via relabeling.
 	if sp.config.SampleLimit > 0 {
 		app = &limitAppender{
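The hunk above wraps the appender returned by sampleAppender in a timeLimitAppender before the existing sample-limit wrapper is applied, so the cut-off is fixed once per scrape at now + maxAheadTime. Below is a minimal, self-contained sketch of that layering; the appender interface, printAppender, timeLimit and errOutOfBounds names are simplified stand-ins for illustration, not the real storage.Appender API.

package main

import (
	"errors"
	"fmt"
	"time"
)

// Stand-in for storage.ErrOutOfBounds.
var errOutOfBounds = errors.New("out of bounds")

// appender is a cut-down stand-in for storage.Appender.
type appender interface {
	Add(metric string, t int64, v float64) error
}

type printAppender struct{}

func (printAppender) Add(metric string, t int64, v float64) error {
	fmt.Printf("ingested %s t=%d v=%g\n", metric, t, v)
	return nil
}

// timeLimit mirrors the idea of timeLimitAppender: samples stamped after maxTime are rejected.
type timeLimit struct {
	appender
	maxTime int64 // milliseconds since epoch, fixed when the wrapper is built
}

func (a timeLimit) Add(metric string, t int64, v float64) error {
	if t > a.maxTime {
		return errOutOfBounds
	}
	return a.appender.Add(metric, t, v)
}

func main() {
	nowMs := time.Now().UnixNano() / int64(time.Millisecond)
	var app appender = printAppender{}
	// Same layering idea as the hunk: build the time-limited wrapper once per scrape.
	app = timeLimit{appender: app, maxTime: nowMs + (10 * time.Minute).Milliseconds()}

	fmt.Println(app.Add("ok_metric", nowMs, 1))                                     // <nil>
	fmt.Println(app.Add("future_metric", nowMs+(20*time.Minute).Milliseconds(), 1)) // out of bounds
}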
@@ -722,11 +731,12 @@ func (s samples) Less(i, j int) bool {
 
 func (sl *scrapeLoop) append(b []byte, ts time.Time) (total, added int, err error) {
 	var (
 		app            = sl.appender()
 		p              = textparse.New(b)
 		defTime        = timestamp.FromTime(ts)
 		numOutOfOrder  = 0
 		numDuplicates  = 0
+		numOutOfBounds = 0
 	)
 	var sampleLimitErr error
 
@@ -761,6 +771,10 @@ loop:
 			numDuplicates++
 			sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp")
 			continue
+		case storage.ErrOutOfBounds:
+			numOutOfBounds++
+			sl.l.With("timeseries", string(met)).Debug("Out of bounds metric")
+			continue
 		case errSampleLimit:
 			// Keep on parsing output if we hit the limit, so we report the correct
 			// total number of samples scraped.
@@ -804,6 +818,11 @@ loop:
 			numDuplicates++
 			sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp")
 			continue
+		case storage.ErrOutOfBounds:
+			err = nil
+			numOutOfBounds++
+			sl.l.With("timeseries", string(met)).Debug("Out of bounds metric")
+			continue
 		case errSampleLimit:
 			sampleLimitErr = err
 			added++
@@ -832,6 +851,9 @@ loop:
 	if numDuplicates > 0 {
 		sl.l.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
 	}
+	if numOutOfBounds > 0 {
+		sl.l.With("numOutOfBounds", numOutOfBounds).Warn("Error on ingesting samples that are too old")
+	}
 	if err == nil {
 		sl.cache.forEachStale(func(lset labels.Labels) bool {
 			// Series no longer exposed, mark it stale.
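The three scrape-loop hunks above teach scrapeLoop.append about storage.ErrOutOfBounds: such samples are counted, logged at debug level, and skipped (with err cleared in the cached-series path), and a single warning summarises the count at the end, so one sample with an unwritable timestamp no longer fails the whole scrape. A rough, self-contained model of that control flow, using stand-in names (sample, ingest, appendAll) rather than the real scrape loop:

package main

import (
	"errors"
	"fmt"
)

var (
	errOutOfOrder  = errors.New("out of order sample")
	errOutOfBounds = errors.New("out of bounds")
)

type sample struct {
	name string
	t    int64
	v    float64
}

// ingest is a stand-in for appending one sample; it fails for selected names.
func ingest(s sample) error {
	switch s.name {
	case "out_of_order":
		return errOutOfOrder
	case "out_of_bounds":
		return errOutOfBounds
	default:
		return nil
	}
}

// appendAll mirrors the loop's policy: tolerated errors are counted and the
// offending sample skipped; any other error aborts the scrape.
func appendAll(batch []sample) (added int, err error) {
	var numOutOfOrder, numOutOfBounds int
	for _, s := range batch {
		switch ingestErr := ingest(s); ingestErr {
		case nil:
			added++
		case errOutOfOrder:
			numOutOfOrder++
		case errOutOfBounds:
			numOutOfBounds++
		default:
			return added, ingestErr
		}
	}
	if numOutOfOrder > 0 {
		fmt.Printf("warning: %d out-of-order samples dropped\n", numOutOfOrder)
	}
	if numOutOfBounds > 0 {
		fmt.Printf("warning: %d samples outside the writable time range dropped\n", numOutOfBounds)
	}
	return added, nil
}

func main() {
	batch := []sample{{"normal", 1, 1}, {"out_of_bounds", 2, 1}, {"normal", 3, 2}}
	added, err := appendAll(batch)
	fmt.Println(added, err) // 2 <nil>
}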
@@ -273,6 +273,7 @@ func TestScrapePoolSampleAppender(t *testing.T) {
 	app := &nopAppendable{}
 
 	sp := newScrapePool(context.Background(), cfg, app, log.Base())
+	sp.maxAheadTime = 0
 
 	cfg.HonorLabels = false
 	wrapped := sp.sampleAppender(target)
@@ -872,19 +873,23 @@ type errorAppender struct {
 }
 
 func (app *errorAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
-	if lset.Get(model.MetricNameLabel) == "out_of_order" {
+	switch lset.Get(model.MetricNameLabel) {
+	case "out_of_order":
 		return "", storage.ErrOutOfOrderSample
-	} else if lset.Get(model.MetricNameLabel) == "amend" {
+	case "amend":
 		return "", storage.ErrDuplicateSampleForTimestamp
+	case "out_of_bounds":
+		return "", storage.ErrOutOfBounds
+	default:
+		return app.collectResultAppender.Add(lset, t, v)
 	}
-	return app.collectResultAppender.Add(lset, t, v)
 }
 
 func (app *errorAppender) AddFast(ref string, t int64, v float64) error {
 	return app.collectResultAppender.AddFast(ref, t, v)
 }
 
-func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) {
+func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) {
 	app := &errorAppender{}
 	sl := newScrapeLoop(context.Background(), nil,
 		func() storage.Appender { return app },
@@ -893,7 +898,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) {
 	)
 
 	now := time.Unix(1, 0)
-	_, _, err := sl.append([]byte("out_of_order 1\namend 1\nnormal 1\n"), now)
+	_, _, err := sl.append([]byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), now)
 	if err != nil {
 		t.Fatalf("Unexpected append error: %s", err)
 	}
@@ -907,7 +912,35 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) {
 	if !reflect.DeepEqual(want, app.result) {
 		t.Fatalf("Appended samples not as expected. Wanted: %+v Got: %+v", want, app.result)
 	}
+}
+
+func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
+	app := &collectResultAppender{}
+	sl := newScrapeLoop(context.Background(), nil,
+		func() storage.Appender {
+			return &timeLimitAppender{
+				Appender: app,
+				maxTime:  timestamp.FromTime(time.Now().Add(10 * time.Minute)),
+			}
+		},
+		func() storage.Appender { return nopAppender{} },
+		nil,
+	)
+
+	now := time.Now().Add(20 * time.Minute)
+	total, added, err := sl.append([]byte("normal 1\n"), now)
+	if total != 1 {
+		t.Error("expected 1 metric")
+		return
+	}
+
+	if added != 0 {
+		t.Error("no metric should be added")
+	}
+
+	if err != nil {
+		t.Errorf("expect no error, got %s", err.Error())
+	}
 }
 
 func TestTargetScraperScrapeOK(t *testing.T) {
@@ -225,6 +225,34 @@ func (app *limitAppender) AddFast(ref string, t int64, v float64) error {
 	return nil
 }
 
+type timeLimitAppender struct {
+	storage.Appender
+
+	maxTime int64
+}
+
+func (app *timeLimitAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
+	if t > app.maxTime {
+		return "", storage.ErrOutOfBounds
+	}
+
+	ref, err := app.Appender.Add(lset, t, v)
+	if err != nil {
+		return "", err
+	}
+	return ref, nil
+}
+
+func (app *timeLimitAppender) AddFast(ref string, t int64, v float64) error {
+	if t > app.maxTime {
+		return storage.ErrOutOfBounds
+	}
+	if err := app.Appender.AddFast(ref, t, v); err != nil {
+		return err
+	}
+	return nil
+}
+
 // Merges the ingested sample's metric with the label set. On a collision the
 // value of the ingested label is stored in a label prefixed with 'exported_'.
 type ruleLabelsAppender struct {
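timeLimitAppender follows the same construction as the limitAppender directly above it: it embeds the storage.Appender interface and overrides only Add and AddFast, so every other method is promoted from the wrapped appender unchanged. A tiny sketch of that embed-and-override pattern on a made-up two-method interface (store, memStore and capped are illustrative names, not Prometheus types):

package main

import "fmt"

// store is a made-up two-method interface standing in for storage.Appender.
type store interface {
	Add(name string, v float64) error
	Commit() error
}

type memStore struct{ n int }

func (m *memStore) Add(name string, v float64) error { m.n++; return nil }

func (m *memStore) Commit() error {
	fmt.Println("committed", m.n, "samples")
	return nil
}

// capped embeds the interface and overrides only Add; Commit is promoted from
// the embedded value, which is how a wrapper like timeLimitAppender reuses the
// rest of the interface while changing just the methods it cares about.
type capped struct {
	store
	max int
	n   int
}

func (c *capped) Add(name string, v float64) error {
	if c.n >= c.max {
		return fmt.Errorf("sample limit %d exceeded", c.max)
	}
	c.n++
	return c.store.Add(name, v)
}

func main() {
	app := &capped{store: &memStore{}, max: 1}
	fmt.Println(app.Add("a", 1)) // <nil>
	fmt.Println(app.Add("b", 2)) // sample limit 1 exceeded
	fmt.Println(app.Commit())    // forwarded to the embedded memStore
}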
@@ -19,10 +19,12 @@ import (
 	"github.com/prometheus/prometheus/pkg/labels"
 )
 
+// The errors exposed.
 var (
 	ErrNotFound                    = errors.New("not found")
 	ErrOutOfOrderSample            = errors.New("out of order sample")
 	ErrDuplicateSampleForTimestamp = errors.New("duplicate sample for timestamp")
+	ErrOutOfBounds                 = errors.New("out of bounds")
 )
 
 // Storage ingests and manages samples, along with various indexes. All methods
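ErrOutOfBounds is a plain sentinel value like the other exported storage errors, so callers such as the scrape loop distinguish it by direct comparison with the returned error. A brief sketch under that assumption, with local stand-ins for the sentinels:

package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the sentinel errors exported by the storage package.
var (
	errDuplicateSample = errors.New("duplicate sample for timestamp")
	errOutOfBounds     = errors.New("out of bounds")
)

// add pretends to append one sample and rejects timestamps beyond maxTime.
func add(t, maxTime int64) error {
	if t > maxTime {
		return errOutOfBounds
	}
	return nil
}

func main() {
	switch err := add(2000, 1000); err {
	case nil:
		fmt.Println("sample accepted")
	case errOutOfBounds:
		fmt.Println("dropped: timestamp outside the writable range")
	case errDuplicateSample:
		fmt.Println("dropped: duplicate sample")
	default:
		fmt.Println("scrape failed:", err)
	}
}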