mirror of https://github.com/prometheus/prometheus
machine424
3 months ago
6 changed files with 13 additions and 1846 deletions
@@ -1,332 +0,0 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package legacymanager

import (
    "context"
    "fmt"
    "reflect"
    "sync"
    "time"

    "github.com/go-kit/log"
    "github.com/go-kit/log/level"
    "github.com/prometheus/client_golang/prometheus"

    "github.com/prometheus/prometheus/discovery"
    "github.com/prometheus/prometheus/discovery/targetgroup"
)

type poolKey struct {
    setName  string
    provider string
}

// provider holds a Discoverer instance, its configuration and its subscribers.
type provider struct {
    name   string
    d      discovery.Discoverer
    subs   []string
    config interface{}
}

// NewManager is the Discovery Manager constructor.
func NewManager(ctx context.Context, logger log.Logger, registerer prometheus.Registerer, sdMetrics map[string]discovery.DiscovererMetrics, options ...func(*Manager)) *Manager {
    if logger == nil {
        logger = log.NewNopLogger()
    }
    mgr := &Manager{
        logger:         logger,
        syncCh:         make(chan map[string][]*targetgroup.Group),
        targets:        make(map[poolKey]map[string]*targetgroup.Group),
        discoverCancel: []context.CancelFunc{},
        ctx:            ctx,
        updatert:       5 * time.Second,
        triggerSend:    make(chan struct{}, 1),
        registerer:     registerer,
        sdMetrics:      sdMetrics,
    }
    for _, option := range options {
        option(mgr)
    }

    // Register the metrics.
    // We have to do this after setting all options, so that the name of the Manager is set.
    if metrics, err := discovery.NewManagerMetrics(registerer, mgr.name); err == nil {
        mgr.metrics = metrics
    } else {
        level.Error(logger).Log("msg", "Failed to create discovery manager metrics", "manager", mgr.name, "err", err)
        return nil
    }

    return mgr
}

// Name sets the name of the manager.
func Name(n string) func(*Manager) {
    return func(m *Manager) {
        m.mtx.Lock()
        defer m.mtx.Unlock()
        m.name = n
    }
}

// Manager maintains a set of discovery providers and sends each update to a map channel.
// Targets are grouped by the target set name.
type Manager struct {
    logger         log.Logger
    name           string
    mtx            sync.RWMutex
    ctx            context.Context
    discoverCancel []context.CancelFunc

    // Some Discoverers (e.g. k8s) send only the updates for a given target group,
    // so we use a map[tg.Source]*targetgroup.Group to know which group to update.
    targets map[poolKey]map[string]*targetgroup.Group
    // providers keeps track of SD providers.
    providers []*provider
    // The sync channel sends the updates as a map where the key is the job value from the scrape config.
    syncCh chan map[string][]*targetgroup.Group

    // How long to wait before sending updates to the channel. The variable
    // should only be modified in unit tests.
    updatert time.Duration

    // The triggerSend channel signals to the manager that new updates have been received from providers.
    triggerSend chan struct{}

    // A registerer for all service discovery metrics.
    registerer prometheus.Registerer

    metrics   *discovery.Metrics
    sdMetrics map[string]discovery.DiscovererMetrics
}

// Run starts the background processing.
func (m *Manager) Run() error {
    go m.sender()
    <-m.ctx.Done()
    m.cancelDiscoverers()
    return m.ctx.Err()
}

// SyncCh returns a read-only channel used by all the clients to receive target updates.
func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
    return m.syncCh
}

// ApplyConfig removes all running discovery providers and starts new ones using the provided config.
func (m *Manager) ApplyConfig(cfg map[string]discovery.Configs) error {
    m.mtx.Lock()
    defer m.mtx.Unlock()

    for pk := range m.targets {
        if _, ok := cfg[pk.setName]; !ok {
            m.metrics.DiscoveredTargets.DeleteLabelValues(m.name, pk.setName)
        }
    }
    m.cancelDiscoverers()
    m.targets = make(map[poolKey]map[string]*targetgroup.Group)
    m.providers = nil
    m.discoverCancel = nil

    failedCount := 0
    for name, scfg := range cfg {
        failedCount += m.registerProviders(scfg, name)
        m.metrics.DiscoveredTargets.WithLabelValues(name).Set(0)
    }
    m.metrics.FailedConfigs.Set(float64(failedCount))

    for _, prov := range m.providers {
        m.startProvider(m.ctx, prov)
    }

    return nil
}

// StartCustomProvider is used for sdtool. Only use this if you know what you're doing.
func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker discovery.Discoverer) {
    p := &provider{
        name: name,
        d:    worker,
        subs: []string{name},
    }
    m.providers = append(m.providers, p)
    m.startProvider(ctx, p)
}

func (m *Manager) startProvider(ctx context.Context, p *provider) {
    level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
    ctx, cancel := context.WithCancel(ctx)
    updates := make(chan []*targetgroup.Group)

    m.discoverCancel = append(m.discoverCancel, cancel)

    go p.d.Run(ctx, updates)
    go m.updater(ctx, p, updates)
}

func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
    for {
        select {
        case <-ctx.Done():
            return
        case tgs, ok := <-updates:
            m.metrics.ReceivedUpdates.Inc()
            if !ok {
                level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name)
                return
            }

            for _, s := range p.subs {
                m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
            }

            select {
            case m.triggerSend <- struct{}{}:
            default:
            }
        }
    }
}

func (m *Manager) sender() {
    ticker := time.NewTicker(m.updatert)
    defer ticker.Stop()

    for {
        select {
        case <-m.ctx.Done():
            return
        case <-ticker.C: // Some discoverers send updates too often, so we throttle them with the ticker.
            select {
            case <-m.triggerSend:
                m.metrics.SentUpdates.Inc()
                select {
                case m.syncCh <- m.allGroups():
                default:
                    m.metrics.DelayedUpdates.Inc()
                    level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle")
                    select {
                    case m.triggerSend <- struct{}{}:
                    default:
                    }
                }
            default:
            }
        }
    }
}

func (m *Manager) cancelDiscoverers() {
    for _, c := range m.discoverCancel {
        c()
    }
}

func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) {
    m.mtx.Lock()
    defer m.mtx.Unlock()

    if _, ok := m.targets[poolKey]; !ok {
        m.targets[poolKey] = make(map[string]*targetgroup.Group)
    }
    for _, tg := range tgs {
        if tg != nil { // Some Discoverers send a nil target group, so we need to check for it to avoid panics.
            m.targets[poolKey][tg.Source] = tg
        }
    }
}

func (m *Manager) allGroups() map[string][]*targetgroup.Group {
    m.mtx.RLock()
    defer m.mtx.RUnlock()

    tSets := map[string][]*targetgroup.Group{}
    n := map[string]int{}
    for pkey, tsets := range m.targets {
        for _, tg := range tsets {
            // Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
            // to signal that it needs to stop all scrape loops for this target set.
            tSets[pkey.setName] = append(tSets[pkey.setName], tg)
            n[pkey.setName] += len(tg.Targets)
        }
    }
    for setName, v := range n {
        m.metrics.DiscoveredTargets.WithLabelValues(setName).Set(float64(v))
    }
    return tSets
}

// registerProviders returns the number of failed SD configs.
func (m *Manager) registerProviders(cfgs discovery.Configs, setName string) int {
    var (
        failed int
        added  bool
    )
    add := func(cfg discovery.Config) {
        for _, p := range m.providers {
            if reflect.DeepEqual(cfg, p.config) {
                p.subs = append(p.subs, setName)
                added = true
                return
            }
        }
        typ := cfg.Name()
        d, err := cfg.NewDiscoverer(discovery.DiscovererOptions{
            Logger:  log.With(m.logger, "discovery", typ, "config", setName),
            Metrics: m.sdMetrics[typ],
        })
        if err != nil {
            level.Error(m.logger).Log("msg", "Cannot create service discovery", "err", err, "type", typ, "config", setName)
            failed++
            return
        }
        m.providers = append(m.providers, &provider{
            name:   fmt.Sprintf("%s/%d", typ, len(m.providers)),
            d:      d,
            config: cfg,
            subs:   []string{setName},
        })
        added = true
    }
    for _, cfg := range cfgs {
        add(cfg)
    }
    if !added {
        // Add an empty target group to force the refresh of the corresponding
        // scrape pool and to notify the receiver that this target set has no
        // current targets.
        // It can happen because the combined set of SD configurations is empty
        // or because we fail to instantiate all the SD configurations.
        add(discovery.StaticConfig{{}})
    }
    return failed
}

// StaticProvider holds a list of target groups that never change.
type StaticProvider struct {
    TargetGroups []*targetgroup.Group
}

// Run implements the Worker interface.
func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
    // We still have to consider that the consumer exits right away, in which case
    // the context will be canceled.
    select {
    case ch <- sd.TargetGroups:
    case <-ctx.Done():
    }
    close(ch)
}
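For context, a minimal caller sketch of the manager above (not part of the diff): it wires NewManager, ApplyConfig, Run, and SyncCh together for a single static job. The empty sdMetrics map, the job name "my-job", and the target address are illustrative assumptions; real callers populate per-discoverer metrics via the discovery package.

package main

import (
    "context"
    "fmt"

    "github.com/go-kit/log"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/common/model"

    "github.com/prometheus/prometheus/discovery"
    "github.com/prometheus/prometheus/discovery/legacymanager"
    "github.com/prometheus/prometheus/discovery/targetgroup"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // An empty per-discoverer metrics map is enough for a static-only
    // config in this sketch, since the static discoverer ignores it.
    sdMetrics := map[string]discovery.DiscovererMetrics{}

    mgr := legacymanager.NewManager(ctx, log.NewNopLogger(), prometheus.NewRegistry(), sdMetrics, legacymanager.Name("scrape"))
    go mgr.Run()

    // Providers are keyed by scrape job name; ApplyConfig restarts all of them.
    err := mgr.ApplyConfig(map[string]discovery.Configs{
        "my-job": {discovery.StaticConfig{
            &targetgroup.Group{Targets: []model.LabelSet{{model.AddressLabel: "localhost:9090"}}},
        }},
    })
    if err != nil {
        panic(err)
    }

    // The sender goroutine throttles updates to one per updatert (5s),
    // so the first batch arrives after at most one tick.
    tgs := <-mgr.SyncCh()
    fmt.Println("target sets received:", len(tgs))
}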
File diff suppressed because it is too large
@@ -1,261 +0,0 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package legacymanager

import (
    "errors"
    "fmt"
    "reflect"
    "sort"
    "strconv"
    "strings"
    "sync"

    "gopkg.in/yaml.v2"

    "github.com/prometheus/prometheus/discovery"
    "github.com/prometheus/prometheus/discovery/targetgroup"
)

const (
    configFieldPrefix      = "AUTO_DISCOVERY_"
    staticConfigsKey       = "static_configs"
    staticConfigsFieldName = configFieldPrefix + staticConfigsKey
)

var (
    configNames      = make(map[string]discovery.Config)
    configFieldNames = make(map[reflect.Type]string)
    configFields     []reflect.StructField

    configTypesMu sync.Mutex
    configTypes   = make(map[reflect.Type]reflect.Type)

    emptyStructType = reflect.TypeOf(struct{}{})
    configsType     = reflect.TypeOf(discovery.Configs{})
)

// RegisterConfig registers the given Config type for YAML marshaling and unmarshaling.
func RegisterConfig(config discovery.Config) {
    registerConfig(config.Name()+"_sd_configs", reflect.TypeOf(config), config)
}

func init() {
    // N.B.: static_configs is the only Config type implemented by default.
    // All other types are registered at init by their implementing packages.
    elemTyp := reflect.TypeOf(&targetgroup.Group{})
    registerConfig(staticConfigsKey, elemTyp, discovery.StaticConfig{})
}

func registerConfig(yamlKey string, elemType reflect.Type, config discovery.Config) {
    name := config.Name()
    if _, ok := configNames[name]; ok {
        panic(fmt.Sprintf("discovery: Config named %q is already registered", name))
    }
    configNames[name] = config

    fieldName := configFieldPrefix + yamlKey // Field must be exported.
    configFieldNames[elemType] = fieldName

    // Insert fields in sorted order.
    i := sort.Search(len(configFields), func(k int) bool {
        return fieldName < configFields[k].Name
    })
    configFields = append(configFields, reflect.StructField{}) // Add an empty field at the end.
    copy(configFields[i+1:], configFields[i:])                 // Shift fields to the right.
    configFields[i] = reflect.StructField{                     // Write the new field in place.
        Name: fieldName,
        Type: reflect.SliceOf(elemType),
        Tag:  reflect.StructTag(`yaml:"` + yamlKey + `,omitempty"`),
    }
}

func getConfigType(out reflect.Type) reflect.Type {
    configTypesMu.Lock()
    defer configTypesMu.Unlock()
    if typ, ok := configTypes[out]; ok {
        return typ
    }
    // Initial exported fields map one-to-one.
    var fields []reflect.StructField
    for i, n := 0, out.NumField(); i < n; i++ {
        switch field := out.Field(i); {
        case field.PkgPath == "" && field.Type != configsType:
            fields = append(fields, field)
        default:
            fields = append(fields, reflect.StructField{
                Name:    "_" + field.Name, // Field must be unexported.
                PkgPath: out.PkgPath(),
                Type:    emptyStructType,
            })
        }
    }
    // Append extra config fields on the end.
    fields = append(fields, configFields...)
    typ := reflect.StructOf(fields)
    configTypes[out] = typ
    return typ
}

// UnmarshalYAMLWithInlineConfigs helps implement yaml.Unmarshal for structs
// that have a Configs field that should be inlined.
func UnmarshalYAMLWithInlineConfigs(out interface{}, unmarshal func(interface{}) error) error {
    outVal := reflect.ValueOf(out)
    if outVal.Kind() != reflect.Ptr {
        return fmt.Errorf("discovery: can only unmarshal into a struct pointer: %T", out)
    }
    outVal = outVal.Elem()
    if outVal.Kind() != reflect.Struct {
        return fmt.Errorf("discovery: can only unmarshal into a struct pointer: %T", out)
    }
    outTyp := outVal.Type()

    cfgTyp := getConfigType(outTyp)
    cfgPtr := reflect.New(cfgTyp)
    cfgVal := cfgPtr.Elem()

    // Copy shared fields (defaults) to the dynamic value.
    var configs *discovery.Configs
    for i, n := 0, outVal.NumField(); i < n; i++ {
        if outTyp.Field(i).Type == configsType {
            configs = outVal.Field(i).Addr().Interface().(*discovery.Configs)
            continue
        }
        if cfgTyp.Field(i).PkgPath != "" {
            continue // Field is unexported: ignore.
        }
        cfgVal.Field(i).Set(outVal.Field(i))
    }
    if configs == nil {
        return fmt.Errorf("discovery: Configs field not found in type: %T", out)
    }

    // Unmarshal into the dynamic value.
    if err := unmarshal(cfgPtr.Interface()); err != nil {
        return replaceYAMLTypeError(err, cfgTyp, outTyp)
    }

    // Copy shared fields from the dynamic value.
    for i, n := 0, outVal.NumField(); i < n; i++ {
        if cfgTyp.Field(i).PkgPath != "" {
            continue // Field is unexported: ignore.
        }
        outVal.Field(i).Set(cfgVal.Field(i))
    }

    var err error
    *configs, err = readConfigs(cfgVal, outVal.NumField())
    return err
}

func readConfigs(structVal reflect.Value, startField int) (discovery.Configs, error) {
    var (
        configs discovery.Configs
        targets []*targetgroup.Group
    )
    for i, n := startField, structVal.NumField(); i < n; i++ {
        field := structVal.Field(i)
        if field.Kind() != reflect.Slice {
            panic("discovery: internal error: field is not a slice")
        }
        for k := 0; k < field.Len(); k++ {
            val := field.Index(k)
            if val.IsZero() || (val.Kind() == reflect.Ptr && val.Elem().IsZero()) {
                key := configFieldNames[field.Type().Elem()]
                key = strings.TrimPrefix(key, configFieldPrefix)
                return nil, fmt.Errorf("empty or null section in %s", key)
            }
            switch c := val.Interface().(type) {
            case *targetgroup.Group:
                // Add an index to the static config target groups for unique
                // identification within the scrape pool.
                c.Source = strconv.Itoa(len(targets))
                // Coalesce multiple static configs into a single static config.
                targets = append(targets, c)
            case discovery.Config:
                configs = append(configs, c)
            default:
                panic("discovery: internal error: slice element is not a Config")
            }
        }
    }
    if len(targets) > 0 {
        configs = append(configs, discovery.StaticConfig(targets))
    }
    return configs, nil
}

// MarshalYAMLWithInlineConfigs helps implement yaml.Marshal for structs
// that have a Configs field that should be inlined.
func MarshalYAMLWithInlineConfigs(in interface{}) (interface{}, error) {
    inVal := reflect.ValueOf(in)
    for inVal.Kind() == reflect.Ptr {
        inVal = inVal.Elem()
    }
    inTyp := inVal.Type()

    cfgTyp := getConfigType(inTyp)
    cfgPtr := reflect.New(cfgTyp)
    cfgVal := cfgPtr.Elem()

    // Copy shared fields to the dynamic value.
    var configs *discovery.Configs
    for i, n := 0, inTyp.NumField(); i < n; i++ {
        if inTyp.Field(i).Type == configsType {
            configs = inVal.Field(i).Addr().Interface().(*discovery.Configs)
        }
        if cfgTyp.Field(i).PkgPath != "" {
            continue // Field is unexported: ignore.
        }
        cfgVal.Field(i).Set(inVal.Field(i))
    }
    if configs == nil {
        return nil, fmt.Errorf("discovery: Configs field not found in type: %T", in)
    }

    if err := writeConfigs(cfgVal, *configs); err != nil {
        return nil, err
    }

    return cfgPtr.Interface(), nil
}

func writeConfigs(structVal reflect.Value, configs discovery.Configs) error {
    targets := structVal.FieldByName(staticConfigsFieldName).Addr().Interface().(*[]*targetgroup.Group)
    for _, c := range configs {
        if sc, ok := c.(discovery.StaticConfig); ok {
            *targets = append(*targets, sc...)
            continue
        }
        fieldName, ok := configFieldNames[reflect.TypeOf(c)]
        if !ok {
            return fmt.Errorf("discovery: cannot marshal unregistered Config type: %T", c)
        }
        field := structVal.FieldByName(fieldName)
        field.Set(reflect.Append(field, reflect.ValueOf(c)))
    }
    return nil
}

func replaceYAMLTypeError(err error, oldTyp, newTyp reflect.Type) error {
    var e *yaml.TypeError
    if errors.As(err, &e) {
        oldStr := oldTyp.String()
        newStr := newTyp.String()
        for i, s := range e.Errors {
            e.Errors[i] = strings.ReplaceAll(s, oldStr, newStr)
        }
    }
    return err
}
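For context, a hypothetical consumer sketch of the registry helpers above (not part of the diff), modeled on how Prometheus scrape configs inline SD configs: the SDConfig type and its field names are illustrative assumptions. Note that static_configs is registered by this package's init, so no extra RegisterConfig call is needed here.

package main

import (
    "fmt"

    "gopkg.in/yaml.v2"

    "github.com/prometheus/prometheus/discovery"
    "github.com/prometheus/prometheus/discovery/legacymanager"
)

// SDConfig stands in for a scrape-config-like struct: the Configs field is
// excluded from plain YAML handling (yaml:"-") and populated by the inline
// helpers through the dynamically built struct instead.
type SDConfig struct {
    JobName                 string            `yaml:"job_name"`
    ServiceDiscoveryConfigs discovery.Configs `yaml:"-"`
}

func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
    return legacymanager.UnmarshalYAMLWithInlineConfigs(c, unmarshal)
}

func (c *SDConfig) MarshalYAML() (interface{}, error) {
    return legacymanager.MarshalYAMLWithInlineConfigs(c)
}

func main() {
    var cfg SDConfig
    raw := `
job_name: example
static_configs:
  - targets: ['localhost:9100']
`
    if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil {
        panic(err)
    }
    // static_configs was decoded via the registry's dynamic struct and
    // coalesced into a single discovery.StaticConfig entry.
    fmt.Printf("%s: %d discovery config(s)\n", cfg.JobName, len(cfg.ServiceDiscoveryConfigs))
}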