prometheus/scrape/manager.go

168 lines
4.3 KiB
Go
Raw Normal View History

2017-11-25 13:13:54 +00:00
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scrape
2017-11-25 13:13:54 +00:00
import (
"fmt"
2018-01-18 11:49:42 +00:00
"reflect"
2018-01-17 11:46:17 +00:00
"sync"
2017-11-25 13:13:54 +00:00
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/config"
Refactor SD configuration to remove `config` dependency (#3629) * refactor: move targetGroup struct and CheckOverflow() to their own package * refactor: move auth and security related structs to a utility package, fix import error in utility package * refactor: Azure SD, remove SD struct from config * refactor: DNS SD, remove SD struct from config into dns package * refactor: ec2 SD, move SD struct from config into the ec2 package * refactor: file SD, move SD struct from config to file discovery package * refactor: gce, move SD struct from config to gce discovery package * refactor: move HTTPClientConfig and URL into util/config, fix import error in httputil * refactor: consul, move SD struct from config into consul discovery package * refactor: marathon, move SD struct from config into marathon discovery package * refactor: triton, move SD struct from config to triton discovery package, fix test * refactor: zookeeper, move SD structs from config to zookeeper discovery package * refactor: openstack, remove SD struct from config, move into openstack discovery package * refactor: kubernetes, move SD struct from config into kubernetes discovery package * refactor: notifier, use targetgroup package instead of config * refactor: tests for file, marathon, triton SD - use targetgroup package instead of config.TargetGroup * refactor: retrieval, use targetgroup package instead of config.TargetGroup * refactor: storage, use config util package * refactor: discovery manager, use targetgroup package instead of config.TargetGroup * refactor: use HTTPClient and TLS config from configUtil instead of config * refactor: tests, use targetgroup package instead of config.TargetGroup * refactor: fix tagetgroup.Group pointers that were removed by mistake * refactor: openstack, kubernetes: drop prefixes * refactor: remove import aliases forced due to vscode bug * refactor: move main SD struct out of config into discovery/config * refactor: rename configUtil to config_util * refactor: rename yamlUtil to yaml_config * refactor: kubernetes, remove prefixes * refactor: move the TargetGroup package to discovery/ * refactor: fix order of imports
2017-12-29 20:01:34 +00:00
"github.com/prometheus/prometheus/discovery/targetgroup"
2017-11-25 13:13:54 +00:00
"github.com/prometheus/prometheus/storage"
)
// Appendable returns an Appender.
type Appendable interface {
Appender() (storage.Appender, error)
}
// NewManager is the Manager constructor
func NewManager(logger log.Logger, app Appendable) *Manager {
2017-11-25 13:13:54 +00:00
return &Manager{
2017-11-25 13:13:54 +00:00
append: app,
logger: logger,
scrapeConfigs: make(map[string]*config.ScrapeConfig),
scrapePools: make(map[string]*scrapePool),
graceShut: make(chan struct{}),
2017-11-25 13:13:54 +00:00
}
}
// Manager maintains a set of scrape pools and manages start/stop cycles
2017-11-25 13:13:54 +00:00
// when receiving new target groups form the discovery manager.
type Manager struct {
2017-11-25 13:13:54 +00:00
logger log.Logger
append Appendable
scrapeConfigs map[string]*config.ScrapeConfig
scrapePools map[string]*scrapePool
2018-01-17 11:46:17 +00:00
mtx sync.RWMutex
graceShut chan struct{}
2017-11-25 13:13:54 +00:00
}
// Run starts background processing to handle target updates and reload the scraping loops.
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
2017-11-25 13:13:54 +00:00
for {
select {
case ts := <-tsets:
m.reload(ts)
case <-m.graceShut:
return nil
2017-11-25 13:13:54 +00:00
}
}
}
// Stop cancels all running scrape pools and blocks until all have exited.
func (m *Manager) Stop() {
for _, sp := range m.scrapePools {
sp.stop()
}
close(m.graceShut)
}
2017-11-25 13:13:54 +00:00
// ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
func (m *Manager) ApplyConfig(cfg *config.Config) error {
2018-01-17 11:46:17 +00:00
m.mtx.Lock()
defer m.mtx.Unlock()
c := make(map[string]*config.ScrapeConfig)
for _, scfg := range cfg.ScrapeConfigs {
c[scfg.JobName] = scfg
2017-11-25 13:13:54 +00:00
}
2018-01-17 11:46:17 +00:00
m.scrapeConfigs = c
2018-01-18 11:49:42 +00:00
// Cleanup and reload pool if config has changed.
for name, sp := range m.scrapePools {
if cfg, ok := m.scrapeConfigs[name]; !ok {
sp.stop()
delete(m.scrapePools, name)
} else if !reflect.DeepEqual(sp.config, cfg) {
sp.reload(cfg)
}
}
2017-11-25 13:13:54 +00:00
return nil
}
// TargetMap returns map of active and dropped targets and their corresponding scrape config job name.
func (m *Manager) TargetMap() map[string][]*Target {
2018-01-17 11:46:17 +00:00
m.mtx.Lock()
defer m.mtx.Unlock()
targets := make(map[string][]*Target)
for jobName, sp := range m.scrapePools {
sp.mtx.RLock()
for _, t := range sp.targets {
targets[jobName] = append(targets[jobName], t)
2017-11-25 13:13:54 +00:00
}
2018-01-17 11:46:17 +00:00
targets[jobName] = append(targets[jobName], sp.droppedTargets...)
sp.mtx.RUnlock()
2017-11-25 13:13:54 +00:00
}
2018-01-17 11:46:17 +00:00
return targets
2017-11-25 13:13:54 +00:00
}
// Targets returns the targets currently being scraped.
func (m *Manager) Targets() []*Target {
2018-01-17 11:46:17 +00:00
m.mtx.Lock()
defer m.mtx.Unlock()
var targets []*Target
for _, p := range m.scrapePools {
p.mtx.RLock()
for _, tt := range p.targets {
targets = append(targets, tt)
2017-11-25 13:13:54 +00:00
}
2018-01-17 11:46:17 +00:00
p.mtx.RUnlock()
2017-11-25 13:13:54 +00:00
}
2018-01-17 11:46:17 +00:00
return targets
2017-11-25 13:13:54 +00:00
}
// DroppedTargets returns the targets dropped during relabelling.
func (m *Manager) DroppedTargets() []*Target {
m.mtx.Lock()
defer m.mtx.Unlock()
var droppedTargets []*Target
for _, p := range m.scrapePools {
p.mtx.RLock()
droppedTargets = append(droppedTargets, p.droppedTargets...)
p.mtx.RUnlock()
}
return droppedTargets
}
func (m *Manager) reload(t map[string][]*targetgroup.Group) {
2017-11-25 13:13:54 +00:00
for tsetName, tgroup := range t {
scrapeConfig, ok := m.scrapeConfigs[tsetName]
if !ok {
level.Error(m.logger).Log("msg", "error reloading target set", "err", fmt.Sprintf("invalid config id:%v", tsetName))
continue
2017-11-25 13:13:54 +00:00
}
// Scrape pool doesn't exist so start a new one.
2017-11-25 13:13:54 +00:00
existing, ok := m.scrapePools[tsetName]
if !ok {
sp := newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName))
2017-11-25 13:13:54 +00:00
m.scrapePools[tsetName] = sp
sp.Sync(tgroup)
} else {
existing.Sync(tgroup)
}
2018-01-14 19:42:31 +00:00
}
2017-11-25 13:13:54 +00:00
}