You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
prometheus/retrieval/targetpool.go

158 lines
3.8 KiB

// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"github.com/prometheus/prometheus/retrieval/format"
"log"
"sort"
"time"
)
const (
intervalKey = "interval"
)
type TargetPool struct {
done chan bool
manager TargetManager
targets []Target
addTargetQueue chan Target
replaceTargetsQueue chan []Target
}
func NewTargetPool(m TargetManager) *TargetPool {
return &TargetPool{
manager: m,
addTargetQueue: make(chan Target),
replaceTargetsQueue: make(chan []Target),
}
}
func (p TargetPool) Len() int {
return len(p.targets)
}
func (p TargetPool) Less(i, j int) bool {
return p.targets[i].scheduledFor().Before(p.targets[j].scheduledFor())
}
func (p TargetPool) Swap(i, j int) {
p.targets[i], p.targets[j] = p.targets[j], p.targets[i]
}
func (p *TargetPool) Run(results chan format.Result, interval time.Duration) {
ticker := time.Tick(interval)
for {
select {
case <-ticker:
p.runIteration(results, interval)
case newTarget := <-p.addTargetQueue:
p.addTarget(newTarget)
case newTargets := <-p.replaceTargetsQueue:
p.replaceTargets(newTargets)
case <-p.done:
log.Printf("TargetPool exiting...")
break
}
}
}
func (p TargetPool) Stop() {
p.done <- true
}
func (p *TargetPool) AddTarget(target Target) {
p.addTargetQueue <- target
}
func (p *TargetPool) addTarget(target Target) {
p.targets = append(p.targets, target)
}
func (p *TargetPool) ReplaceTargets(newTargets []Target) {
p.replaceTargetsQueue <- newTargets
}
func (p *TargetPool) replaceTargets(newTargets []Target) {
// Replace old target list by new one, but reuse those targets from the old
// list of targets which are also in the new list (to preserve scheduling and
// health state).
for j, newTarget := range newTargets {
for _, oldTarget := range p.targets {
if oldTarget.Address() == newTarget.Address() {
oldTarget.Merge(newTargets[j])
newTargets[j] = oldTarget
}
}
}
p.targets = newTargets
}
func (p *TargetPool) runSingle(earliest time.Time, results chan format.Result, t Target) {
p.manager.acquire()
defer p.manager.release()
t.Scrape(earliest, results)
}
func (p *TargetPool) runIteration(results chan format.Result, interval time.Duration) {
begin := time.Now()
targetCount := p.Len()
finished := make(chan bool, targetCount)
// Sort p.targets by next scheduling time so we can process the earliest
// targets first.
sort.Sort(p)
for _, target := range p.targets {
now := time.Now()
if target.scheduledFor().After(now) {
// None of the remaining targets are ready to be scheduled. Signal that
// we're done processing them in this scrape iteration.
finished <- true
continue
}
go func(t Target) {
p.runSingle(now, results, t)
finished <- true
}(target)
}
for i := 0; i < targetCount; {
select {
case <-finished:
i++
case newTarget := <-p.addTargetQueue:
p.addTarget(newTarget)
case newTargets := <-p.replaceTargetsQueue:
p.replaceTargets(newTargets)
}
}
close(finished)
duration := float64(time.Since(begin) / time.Millisecond)
retrievalDurations.Add(map[string]string{intervalKey: interval.String()}, duration)
}
// BUG(all): Not really thread-safe. Only used in /status page for now.
func (p *TargetPool) Targets() []Target {
return p.targets
}