diff --git a/main.go b/main.go index 3340f496f..01cb8bfb6 100644 --- a/main.go +++ b/main.go @@ -67,8 +67,6 @@ var ( notificationQueueCapacity = flag.Int("alertmanager.notificationQueueCapacity", 100, "The size of the queue for pending alert manager notifications.") - concurrentRetrievalAllowance = flag.Int("concurrentRetrievalAllowance", 15, "The number of concurrent metrics retrieval requests allowed.") - printVersion = flag.Bool("version", false, "print version information") shutdownTimeout = flag.Duration("shutdownGracePeriod", 0*time.Second, "The amount of time Prometheus gives background services to finish running when shutdown is requested.") @@ -269,7 +267,7 @@ func main() { deletionTimer := time.NewTicker(*deleteInterval) // Queue depth will need to be exposed - targetManager := retrieval.NewTargetManager(ingester, *concurrentRetrievalAllowance) + targetManager := retrieval.NewTargetManager(ingester) targetManager.AddTargetsFromConfig(conf) notifications := make(chan notification.NotificationReqs, *notificationQueueCapacity) diff --git a/retrieval/target.go b/retrieval/target.go index 976f8f871..f0045963d 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -15,6 +15,7 @@ package retrieval import ( "fmt" + "math/rand" "net/http" "os" "strings" @@ -41,6 +42,7 @@ const ( failure = "failure" outcome = "outcome" success = "success" + interval = "interval" ) var ( @@ -55,10 +57,20 @@ var ( }, []string{job, instance, outcome}, ) + targetIntervalLength = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: "target_interval_length_seconds", + Help: "Actual intervals between scrapes.", + Objectives: []float64{0.01, 0.05, 0.5, 0.90, 0.99}, + }, + []string{interval}, + ) ) func init() { prometheus.MustRegister(targetOperationLatencies) + prometheus.MustRegister(targetIntervalLength) } // The state of the given Target. @@ -99,8 +111,6 @@ const ( // metrics are retrieved and deserialized from the given instance to which it // refers. type Target interface { - // Retrieve values from this target. - Scrape(ingester extraction.Ingester) error // Return the last encountered scrape error, if any. LastError() error // Return the health of the target. @@ -120,6 +130,12 @@ type Target interface { // labels) into an old target definition for the same endpoint. Preserve // remaining information - like health state - from the old target. Merge(newTarget Target) + // Scrape target at the specified interval. + RunScraper(extraction.Ingester, time.Duration) + // Stop scraping, synchronous. + StopScraper() + // Do a single scrape. + scrape(ingester extraction.Ingester) error } // target is a Target that refers to a singular HTTP or HTTPS endpoint. @@ -130,6 +146,9 @@ type target struct { lastError error // The last time a scrape was attempted. lastScrape time.Time + // Channel to signal RunScraper should stop, holds a channel + // to notify once stopped. + stopScraper chan bool address string // What is the deadline for the HTTP or HTTPS against this endpoint. @@ -143,10 +162,11 @@ type target struct { // Furnish a reasonably configured target for querying. func NewTarget(address string, deadline time.Duration, baseLabels clientmodel.LabelSet) Target { target := &target{ - address: address, - Deadline: deadline, - baseLabels: baseLabels, - httpClient: utility.NewDeadlineClient(deadline), + address: address, + Deadline: deadline, + baseLabels: baseLabels, + httpClient: utility.NewDeadlineClient(deadline), + stopScraper: make(chan bool), } return target @@ -177,24 +197,40 @@ func (t *target) recordScrapeHealth(ingester extraction.Ingester, timestamp clie }) } -func (t *target) Scrape(ingester extraction.Ingester) error { - now := clientmodel.Now() - err := t.scrape(now, ingester) - if err == nil { - t.state = ALIVE - t.recordScrapeHealth(ingester, now, true) - } else { - t.state = UNREACHABLE - t.recordScrapeHealth(ingester, now, false) +func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration) { + jitterTimer := time.NewTimer(time.Duration(float64(interval) * rand.Float64())) + select { + case <-jitterTimer.C: + case <-t.stopScraper: + return } + jitterTimer.Stop() + + ticker := time.NewTicker(interval) + defer ticker.Stop() + t.lastScrape = time.Now() - t.lastError = err - return err + t.scrape(ingester) + for { + select { + case <-ticker.C: + targetIntervalLength.WithLabelValues(interval.String()).Observe(float64(time.Since(t.lastScrape) / time.Second)) + t.lastScrape = time.Now() + t.scrape(ingester) + case <-t.stopScraper: + return + } + } +} + +func (t *target) StopScraper() { + t.stopScraper <- true } const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema=prometheus/telemetry;version=0.0.2;q=0.2,*/*;q=0.1` -func (t *target) scrape(timestamp clientmodel.Timestamp, ingester extraction.Ingester) (err error) { +func (t *target) scrape(ingester extraction.Ingester) (err error) { + timestamp := clientmodel.Now() defer func(start time.Time) { ms := float64(time.Since(start)) / float64(time.Millisecond) labels := prometheus.Labels{ @@ -202,11 +238,16 @@ func (t *target) scrape(timestamp clientmodel.Timestamp, ingester extraction.Ing instance: t.Address(), outcome: success, } - if err != nil { + if err == nil { + t.state = ALIVE + t.recordScrapeHealth(ingester, timestamp, true) labels[outcome] = failure + } else { + t.state = UNREACHABLE + t.recordScrapeHealth(ingester, timestamp, false) } - targetOperationLatencies.With(labels).Observe(ms) + t.lastError = err }(time.Now()) req, err := http.NewRequest("GET", t.Address(), nil) @@ -292,7 +333,3 @@ func (t *target) Merge(newTarget Target) { } type targets []Target - -func (t targets) Len() int { - return len(t) -} diff --git a/retrieval/target_test.go b/retrieval/target_test.go index 54a77853c..93238489f 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -42,7 +42,7 @@ func TestTargetScrapeUpdatesState(t *testing.T) { address: "bad schema", httpClient: utility.NewDeadlineClient(0), } - testTarget.Scrape(nopIngester{}) + testTarget.scrape(nopIngester{}) if testTarget.state != UNREACHABLE { t.Errorf("Expected target state %v, actual: %v", UNREACHABLE, testTarget.state) } @@ -100,7 +100,7 @@ func TestTargetScrapeTimeout(t *testing.T) { // scrape once without timeout signal <- true - if err := testTarget.Scrape(ingester); err != nil { + if err := testTarget.scrape(ingester); err != nil { t.Fatal(err) } @@ -109,12 +109,12 @@ func TestTargetScrapeTimeout(t *testing.T) { // now scrape again signal <- true - if err := testTarget.Scrape(ingester); err != nil { + if err := testTarget.scrape(ingester); err != nil { t.Fatal(err) } // now timeout - if err := testTarget.Scrape(ingester); err == nil { + if err := testTarget.scrape(ingester); err == nil { t.Fatal("expected scrape to timeout") } else { signal <- true // let handler continue @@ -122,7 +122,7 @@ func TestTargetScrapeTimeout(t *testing.T) { // now scrape again without timeout signal <- true - if err := testTarget.Scrape(ingester); err != nil { + if err := testTarget.scrape(ingester); err != nil { t.Fatal(err) } } @@ -138,8 +138,34 @@ func TestTargetScrape404(t *testing.T) { ingester := nopIngester{} want := errors.New("server returned HTTP status 404 Not Found") - got := testTarget.Scrape(ingester) + got := testTarget.scrape(ingester) if got == nil || want.Error() != got.Error() { t.Fatalf("want err %q, got %q", want, got) } } + +func TestTargetRunScraperScrapes(t *testing.T) { + testTarget := target{ + state: UNKNOWN, + address: "bad schema", + httpClient: utility.NewDeadlineClient(0), + stopScraper: make(chan bool, 1), + } + go testTarget.RunScraper(nopIngester{}, time.Duration(time.Millisecond)) + + // Enough time for a scrape to happen. + time.Sleep(2 * time.Millisecond) + if testTarget.lastScrape.IsZero() { + t.Errorf("Scrape hasn't occured.") + } + + testTarget.StopScraper() + // Wait for it to take effect. + time.Sleep(2 * time.Millisecond) + last := testTarget.lastScrape + // Enough time for a scrape to happen. + time.Sleep(2 * time.Millisecond) + if testTarget.lastScrape != last { + t.Errorf("Scrape occured after it was stopped.") + } +} diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index f5ca82f63..eda6bef81 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -23,8 +23,6 @@ import ( ) type TargetManager interface { - acquire() - release() AddTarget(job config.JobConfig, t Target) ReplaceTargets(job config.JobConfig, newTargets []Target) Remove(t Target) @@ -34,27 +32,17 @@ type TargetManager interface { } type targetManager struct { - requestAllowance chan bool - poolsByJob map[string]*TargetPool - ingester extraction.Ingester + poolsByJob map[string]*TargetPool + ingester extraction.Ingester } -func NewTargetManager(ingester extraction.Ingester, requestAllowance int) TargetManager { +func NewTargetManager(ingester extraction.Ingester) TargetManager { return &targetManager{ - requestAllowance: make(chan bool, requestAllowance), - ingester: ingester, - poolsByJob: make(map[string]*TargetPool), + ingester: ingester, + poolsByJob: make(map[string]*TargetPool), } } -func (m *targetManager) acquire() { - m.requestAllowance <- true -} - -func (m *targetManager) release() { - <-m.requestAllowance -} - func (m *targetManager) TargetPoolForJob(job config.JobConfig) *TargetPool { targetPool, ok := m.poolsByJob[job.GetName()] @@ -64,13 +52,13 @@ func (m *targetManager) TargetPoolForJob(job config.JobConfig) *TargetPool { provider = NewSdTargetProvider(job) } - targetPool = NewTargetPool(m, provider) + interval := job.ScrapeInterval() + targetPool = NewTargetPool(m, provider, m.ingester, interval) glog.Infof("Pool for job %s does not exist; creating and starting...", job.GetName()) - interval := job.ScrapeInterval() m.poolsByJob[job.GetName()] = targetPool // BUG(all): Investigate whether this auto-goroutine creation is desired. - go targetPool.Run(m.ingester, interval) + go targetPool.Run() } return targetPool @@ -84,7 +72,7 @@ func (m *targetManager) AddTarget(job config.JobConfig, t Target) { func (m *targetManager) ReplaceTargets(job config.JobConfig, newTargets []Target) { targetPool := m.TargetPoolForJob(job) - targetPool.replaceTargets(newTargets) + targetPool.ReplaceTargets(newTargets) } func (m targetManager) Remove(t Target) { diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index 17dae352b..885e6a69f 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -29,10 +29,9 @@ import ( ) type fakeTarget struct { - scrapeCount int - schedules []time.Time - interval time.Duration - scheduleIndex int + scrapeCount int + lastScrape time.Time + interval time.Duration } func (t fakeTarget) LastError() error { @@ -55,33 +54,32 @@ func (t fakeTarget) Interval() time.Duration { return t.interval } -func (t *fakeTarget) Scrape(i extraction.Ingester) error { +func (t fakeTarget) LastScrape() time.Time { + return t.lastScrape +} + +func (t fakeTarget) scrape(i extraction.Ingester) error { t.scrapeCount++ return nil } +func (t fakeTarget) RunScraper(ingester extraction.Ingester, interval time.Duration) { + return +} + +func (t fakeTarget) StopScraper() { + return +} + func (t fakeTarget) State() TargetState { return ALIVE } -func (t fakeTarget) LastScrape() time.Time { - return time.Now() -} - -func (t *fakeTarget) ScheduledFor() (time time.Time) { - time = t.schedules[t.scheduleIndex] - t.scheduleIndex++ - - return -} - func (t *fakeTarget) Merge(newTarget Target) {} -func (t *fakeTarget) EstimatedTimeToExecute() time.Duration { return 0 } - func testTargetManager(t testing.TB) { - targetManager := NewTargetManager(nopIngester{}, 3) + targetManager := NewTargetManager(nopIngester{}) testJob1 := config.JobConfig{ JobConfig: pb.JobConfig{ Name: proto.String("test_job1"), @@ -96,20 +94,17 @@ func testTargetManager(t testing.TB) { } target1GroupA := &fakeTarget{ - schedules: []time.Time{time.Now()}, - interval: time.Minute, + interval: time.Minute, } target2GroupA := &fakeTarget{ - schedules: []time.Time{time.Now()}, - interval: time.Minute, + interval: time.Minute, } targetManager.AddTarget(testJob1, target1GroupA) targetManager.AddTarget(testJob1, target2GroupA) target1GroupB := &fakeTarget{ - schedules: []time.Time{time.Now()}, - interval: time.Minute * 2, + interval: time.Minute * 2, } targetManager.AddTarget(testJob2, target1GroupB) diff --git a/retrieval/targetpool.go b/retrieval/targetpool.go index bce49be4c..89cc4d0ae 100644 --- a/retrieval/targetpool.go +++ b/retrieval/targetpool.go @@ -19,75 +19,69 @@ import ( "github.com/golang/glog" "github.com/prometheus/client_golang/extraction" - "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/utility" ) const ( targetAddQueueSize = 100 targetReplaceQueueSize = 1 - - intervalKey = "interval" ) -var ( - retrievalDurations = prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Namespace: namespace, - Name: "targetpool_retrieve_time_milliseconds", - Help: "The time needed for each TargetPool to retrieve state from all included entities.", - Objectives: []float64{0.01, 0.05, 0.5, 0.90, 0.99}, - }, - []string{intervalKey}, - ) -) - -func init() { - prometheus.MustRegister(retrievalDurations) -} - type TargetPool struct { sync.RWMutex - done chan bool - manager TargetManager - targets targets - addTargetQueue chan Target - replaceTargetsQueue chan targets + done chan chan bool + manager TargetManager + targetsByAddress map[string]Target + interval time.Duration + ingester extraction.Ingester + addTargetQueue chan Target targetProvider TargetProvider } -func NewTargetPool(m TargetManager, p TargetProvider) *TargetPool { +func NewTargetPool(m TargetManager, p TargetProvider, ing extraction.Ingester, i time.Duration) *TargetPool { return &TargetPool{ - manager: m, - addTargetQueue: make(chan Target, targetAddQueueSize), - replaceTargetsQueue: make(chan targets, targetReplaceQueueSize), - targetProvider: p, - done: make(chan bool), + manager: m, + interval: i, + ingester: ing, + targetsByAddress: make(map[string]Target), + addTargetQueue: make(chan Target, targetAddQueueSize), + targetProvider: p, + done: make(chan chan bool), } } -func (p *TargetPool) Run(ingester extraction.Ingester, interval time.Duration) { - ticker := time.NewTicker(interval) +func (p *TargetPool) Run() { + ticker := time.NewTicker(p.interval) defer ticker.Stop() for { select { case <-ticker.C: - p.runIteration(ingester, interval) + if p.targetProvider != nil { + targets, err := p.targetProvider.Targets() + if err != nil { + glog.Warningf("Error looking up targets, keeping old list: %s", err) + } else { + p.ReplaceTargets(targets) + } + } case newTarget := <-p.addTargetQueue: p.addTarget(newTarget) - case newTargets := <-p.replaceTargetsQueue: - p.replaceTargets(newTargets) - case <-p.done: + case stopped := <-p.done: + p.ReplaceTargets([]Target{}) glog.Info("TargetPool exiting...") + stopped <- true return } } } func (p TargetPool) Stop() { - p.done <- true + stopped := make(chan bool) + p.done <- stopped + <-stopped } func (p *TargetPool) AddTarget(target Target) { @@ -98,85 +92,45 @@ func (p *TargetPool) addTarget(target Target) { p.Lock() defer p.Unlock() - p.targets = append(p.targets, target) + p.targetsByAddress[target.Address()] = target + go target.RunScraper(p.ingester, p.interval) } func (p *TargetPool) ReplaceTargets(newTargets []Target) { p.Lock() defer p.Unlock() - // If there is anything remaining in the queue for effectuation, clear it out, - // because the last mutation should win. - select { - case <-p.replaceTargetsQueue: - default: - p.replaceTargetsQueue <- newTargets - } -} - -func (p *TargetPool) replaceTargets(newTargets []Target) { - p.Lock() - defer p.Unlock() - // Replace old target list by new one, but reuse those targets from the old // list of targets which are also in the new list (to preserve scheduling and // health state). - for j, newTarget := range newTargets { - for _, oldTarget := range p.targets { - if oldTarget.Address() == newTarget.Address() { - oldTarget.Merge(newTargets[j]) - newTargets[j] = oldTarget - } - } - } - - p.targets = newTargets -} - -func (p *TargetPool) runSingle(ingester extraction.Ingester, t Target) { - p.manager.acquire() - defer p.manager.release() - - t.Scrape(ingester) -} - -func (p *TargetPool) runIteration(ingester extraction.Ingester, interval time.Duration) { - if p.targetProvider != nil { - targets, err := p.targetProvider.Targets() - if err != nil { - glog.Warningf("Error looking up targets, keeping old list: %s", err) + newTargetAddresses := make(utility.Set) + for _, newTarget := range newTargets { + newTargetAddresses.Add(newTarget.Address()) + oldTarget, ok := p.targetsByAddress[newTarget.Address()] + if ok { + oldTarget.Merge(newTarget) } else { - p.ReplaceTargets(targets) + p.targetsByAddress[newTarget.Address()] = newTarget + go newTarget.RunScraper(p.ingester, p.interval) } } - - p.RLock() - defer p.RUnlock() - - begin := time.Now() - wait := sync.WaitGroup{} - - for _, target := range p.targets { - wait.Add(1) - - go func(t Target) { - p.runSingle(ingester, t) - wait.Done() - }(target) + // Stop any targets no longer present. + for k, oldTarget := range p.targetsByAddress { + if !newTargetAddresses.Has(k) { + glog.V(1).Info("Stopping scraper for target ", k) + oldTarget.StopScraper() + delete(p.targetsByAddress, k) + } } - - wait.Wait() - - duration := float64(time.Since(begin) / time.Millisecond) - retrievalDurations.WithLabelValues(interval.String()).Observe(duration) } func (p *TargetPool) Targets() []Target { p.RLock() defer p.RUnlock() - targets := make([]Target, len(p.targets)) - copy(targets, p.targets) - + targets := make([]Target, 0, len(p.targetsByAddress)) + for _, v := range p.targetsByAddress { + targets = append(targets, v) + } return targets } diff --git a/retrieval/targetpool_test.go b/retrieval/targetpool_test.go index be6421ba0..1e8005f27 100644 --- a/retrieval/targetpool_test.go +++ b/retrieval/targetpool_test.go @@ -77,7 +77,7 @@ func testTargetPool(t testing.TB) { } for i, scenario := range scenarios { - pool := TargetPool{} + pool := NewTargetPool(nil, nil, nopIngester{}, time.Duration(1)) for _, input := range scenario.inputs { target := target{ @@ -87,11 +87,11 @@ func testTargetPool(t testing.TB) { pool.addTarget(&target) } - if pool.targets.Len() != len(scenario.outputs) { - t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.targets.Len()) + if len(pool.targetsByAddress) != len(scenario.outputs) { + t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), len(pool.targetsByAddress)) } else { for j, output := range scenario.outputs { - target := pool.targets[j] + target := pool.Targets()[j] if target.Address() != output.address { t.Errorf("%s %d.%d. expected Target address to be %s but was %s", scenario.name, i, j, output.address, target.Address()) @@ -99,8 +99,8 @@ func testTargetPool(t testing.TB) { } } - if pool.targets.Len() != len(scenario.outputs) { - t.Errorf("%s %d. expected to repopulated with %d elements, got %d", scenario.name, i, len(scenario.outputs), pool.targets.Len()) + if len(pool.targetsByAddress) != len(scenario.outputs) { + t.Errorf("%s %d. expected to repopulated with %d elements, got %d", scenario.name, i, len(scenario.outputs), len(pool.targetsByAddress)) } } } @@ -111,41 +111,48 @@ func TestTargetPool(t *testing.T) { } func TestTargetPoolReplaceTargets(t *testing.T) { - pool := TargetPool{} + pool := NewTargetPool(nil, nil, nopIngester{}, time.Duration(1)) oldTarget1 := &target{ - address: "http://example1.com/metrics.json", - state: UNREACHABLE, + address: "example1", + state: UNREACHABLE, + stopScraper: make(chan bool, 1), } oldTarget2 := &target{ - address: "http://example2.com/metrics.json", - state: UNREACHABLE, + address: "example2", + state: UNREACHABLE, + stopScraper: make(chan bool, 1), } newTarget1 := &target{ - address: "http://example1.com/metrics.json", - state: ALIVE, + address: "example1", + state: ALIVE, + stopScraper: make(chan bool, 1), } newTarget2 := &target{ - address: "http://example3.com/metrics.json", - state: ALIVE, + address: "example3", + state: ALIVE, + stopScraper: make(chan bool, 1), } + oldTarget1.StopScraper() + oldTarget2.StopScraper() + newTarget2.StopScraper() + pool.addTarget(oldTarget1) pool.addTarget(oldTarget2) - pool.replaceTargets([]Target{newTarget1, newTarget2}) + pool.ReplaceTargets([]Target{newTarget1, newTarget2}) - if pool.targets.Len() != 2 { - t.Errorf("Expected 2 elements in pool, had %d", pool.targets.Len()) + if len(pool.targetsByAddress) != 2 { + t.Errorf("Expected 2 elements in pool, had %d", len(pool.targetsByAddress)) } - target1 := pool.targets[0].(*target) - if target1.state != oldTarget1.state { - t.Errorf("Wrong first target returned from pool, expected %v, got %v", oldTarget1, target1) + if pool.targetsByAddress["example1"].State() != oldTarget1.State() { + t.Errorf("target1 channel has changed") } - target2 := pool.targets[1].(*target) - if target2.state != newTarget2.state { - t.Errorf("Wrong second target returned from pool, expected %v, got %v", newTarget2, target2) + if pool.targetsByAddress["example3"].State() == oldTarget2.State() { + t.Errorf("newTarget2 channel same as oldTarget2's") } + } func BenchmarkTargetPool(b *testing.B) {