|
|
|
@ -14,50 +14,46 @@
|
|
|
|
|
package retrieval |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"github.com/prometheus/prometheus/retrieval/format" |
|
|
|
|
"log" |
|
|
|
|
"sort" |
|
|
|
|
"sync" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"github.com/prometheus/prometheus/retrieval/format" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
const ( |
|
|
|
|
intervalKey = "interval" |
|
|
|
|
|
|
|
|
|
targetAddQueueSize = 100 |
|
|
|
|
targetReplaceQueueSize = 100 |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
type TargetPool struct { |
|
|
|
|
sync.RWMutex |
|
|
|
|
|
|
|
|
|
done chan bool |
|
|
|
|
manager TargetManager |
|
|
|
|
targets []Target |
|
|
|
|
targets targets |
|
|
|
|
addTargetQueue chan Target |
|
|
|
|
replaceTargetsQueue chan []Target |
|
|
|
|
replaceTargetsQueue chan targets |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func NewTargetPool(m TargetManager) *TargetPool { |
|
|
|
|
return &TargetPool{ |
|
|
|
|
manager: m, |
|
|
|
|
addTargetQueue: make(chan Target), |
|
|
|
|
replaceTargetsQueue: make(chan []Target), |
|
|
|
|
addTargetQueue: make(chan Target, targetAddQueueSize), |
|
|
|
|
replaceTargetsQueue: make(chan targets, targetReplaceQueueSize), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p TargetPool) Len() int { |
|
|
|
|
return len(p.targets) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p TargetPool) Less(i, j int) bool { |
|
|
|
|
return p.targets[i].scheduledFor().Before(p.targets[j].scheduledFor()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p TargetPool) Swap(i, j int) { |
|
|
|
|
p.targets[i], p.targets[j] = p.targets[j], p.targets[i] |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p *TargetPool) Run(results chan format.Result, interval time.Duration) { |
|
|
|
|
ticker := time.Tick(interval) |
|
|
|
|
ticker := time.NewTicker(interval) |
|
|
|
|
defer ticker.Stop() |
|
|
|
|
|
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <-ticker: |
|
|
|
|
case <-ticker.C: |
|
|
|
|
p.runIteration(results, interval) |
|
|
|
|
case newTarget := <-p.addTargetQueue: |
|
|
|
|
p.addTarget(newTarget) |
|
|
|
@ -65,7 +61,7 @@ func (p *TargetPool) Run(results chan format.Result, interval time.Duration) {
|
|
|
|
|
p.replaceTargets(newTargets) |
|
|
|
|
case <-p.done: |
|
|
|
|
log.Printf("TargetPool exiting...") |
|
|
|
|
break |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -79,6 +75,9 @@ func (p *TargetPool) AddTarget(target Target) {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p *TargetPool) addTarget(target Target) { |
|
|
|
|
p.Lock() |
|
|
|
|
defer p.Unlock() |
|
|
|
|
|
|
|
|
|
p.targets = append(p.targets, target) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -87,6 +86,9 @@ func (p *TargetPool) ReplaceTargets(newTargets []Target) {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p *TargetPool) replaceTargets(newTargets []Target) { |
|
|
|
|
p.Lock() |
|
|
|
|
defer p.Unlock() |
|
|
|
|
|
|
|
|
|
// Replace old target list by new one, but reuse those targets from the old
|
|
|
|
|
// list of targets which are also in the new list (to preserve scheduling and
|
|
|
|
|
// health state).
|
|
|
|
@ -98,6 +100,7 @@ func (p *TargetPool) replaceTargets(newTargets []Target) {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
p.targets = newTargets |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -109,14 +112,15 @@ func (p *TargetPool) runSingle(earliest time.Time, results chan format.Result, t
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p *TargetPool) runIteration(results chan format.Result, interval time.Duration) { |
|
|
|
|
begin := time.Now() |
|
|
|
|
p.RLock() |
|
|
|
|
defer p.RUnlock() |
|
|
|
|
|
|
|
|
|
targetCount := p.Len() |
|
|
|
|
finished := make(chan bool, targetCount) |
|
|
|
|
begin := time.Now() |
|
|
|
|
wait := sync.WaitGroup{} |
|
|
|
|
|
|
|
|
|
// Sort p.targets by next scheduling time so we can process the earliest
|
|
|
|
|
// targets first.
|
|
|
|
|
sort.Sort(p) |
|
|
|
|
sort.Sort(p.targets) |
|
|
|
|
|
|
|
|
|
for _, target := range p.targets { |
|
|
|
|
now := time.Now() |
|
|
|
@ -124,34 +128,29 @@ func (p *TargetPool) runIteration(results chan format.Result, interval time.Dura
|
|
|
|
|
if target.scheduledFor().After(now) { |
|
|
|
|
// None of the remaining targets are ready to be scheduled. Signal that
|
|
|
|
|
// we're done processing them in this scrape iteration.
|
|
|
|
|
finished <- true |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
wait.Add(1) |
|
|
|
|
|
|
|
|
|
go func(t Target) { |
|
|
|
|
p.runSingle(now, results, t) |
|
|
|
|
finished <- true |
|
|
|
|
wait.Done() |
|
|
|
|
}(target) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for i := 0; i < targetCount; { |
|
|
|
|
select { |
|
|
|
|
case <-finished: |
|
|
|
|
i++ |
|
|
|
|
case newTarget := <-p.addTargetQueue: |
|
|
|
|
p.addTarget(newTarget) |
|
|
|
|
case newTargets := <-p.replaceTargetsQueue: |
|
|
|
|
p.replaceTargets(newTargets) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
close(finished) |
|
|
|
|
wait.Wait() |
|
|
|
|
|
|
|
|
|
duration := float64(time.Since(begin) / time.Millisecond) |
|
|
|
|
retrievalDurations.Add(map[string]string{intervalKey: interval.String()}, duration) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// BUG(all): Not really thread-safe. Only used in /status page for now.
|
|
|
|
|
func (p *TargetPool) Targets() []Target { |
|
|
|
|
return p.targets |
|
|
|
|
p.RLock() |
|
|
|
|
defer p.RUnlock() |
|
|
|
|
|
|
|
|
|
targets := make([]Target, len(p.targets)) |
|
|
|
|
copy(targets, p.targets) |
|
|
|
|
|
|
|
|
|
return targets |
|
|
|
|
} |
|
|
|
|