prometheus/scrape/manager.go

219 lines
5.5 KiB
Go

// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scrape
import (
"fmt"
"reflect"
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/storage"
)
// Appendable returns an Appender.
type Appendable interface {
Appender() (storage.Appender, error)
}
// NewManager is the Manager constructor
func NewManager(logger log.Logger, app Appendable) *Manager {
if logger == nil {
logger = log.NewNopLogger()
}
return &Manager{
append: app,
logger: logger,
scrapeConfigs: make(map[string]*config.ScrapeConfig),
scrapePools: make(map[string]*scrapePool),
graceShut: make(chan struct{}),
triggerReload: make(chan struct{}, 1),
}
}
// Manager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups form the discovery manager.
type Manager struct {
logger log.Logger
append Appendable
graceShut chan struct{}
mtxScrape sync.Mutex // Guards the fields below.
scrapeConfigs map[string]*config.ScrapeConfig
scrapePools map[string]*scrapePool
targetSets map[string][]*targetgroup.Group
triggerReload chan struct{}
}
// Run receives and saves target set updates and triggers the scraping loops reloading.
// Reloading happens in the background so that it doesn't block receiving targets updates.
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
go m.reloader()
for {
select {
case ts := <-tsets:
m.updateTsets(ts)
select {
case m.triggerReload <- struct{}{}:
default:
}
case <-m.graceShut:
return nil
}
}
}
func (m *Manager) reloader() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-m.graceShut:
return
case <-ticker.C:
select {
case <-m.triggerReload:
m.reload()
case <-m.graceShut:
return
}
}
}
}
func (m *Manager) reload() {
m.mtxScrape.Lock()
var wg sync.WaitGroup
for setName, groups := range m.targetSets {
if _, ok := m.scrapePools[setName]; !ok {
scrapeConfig, ok := m.scrapeConfigs[setName]
if !ok {
level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
continue
}
sp, err := newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", setName))
if err != nil {
level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
continue
}
m.scrapePools[setName] = sp
}
wg.Add(1)
// Run the sync in parallel as these take a while and at high load can't catch up.
go func(sp *scrapePool, groups []*targetgroup.Group) {
sp.Sync(groups)
wg.Done()
}(m.scrapePools[setName], groups)
}
m.mtxScrape.Unlock()
wg.Wait()
}
// Stop cancels all running scrape pools and blocks until all have exited.
func (m *Manager) Stop() {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
for _, sp := range m.scrapePools {
sp.stop()
}
close(m.graceShut)
}
func (m *Manager) updateTsets(tsets map[string][]*targetgroup.Group) {
m.mtxScrape.Lock()
m.targetSets = tsets
m.mtxScrape.Unlock()
}
// ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
func (m *Manager) ApplyConfig(cfg *config.Config) error {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
c := make(map[string]*config.ScrapeConfig)
for _, scfg := range cfg.ScrapeConfigs {
c[scfg.JobName] = scfg
}
m.scrapeConfigs = c
// Cleanup and reload pool if the configuration has changed.
var failed bool
for name, sp := range m.scrapePools {
if cfg, ok := m.scrapeConfigs[name]; !ok {
sp.stop()
delete(m.scrapePools, name)
} else if !reflect.DeepEqual(sp.config, cfg) {
err := sp.reload(cfg)
if err != nil {
level.Error(m.logger).Log("msg", "error reloading scrape pool", "err", err, "scrape_pool", name)
failed = true
}
}
}
if failed {
return fmt.Errorf("failed to apply the new configuration")
}
return nil
}
// TargetsAll returns active and dropped targets grouped by job_name.
func (m *Manager) TargetsAll() map[string][]*Target {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
targets := make(map[string][]*Target, len(m.scrapePools))
for tset, sp := range m.scrapePools {
targets[tset] = append(sp.ActiveTargets(), sp.DroppedTargets()...)
}
return targets
}
// TargetsActive returns the active targets currently being scraped.
func (m *Manager) TargetsActive() map[string][]*Target {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
targets := make(map[string][]*Target, len(m.scrapePools))
for tset, sp := range m.scrapePools {
targets[tset] = sp.ActiveTargets()
}
return targets
}
// TargetsDropped returns the dropped targets during relabelling.
func (m *Manager) TargetsDropped() map[string][]*Target {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
targets := make(map[string][]*Target, len(m.scrapePools))
for tset, sp := range m.scrapePools {
targets[tset] = sp.DroppedTargets()
}
return targets
}