retrieval: cache dropped series, mutate labels in place

pull/3172/head
Fabian Reinartz 7 years ago
parent 13f59329ab
commit 1121b9f7d4

@ -27,6 +27,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/common/version"
"golang.org/x/net/context"
@ -35,6 +36,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/pool"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/pkg/value"
@ -121,8 +123,8 @@ func init() {
// scrapePool manages scrapes for sets of targets.
type scrapePool struct {
appendable Appendable
ctx context.Context
logger log.Logger
ctx context.Context
mtx sync.RWMutex
config *config.ScrapeConfig
@ -133,12 +135,13 @@ type scrapePool struct {
loops map[uint64]loop
// Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(context.Context, scraper, func() storage.Appender, func() storage.Appender, log.Logger) loop
logger log.Logger
maxAheadTime time.Duration
newLoop func(*Target, scraper) loop
}
const maxAheadTime = 10 * time.Minute
type labelsMutator func(labels.Labels) labels.Labels
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool {
client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig)
if err != nil {
@ -148,26 +151,26 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable
buffers := pool.NewBytesPool(163, 100e6, 3)
newLoop := func(
ctx context.Context,
s scraper,
app, reportApp func() storage.Appender,
l log.Logger,
) loop {
return newScrapeLoop(ctx, s, app, reportApp, buffers, l)
sp := &scrapePool{
appendable: app,
config: cfg,
ctx: ctx,
client: client,
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
logger: logger,
}
return &scrapePool{
appendable: app,
config: cfg,
ctx: ctx,
client: client,
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
newLoop: newLoop,
logger: logger,
maxAheadTime: 10 * time.Minute,
sp.newLoop = func(t *Target, s scraper) loop {
return newScrapeLoop(sp.ctx, s,
logger.With("target", t),
buffers,
func(l labels.Labels) labels.Labels { return sp.mutateSampleLabels(l, t) },
func(l labels.Labels) labels.Labels { return sp.mutateReportSampleLabels(l, t) },
sp.appender,
)
}
return sp
}
// stop terminates all scrape loops and returns after they all terminated.
@ -219,15 +222,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
var (
t = sp.targets[fp]
s = &targetScraper{Target: t, client: sp.client, timeout: timeout}
newLoop = sp.newLoop(sp.ctx, s,
func() storage.Appender {
return sp.sampleAppender(t)
},
func() storage.Appender {
return sp.reportAppender(t)
},
sp.logger.With("target", t.labels.String()),
)
newLoop = sp.newLoop(t, s)
)
wg.Add(1)
@ -289,15 +284,7 @@ func (sp *scrapePool) sync(targets []*Target) {
if _, ok := sp.targets[hash]; !ok {
s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
l := sp.newLoop(sp.ctx, s,
func() storage.Appender {
return sp.sampleAppender(t)
},
func() storage.Appender {
return sp.reportAppender(t)
},
sp.logger.With("target", t.labels.String()),
)
l := sp.newLoop(t, s)
sp.targets[hash] = t
sp.loops[hash] = l
@ -329,61 +316,68 @@ func (sp *scrapePool) sync(targets []*Target) {
wg.Wait()
}
// sampleAppender returns an appender for ingested samples from the target.
func (sp *scrapePool) sampleAppender(target *Target) storage.Appender {
app, err := sp.appendable.Appender()
if err != nil {
panic(err)
}
func (sp *scrapePool) mutateSampleLabels(lset labels.Labels, target *Target) labels.Labels {
lb := labels.NewBuilder(lset)
if sp.maxAheadTime > 0 {
app = &timeLimitAppender{
Appender: app,
maxTime: timestamp.FromTime(time.Now().Add(sp.maxAheadTime)),
if sp.config.HonorLabels {
for _, l := range target.Labels() {
if lv := lset.Get(l.Name); lv == "" {
lb.Set(l.Name, l.Value)
}
}
}
// The limit is applied after metrics are potentially dropped via relabeling.
if sp.config.SampleLimit > 0 {
app = &limitAppender{
Appender: app,
limit: int(sp.config.SampleLimit),
} else {
for _, l := range target.Labels() {
lv := lset.Get(l.Name)
if lv != "" {
lb.Set(model.ExportedLabelPrefix+l.Name, lv)
}
lb.Set(l.Name, l.Value)
}
}
// The relabelAppender has to be inside the label-modifying appenders
// so the relabeling rules are applied to the correct label set.
res := lb.Labels()
if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 {
app = relabelAppender{
Appender: app,
relabelings: mrc,
}
res = relabel.Process(res, mrc...)
}
if sp.config.HonorLabels {
app = honorLabelsAppender{
Appender: app,
labels: target.Labels(),
}
} else {
app = ruleLabelsAppender{
Appender: app,
labels: target.Labels(),
return res
}
func (sp *scrapePool) mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels {
lb := labels.NewBuilder(lset)
for _, l := range target.Labels() {
lv := lset.Get(l.Name)
if lv != "" {
lb.Set(model.ExportedLabelPrefix+l.Name, lv)
}
lb.Set(l.Name, l.Value)
}
return app
return lb.Labels()
}
// reportAppender returns an appender for reporting samples for the target.
func (sp *scrapePool) reportAppender(target *Target) storage.Appender {
// appender returns an appender for ingested samples from the target.
func (sp *scrapePool) appender() storage.Appender {
app, err := sp.appendable.Appender()
if err != nil {
panic(err)
}
return ruleLabelsAppender{
app = &timeLimitAppender{
Appender: app,
labels: target.Labels(),
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
}
// The limit is applied after metrics are potentially dropped via relabeling.
if sp.config.SampleLimit > 0 {
app = &limitAppender{
Appender: app,
limit: int(sp.config.SampleLimit),
}
}
return app
}
// A scraper retrieves samples and accepts a status report at the end.
@ -479,8 +473,9 @@ type scrapeLoop struct {
lastScrapeSize int
buffers *pool.BytesPool
appender func() storage.Appender
reportAppender func() storage.Appender
appender func() storage.Appender
sampleMutator labelsMutator
reportSampleMutator labelsMutator
ctx context.Context
scrapeCtx context.Context
@ -497,6 +492,11 @@ type scrapeCache struct {
refs map[string]*refEntry // Parsed string to ref.
lsets map[uint64]*lsetCacheEntry // Ref to labelset and string.
// Cache of dropped metric strings and their iteration. The iteration must
// be a pointer so we can update it without setting a new entry with an unsafe
// string in addDropped().
dropped map[string]*uint64
// seriesCur and seriesPrev store the labels of series that were seen
// in the current and previous scrape.
// We hold two maps and swap them out to save allocations.
@ -508,6 +508,7 @@ func newScrapeCache() *scrapeCache {
return &scrapeCache{
refs: map[string]*refEntry{},
lsets: map[uint64]*lsetCacheEntry{},
dropped: map[string]*uint64{},
seriesCur: map[uint64]labels.Labels{},
seriesPrev: map[uint64]labels.Labels{},
}
@ -523,6 +524,11 @@ func (c *scrapeCache) iterDone() {
delete(c.lsets, e.ref)
}
}
for s, iter := range c.dropped {
if *iter < c.iter {
delete(c.dropped, s)
}
}
// Swap current and previous series.
c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev
@ -560,6 +566,19 @@ func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash ui
c.lsets[ref] = &lsetCacheEntry{metric: met, lset: lset, hash: hash}
}
func (c *scrapeCache) addDropped(met string) {
iter := c.iter
c.dropped[met] = &iter
}
func (c *scrapeCache) getDropped(met string) bool {
iterp, ok := c.dropped[met]
if ok {
*iterp = c.iter
}
return ok
}
func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) {
c.seriesCur[hash] = lset
}
@ -577,26 +596,28 @@ func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
func newScrapeLoop(
ctx context.Context,
sc scraper,
app, reportApp func() storage.Appender,
buffers *pool.BytesPool,
l log.Logger,
buffers *pool.BytesPool,
sampleMutator labelsMutator,
reportSampleMutator labelsMutator,
appender func() storage.Appender,
) *scrapeLoop {
if l == nil {
l = log.Base()
}
if buffers == nil {
buffers = pool.NewBytesPool(10e3, 100e6, 3)
buffers = pool.NewBytesPool(1e3, 1e6, 3)
}
sl := &scrapeLoop{
scraper: sc,
appender: app,
cache: newScrapeCache(),
reportAppender: reportApp,
buffers: buffers,
lastScrapeSize: 16000,
stopped: make(chan struct{}),
ctx: ctx,
l: l,
scraper: sc,
buffers: buffers,
cache: newScrapeCache(),
appender: appender,
sampleMutator: sampleMutator,
reportSampleMutator: reportSampleMutator,
stopped: make(chan struct{}),
ctx: ctx,
l: l,
}
sl.scrapeCtx, sl.cancel = context.WithCancel(ctx)
@ -796,6 +817,9 @@ loop:
t = *tp
}
if sl.cache.getDropped(yoloString(met)) {
continue
}
ref, ok := sl.cache.getRef(yoloString(met))
if ok {
lset := sl.cache.lsets[ref].lset
@ -807,9 +831,6 @@ loop:
}
case storage.ErrNotFound:
ok = false
case errSeriesDropped:
err = nil
continue
case storage.ErrOutOfOrderSample:
numOutOfOrder++
sl.l.With("timeseries", string(met)).Debug("Out of order sample")
@ -848,6 +869,16 @@ loop:
} else {
mets = p.Metric(&lset)
hash = lset.Hash()
// Hash label set as it is seen local to the target. Then add target labels
// and relabeling and store the final label set.
lset = sl.sampleMutator(lset)
// The label set may be set to nil to indicate dropping.
if lset == nil {
sl.cache.addDropped(mets)
continue
}
}
var ref uint64
@ -855,9 +886,6 @@ loop:
// TODO(fabxc): also add a dropped-cache?
switch err {
case nil:
case errSeriesDropped:
err = nil
continue
case storage.ErrOutOfOrderSample:
err = nil
numOutOfOrder++
@ -912,8 +940,6 @@ loop:
// Series no longer exposed, mark it stale.
_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
switch err {
case errSeriesDropped:
err = nil
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
// Do not count these in logging, as this is expected if a target
// goes away and comes back again with a new scrape loop.
@ -948,8 +974,7 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, a
if err == nil {
health = 1
}
app := sl.reportAppender()
app := sl.appender()
if err := sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil {
app.Rollback()
@ -972,7 +997,8 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, a
func (sl *scrapeLoop) reportStale(start time.Time) error {
ts := timestamp.FromTime(start)
app := sl.reportAppender()
app := sl.appender()
stale := math.Float64frombits(value.StaleNaN)
if err := sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil {
@ -1019,10 +1045,14 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
lset := labels.Labels{
labels.Label{Name: labels.MetricName, Value: s},
}
hash := lset.Hash()
lset = sl.reportSampleMutator(lset)
ref, err := app.Add(lset, t, v)
switch err {
case nil:
sl.cache.addRef(s2, ref, lset, lset.Hash())
sl.cache.addRef(s2, ref, lset, hash)
return nil
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
return nil

@ -145,7 +145,7 @@ func TestScrapePoolReload(t *testing.T) {
}
// On starting to run, new loops created on reload check whether their preceding
// equivalents have been stopped.
newLoop := func(ctx context.Context, s scraper, app, reportApp func() storage.Appender, _ log.Logger) loop {
newLoop := func(_ *Target, s scraper) loop {
l := &testLoop{}
l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
if interval != 3*time.Second {
@ -228,92 +228,48 @@ func TestScrapePoolReload(t *testing.T) {
}
}
func TestScrapePoolReportAppender(t *testing.T) {
cfg := &config.ScrapeConfig{
MetricRelabelConfigs: []*config.RelabelConfig{
{}, {}, {},
},
}
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
app := &nopAppendable{}
sp := newScrapePool(context.Background(), cfg, app, log.Base())
cfg.HonorLabels = false
wrapped := sp.reportAppender(target)
rl, ok := wrapped.(ruleLabelsAppender)
if !ok {
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
}
if _, ok := rl.Appender.(nopAppender); !ok {
t.Fatalf("Expected base appender but got %T", rl.Appender)
}
cfg.HonorLabels = true
wrapped = sp.reportAppender(target)
hl, ok := wrapped.(ruleLabelsAppender)
if !ok {
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
}
if _, ok := rl.Appender.(nopAppender); !ok {
t.Fatalf("Expected base appender but got %T", hl.Appender)
}
}
func TestScrapePoolSampleAppender(t *testing.T) {
cfg := &config.ScrapeConfig{
MetricRelabelConfigs: []*config.RelabelConfig{
{}, {}, {},
},
}
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
func TestScrapePoolAppender(t *testing.T) {
cfg := &config.ScrapeConfig{}
app := &nopAppendable{}
sp := newScrapePool(context.Background(), cfg, app, log.Base())
sp.maxAheadTime = 0
cfg.HonorLabels = false
wrapped := sp.sampleAppender(target)
wrapped := sp.appender()
rl, ok := wrapped.(ruleLabelsAppender)
if !ok {
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
}
re, ok := rl.Appender.(relabelAppender)
tl, ok := wrapped.(*timeLimitAppender)
if !ok {
t.Fatalf("Expected relabelAppender but got %T", rl.Appender)
t.Fatalf("Expected timeLimitAppender but got %T", wrapped)
}
if _, ok := re.Appender.(nopAppender); !ok {
t.Fatalf("Expected base appender but got %T", re.Appender)
if _, ok := tl.Appender.(nopAppender); !ok {
t.Fatalf("Expected base appender but got %T", tl.Appender)
}
cfg.HonorLabels = true
cfg.SampleLimit = 100
wrapped = sp.sampleAppender(target)
hl, ok := wrapped.(honorLabelsAppender)
if !ok {
t.Fatalf("Expected honorLabelsAppender but got %T", wrapped)
}
re, ok = hl.Appender.(relabelAppender)
wrapped = sp.appender()
sl, ok := wrapped.(*limitAppender)
if !ok {
t.Fatalf("Expected relabelAppender but got %T", hl.Appender)
t.Fatalf("Expected limitAppender but got %T", wrapped)
}
lm, ok := re.Appender.(*limitAppender)
tl, ok = sl.Appender.(*timeLimitAppender)
if !ok {
t.Fatalf("Expected limitAppender but got %T", lm.Appender)
t.Fatalf("Expected limitAppender but got %T", sl.Appender)
}
if _, ok := lm.Appender.(nopAppender); !ok {
t.Fatalf("Expected base appender but got %T", re.Appender)
if _, ok := tl.Appender.(nopAppender); !ok {
t.Fatalf("Expected base appender but got %T", tl.Appender)
}
}
func TestScrapeLoopStopBeforeRun(t *testing.T) {
scraper := &testScraper{}
sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil, nil)
sl := newScrapeLoop(context.Background(),
scraper,
nil, nil,
nopMutator,
nopMutator,
nil,
)
// The scrape pool synchronizes on stopping scrape loops. However, new scrape
// loops are started asynchronously. Thus it's possible, that a loop is stopped
@ -358,22 +314,28 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) {
}
}
func nopMutator(l labels.Labels) labels.Labels { return l }
func TestScrapeLoopStop(t *testing.T) {
appender := &collectResultAppender{}
reportAppender := &collectResultAppender{}
var (
signal = make(chan struct{})
scraper = &testScraper{}
app = func() storage.Appender { return appender }
reportApp = func() storage.Appender { return reportAppender }
numScrapes = 0
signal = make(chan struct{})
appender = &collectResultAppender{}
scraper = &testScraper{}
app = func() storage.Appender { return appender }
)
defer close(signal)
sl := newScrapeLoop(context.Background(), scraper, app, reportApp, nil, nil)
sl := newScrapeLoop(context.Background(),
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
)
// Terminate loop after 2 scrapes.
numScrapes := 0
// Succeed once, several failures, then stop.
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
numScrapes++
if numScrapes == 2 {
@ -394,25 +356,25 @@ func TestScrapeLoopStop(t *testing.T) {
t.Fatalf("Scrape wasn't stopped.")
}
if len(appender.result) < 2 {
t.Fatalf("Appended samples not as expected. Wanted: at least %d samples Got: %d", 2, len(appender.result))
// We expected 1 actual sample for each scrape plus 4 for report samples.
// At least 2 scrapes were made, plus the final stale markers.
if len(appender.result) < 5*3 || len(appender.result)%5 != 0 {
t.Fatalf("Expected at least 3 scrapes with 4 samples each, got %d samples", len(appender.result))
}
if !value.IsStaleNaN(appender.result[len(appender.result)-1].v) {
t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[len(appender.result)-1].v))
}
if len(reportAppender.result) < 8 {
t.Fatalf("Appended samples not as expected. Wanted: at least %d samples Got: %d", 8, len(reportAppender.result))
}
if len(reportAppender.result)%4 != 0 {
t.Fatalf("Appended samples not as expected. Wanted: samples mod 4 == 0 Got: %d samples", len(reportAppender.result))
}
if !value.IsStaleNaN(reportAppender.result[len(reportAppender.result)-1].v) {
t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(reportAppender.result[len(reportAppender.result)].v))
// All samples in a scrape must have the same timestmap.
var ts int64
for i, s := range appender.result {
if i%5 == 0 {
ts = s.t
} else if s.t != ts {
t.Fatalf("Unexpected multiple timestamps within single scrape")
}
}
if reportAppender.result[len(reportAppender.result)-1].t != appender.result[len(appender.result)-1].t {
t.Fatalf("Expected last append and report sample to have same timestamp. Append: stale NaN Report: %x", appender.result[len(appender.result)-1].t, reportAppender.result[len(reportAppender.result)-1].t)
// All samples from the last scrape must be stale markers.
for _, s := range appender.result[len(appender.result)-5:] {
if !value.IsStaleNaN(s.v) {
t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(s.v))
}
}
}
@ -421,14 +383,19 @@ func TestScrapeLoopRun(t *testing.T) {
signal = make(chan struct{})
errc = make(chan error)
scraper = &testScraper{}
app = func() storage.Appender { return &nopAppender{} }
reportApp = func() storage.Appender { return &nopAppender{} }
scraper = &testScraper{}
app = func() storage.Appender { return &nopAppender{} }
)
defer close(signal)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, scraper, app, reportApp, nil, nil)
sl := newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
)
// The loop must terminate during the initial offset if the context
// is canceled.
@ -466,7 +433,13 @@ func TestScrapeLoopRun(t *testing.T) {
}
ctx, cancel = context.WithCancel(context.Background())
sl = newScrapeLoop(ctx, scraper, app, reportApp, nil, nil)
sl = newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
)
go func() {
sl.run(time.Second, 100*time.Millisecond, errc)
@ -501,19 +474,23 @@ func TestScrapeLoopRun(t *testing.T) {
func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
appender := &collectResultAppender{}
var (
signal = make(chan struct{})
scraper = &testScraper{}
app = func() storage.Appender { return appender }
reportApp = func() storage.Appender { return &nopAppender{} }
numScrapes = 0
signal = make(chan struct{})
scraper = &testScraper{}
app = func() storage.Appender { return appender }
)
defer close(signal)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, scraper, app, reportApp, nil, nil)
sl := newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
)
// Succeed once, several failures, then stop.
numScrapes := 0
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
numScrapes++
@ -537,31 +514,37 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
t.Fatalf("Scrape wasn't stopped.")
}
if len(appender.result) != 2 {
t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 2, len(appender.result))
// 1 successfully scraped sample, 1 stale marker after first fail, 4 report samples for
// each scrape successful or not.
if len(appender.result) != 22 {
t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 22, len(appender.result))
}
if appender.result[0].v != 42.0 {
t.Fatalf("Appended first sample not as expected. Wanted: %f Got: %f", appender.result[0], 42)
}
if !value.IsStaleNaN(appender.result[1].v) {
t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[1].v))
if !value.IsStaleNaN(appender.result[5].v) {
t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[5].v))
}
}
func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
appender := &collectResultAppender{}
var (
signal = make(chan struct{})
signal = make(chan struct{})
scraper = &testScraper{}
app = func() storage.Appender { return appender }
reportApp = func() storage.Appender { return &nopAppender{} }
numScrapes = 0
)
defer close(signal)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, scraper, app, reportApp, nil, nil)
sl := newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
)
// Succeed once, several failures, then stop.
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@ -590,26 +573,29 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
t.Fatalf("Scrape wasn't stopped.")
}
if len(appender.result) != 2 {
t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 2, len(appender.result))
// 1 successfully scraped sample, 1 stale marker after first fail, 4 report samples for
// each scrape successful or not.
if len(appender.result) != 14 {
t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 22, len(appender.result))
}
if appender.result[0].v != 42.0 {
t.Fatalf("Appended first sample not as expected. Wanted: %f Got: %f", appender.result[0], 42)
}
if !value.IsStaleNaN(appender.result[1].v) {
t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[1].v))
if !value.IsStaleNaN(appender.result[5].v) {
t.Fatalf("Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[5].v))
}
}
func TestScrapeLoopAppend(t *testing.T) {
app := &collectResultAppender{}
sl := newScrapeLoop(context.Background(), nil,
sl := newScrapeLoop(context.Background(),
nil, nil, nil,
nopMutator,
nopMutator,
func() storage.Appender { return app },
func() storage.Appender { return nopAppender{} },
nil,
nil,
)
now := time.Now()
_, _, err := sl.append([]byte("metric_a 1\nmetric_b NaN\n"), now)
if err != nil {
@ -642,11 +628,12 @@ func TestScrapeLoopAppend(t *testing.T) {
func TestScrapeLoopAppendStaleness(t *testing.T) {
app := &collectResultAppender{}
sl := newScrapeLoop(context.Background(), nil,
sl := newScrapeLoop(context.Background(),
nil, nil, nil,
nopMutator,
nopMutator,
func() storage.Appender { return app },
func() storage.Appender { return nopAppender{} },
nil,
nil,
)
now := time.Now()
@ -686,11 +673,11 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
app := &collectResultAppender{}
sl := newScrapeLoop(context.Background(), nil,
sl := newScrapeLoop(context.Background(),
nil, nil, nil,
nopMutator,
nopMutator,
func() storage.Appender { return app },
func() storage.Appender { return nopAppender{} },
nil,
nil,
)
now := time.Now()
@ -713,128 +700,23 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
if !reflect.DeepEqual(want, app.result) {
t.Fatalf("Appended samples not as expected. Wanted: %+v Got: %+v", want, app.result)
}
}
func TestScrapeLoopRunAppliesScrapeLimit(t *testing.T) {
cases := []struct {
appender func() storage.Appender
up float64
scrapeSamplesScraped float64
scrapeSamplesScrapedPostMetricRelabelling float64
}{
{
appender: func() storage.Appender { return nopAppender{} },
up: 1,
scrapeSamplesScraped: 3,
scrapeSamplesScrapedPostMetricRelabelling: 3,
},
{
appender: func() storage.Appender {
return &limitAppender{Appender: nopAppender{}, limit: 3}
},
up: 1,
scrapeSamplesScraped: 3,
scrapeSamplesScrapedPostMetricRelabelling: 3,
},
{
appender: func() storage.Appender {
return &limitAppender{Appender: nopAppender{}, limit: 2}
},
up: 0,
scrapeSamplesScraped: 3,
scrapeSamplesScrapedPostMetricRelabelling: 3,
},
{
appender: func() storage.Appender {
return &relabelAppender{
Appender: &limitAppender{Appender: nopAppender{}, limit: 2},
relabelings: []*config.RelabelConfig{
&config.RelabelConfig{
SourceLabels: model.LabelNames{"__name__"},
Regex: config.MustNewRegexp("a"),
Action: config.RelabelDrop,
},
},
}
},
up: 1,
scrapeSamplesScraped: 3,
scrapeSamplesScrapedPostMetricRelabelling: 2,
},
}
for i, c := range cases {
reportAppender := &collectResultAppender{}
var (
signal = make(chan struct{})
scraper = &testScraper{}
numScrapes = 0
reportApp = func() storage.Appender {
// Get result of the 2nd scrape.
if numScrapes == 2 {
return reportAppender
} else {
return nopAppender{}
}
}
)
defer close(signal)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, scraper, c.appender, reportApp, nil, nil)
// Setup a series to be stale, then 3 samples, then stop.
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
numScrapes += 1
if numScrapes == 1 {
w.Write([]byte("stale 0\n"))
return nil
} else if numScrapes == 2 {
w.Write([]byte("a 0\nb 0\nc 0 \n"))
return nil
} else if numScrapes == 3 {
cancel()
}
return fmt.Errorf("Scrape failed.")
}
go func() {
sl.run(10*time.Millisecond, time.Hour, nil)
signal <- struct{}{}
}()
select {
case <-signal:
case <-time.After(5 * time.Second):
t.Fatalf("Scrape wasn't stopped.")
}
if len(reportAppender.result) != 4 {
t.Fatalf("Case %d appended report samples not as expected. Wanted: %d samples Got: %d", i, 4, len(reportAppender.result))
}
if reportAppender.result[0].v != c.up {
t.Fatalf("Case %d appended up sample not as expected. Wanted: %f Got: %+v", i, c.up, reportAppender.result[0])
}
if reportAppender.result[2].v != c.scrapeSamplesScraped {
t.Fatalf("Case %d appended scrape_samples_scraped sample not as expected. Wanted: %f Got: %+v", i, c.scrapeSamplesScraped, reportAppender.result[2])
}
if reportAppender.result[3].v != c.scrapeSamplesScrapedPostMetricRelabelling {
t.Fatalf("Case %d appended scrape_samples_scraped_post_metric_relabeling sample not as expected. Wanted: %f Got: %+v", i, c.scrapeSamplesScrapedPostMetricRelabelling, reportAppender.result[3])
}
}
}
func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
var (
scraper = &testScraper{}
reportAppender = &collectResultAppender{}
reportApp = func() storage.Appender { return reportAppender }
scraper = &testScraper{}
appender = &collectResultAppender{}
app = func() storage.Appender { return appender }
)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil, nil)
sl := newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
)
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
cancel()
@ -843,31 +725,37 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
sl.run(10*time.Millisecond, time.Hour, nil)
if reportAppender.result[0].v != 0 {
t.Fatalf("bad 'up' value; want 0, got %v", reportAppender.result[0].v)
if appender.result[0].v != 0 {
t.Fatalf("bad 'up' value; want 0, got %v", appender.result[0].v)
}
}
func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
var (
scraper = &testScraper{}
reportAppender = &collectResultAppender{}
reportApp = func() storage.Appender { return reportAppender }
scraper = &testScraper{}
appender = &collectResultAppender{}
app = func() storage.Appender { return appender }
)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, scraper, func() storage.Appender { return nopAppender{} }, reportApp, nil, nil)
sl := newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
)
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
cancel()
w.Write([]byte("a{l=\"\xff\"} 0\n"))
w.Write([]byte("a{l=\"\xff\"} 1\n"))
return nil
}
sl.run(10*time.Millisecond, time.Hour, nil)
if reportAppender.result[0].v != 0 {
t.Fatalf("bad 'up' value; want 0, got %v", reportAppender.result[0].v)
if appender.result[0].v != 0 {
t.Fatalf("bad 'up' value; want 0, got %v", appender.result[0].v)
}
}
@ -894,11 +782,13 @@ func (app *errorAppender) AddFast(lset labels.Labels, ref uint64, t int64, v flo
func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) {
app := &errorAppender{}
sl := newScrapeLoop(context.Background(), nil,
func() storage.Appender { return app },
func() storage.Appender { return nopAppender{} },
nil,
sl := newScrapeLoop(context.Background(),
nil,
nil, nil,
nopMutator,
nopMutator,
func() storage.Appender { return app },
)
now := time.Unix(1, 0)
@ -920,16 +810,17 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
app := &collectResultAppender{}
sl := newScrapeLoop(context.Background(), nil,
sl := newScrapeLoop(context.Background(),
nil,
nil, nil,
nopMutator,
nopMutator,
func() storage.Appender {
return &timeLimitAppender{
Appender: app,
maxTime: timestamp.FromTime(time.Now().Add(10 * time.Minute)),
}
},
func() storage.Appender { return nopAppender{} },
nil,
nil,
)
now := time.Now().Add(20 * time.Minute)

@ -253,63 +253,6 @@ func (app *timeLimitAppender) AddFast(lset labels.Labels, ref uint64, t int64, v
return nil
}
// Merges the ingested sample's metric with the label set. On a collision the
// value of the ingested label is stored in a label prefixed with 'exported_'.
type ruleLabelsAppender struct {
storage.Appender
labels labels.Labels
}
func (app ruleLabelsAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
lb := labels.NewBuilder(lset)
for _, l := range app.labels {
lv := lset.Get(l.Name)
if lv != "" {
lb.Set(model.ExportedLabelPrefix+l.Name, lv)
}
lb.Set(l.Name, l.Value)
}
return app.Appender.Add(lb.Labels(), t, v)
}
type honorLabelsAppender struct {
storage.Appender
labels labels.Labels
}
// Merges the sample's metric with the given labels if the label is not
// already present in the metric.
// This also considers labels explicitly set to the empty string.
func (app honorLabelsAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
lb := labels.NewBuilder(lset)
for _, l := range app.labels {
if lv := lset.Get(l.Name); lv == "" {
lb.Set(l.Name, l.Value)
}
}
return app.Appender.Add(lb.Labels(), t, v)
}
// Applies a set of relabel configurations to the sample's metric
// before actually appending it.
type relabelAppender struct {
storage.Appender
relabelings []*config.RelabelConfig
}
var errSeriesDropped = errors.New("series dropped")
func (app relabelAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
lset = relabel.Process(lset, app.relabelings...)
if lset == nil {
return 0, errSeriesDropped
}
return app.Appender.Add(lset, t, v)
}
// populateLabels builds a label set from the given label set and scrape configuration.
// It returns a label set before relabeling was applied as the second return value.
// Returns a nil label set if the target is dropped during relabeling.

Loading…
Cancel
Save