diff --git a/retrieval/target.go b/retrieval/target.go index 7c470e0bc..a706c9cf6 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -18,7 +18,6 @@ import ( "fmt" "io" "io/ioutil" - "math/rand" "net/http" "net/url" "strings" @@ -267,6 +266,30 @@ func (t *Target) String() string { return t.host() } +// fingerprint returns an identifying hash for the target. +func (t *Target) fingerprint() model.Fingerprint { + t.RLock() + defer t.RUnlock() + + return t.labels.Fingerprint() +} + +// offset returns the time until the next scrape cycle for the target. +func (t *Target) offset(interval time.Duration) time.Duration { + now := time.Now().UnixNano() + + var ( + base = now % int64(interval) + offset = uint64(t.fingerprint()) % uint64(interval) + next = base + int64(offset) + ) + + if next > int64(interval) { + next -= int64(interval) + } + return time.Duration(next) +} + func (t *Target) client() (*http.Client, error) { t.RLock() defer t.RUnlock() @@ -366,14 +389,12 @@ func (t *Target) RunScraper(sampleAppender storage.SampleAppender) { log.Debugf("Starting scraper for target %v...", t) - jitterTimer := time.NewTimer(time.Duration(float64(lastScrapeInterval) * rand.Float64())) select { - case <-jitterTimer.C: + case <-time.After(t.offset(lastScrapeInterval)): + // Continue after scraping offset. case <-t.scraperStopping: - jitterTimer.Stop() return } - jitterTimer.Stop() ticker := time.NewTicker(lastScrapeInterval) defer ticker.Stop() diff --git a/retrieval/target_test.go b/retrieval/target_test.go index 3a0ee8857..4cef0d4ef 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -45,6 +45,52 @@ func TestTargetLabels(t *testing.T) { } } +func TestTargetOffset(t *testing.T) { + interval := 10 * time.Second + + offsets := make([]time.Duration, 10000) + + // Calculate offsets for 10000 different targets. + for i := range offsets { + target := newTestTarget("example.com:80", 0, model.LabelSet{ + "label": model.LabelValue(fmt.Sprintf("%d", i)), + }) + offsets[i] = target.offset(interval) + } + + // Put the offsets into buckets and validate that they are all + // within bounds. + bucketSize := 1 * time.Second + buckets := make([]int, interval/bucketSize) + + for _, offset := range offsets { + if offset < 0 || offset >= interval { + t.Fatalf("Offset %v out of bounds", offset) + } + + bucket := offset / bucketSize + buckets[bucket]++ + } + + t.Log(buckets) + + // Calculate whether the the number of targets per bucket + // does not differ more than a given tolerance. + avg := len(offsets) / len(buckets) + tolerance := 0.15 + + for _, bucket := range buckets { + diff := bucket - avg + if diff < 0 { + diff = -diff + } + + if float64(diff)/float64(avg) > tolerance { + t.Fatalf("Bucket out of tolerance bounds") + } + } +} + func TestOverwriteLabels(t *testing.T) { type test struct { metric string