Merge pull request #1393 from prometheus/scraperef

Make scraping offset consistent.
pull/1395/head
Fabian Reinartz 2016-02-15 16:52:03 +01:00
commit cb86a4300b
2 changed files with 72 additions and 5 deletions

View File

@ -18,7 +18,6 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"math/rand"
"net/http" "net/http"
"net/url" "net/url"
"strings" "strings"
@ -267,6 +266,30 @@ func (t *Target) String() string {
return t.host() return t.host()
} }
// fingerprint returns an identifying hash for the target.
func (t *Target) fingerprint() model.Fingerprint {
t.RLock()
defer t.RUnlock()
return t.labels.Fingerprint()
}
// offset returns the time until the next scrape cycle for the target.
func (t *Target) offset(interval time.Duration) time.Duration {
now := time.Now().UnixNano()
var (
base = now % int64(interval)
offset = uint64(t.fingerprint()) % uint64(interval)
next = base + int64(offset)
)
if next > int64(interval) {
next -= int64(interval)
}
return time.Duration(next)
}
func (t *Target) client() (*http.Client, error) { func (t *Target) client() (*http.Client, error) {
t.RLock() t.RLock()
defer t.RUnlock() defer t.RUnlock()
@ -366,14 +389,12 @@ func (t *Target) RunScraper(sampleAppender storage.SampleAppender) {
log.Debugf("Starting scraper for target %v...", t) log.Debugf("Starting scraper for target %v...", t)
jitterTimer := time.NewTimer(time.Duration(float64(lastScrapeInterval) * rand.Float64()))
select { select {
case <-jitterTimer.C: case <-time.After(t.offset(lastScrapeInterval)):
// Continue after scraping offset.
case <-t.scraperStopping: case <-t.scraperStopping:
jitterTimer.Stop()
return return
} }
jitterTimer.Stop()
ticker := time.NewTicker(lastScrapeInterval) ticker := time.NewTicker(lastScrapeInterval)
defer ticker.Stop() defer ticker.Stop()

View File

@ -45,6 +45,52 @@ func TestTargetLabels(t *testing.T) {
} }
} }
func TestTargetOffset(t *testing.T) {
interval := 10 * time.Second
offsets := make([]time.Duration, 10000)
// Calculate offsets for 10000 different targets.
for i := range offsets {
target := newTestTarget("example.com:80", 0, model.LabelSet{
"label": model.LabelValue(fmt.Sprintf("%d", i)),
})
offsets[i] = target.offset(interval)
}
// Put the offsets into buckets and validate that they are all
// within bounds.
bucketSize := 1 * time.Second
buckets := make([]int, interval/bucketSize)
for _, offset := range offsets {
if offset < 0 || offset >= interval {
t.Fatalf("Offset %v out of bounds", offset)
}
bucket := offset / bucketSize
buckets[bucket]++
}
t.Log(buckets)
// Calculate whether the the number of targets per bucket
// does not differ more than a given tolerance.
avg := len(offsets) / len(buckets)
tolerance := 0.15
for _, bucket := range buckets {
diff := bucket - avg
if diff < 0 {
diff = -diff
}
if float64(diff)/float64(avg) > tolerance {
t.Fatalf("Bucket out of tolerance bounds")
}
}
}
func TestOverwriteLabels(t *testing.T) { func TestOverwriteLabels(t *testing.T) {
type test struct { type test struct {
metric string metric string