From d18fa62ae9041c8a6bd327eb2068dcc6d4de3d3f Mon Sep 17 00:00:00 2001 From: machine424 Date: Fri, 30 Aug 2024 13:46:16 +0200 Subject: [PATCH] chore(discovery): enable new-service-discovery-manager by default and drop legacymanager package Signed-off-by: machine424 --- cmd/prometheus/main.go | 65 +- discovery/legacymanager/manager.go | 332 ------- discovery/legacymanager/manager_test.go | 1185 ----------------------- discovery/legacymanager/registry.go | 261 ----- docs/command-line/prometheus.md | 2 +- docs/feature_flags.md | 14 - 6 files changed, 13 insertions(+), 1846 deletions(-) delete mode 100644 discovery/legacymanager/manager.go delete mode 100644 discovery/legacymanager/manager_test.go delete mode 100644 discovery/legacymanager/registry.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index c0bd136fa..fa953874a 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -58,8 +58,6 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" - "github.com/prometheus/prometheus/discovery/legacymanager" - "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -159,7 +157,6 @@ type flagConfig struct { // These options are extracted from featureList // for ease of use. enableExpandExternalLabels bool - enableNewSDManager bool enablePerStepStats bool enableAutoGOMAXPROCS bool enableAutoGOMEMLIMIT bool @@ -197,9 +194,6 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { case "metadata-wal-records": c.scrape.AppendMetadata = true level.Info(logger).Log("msg", "Experimental metadata records in WAL enabled, required for remote write 2.0") - case "new-service-discovery-manager": - c.enableNewSDManager = true - level.Info(logger).Log("msg", "Experimental service discovery manager") case "promql-per-step-stats": c.enablePerStepStats = true level.Info(logger).Log("msg", "Experimental per-step statistics reporting") @@ -463,7 +457,7 @@ func main() { a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates."). Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval) - a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). + a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). Default("").StringsVar(&cfg.featureList) a.Flag("agent", "Run Prometheus in 'Agent mode'.").BoolVar(&agentMode) @@ -651,8 +645,8 @@ func main() { ctxScrape, cancelScrape = context.WithCancel(context.Background()) ctxNotify, cancelNotify = context.WithCancel(context.Background()) - discoveryManagerScrape discoveryManager - discoveryManagerNotify discoveryManager + discoveryManagerScrape *discovery.Manager + discoveryManagerNotify *discovery.Manager ) // Kubernetes client metrics are used by Kubernetes SD. @@ -672,42 +666,16 @@ func main() { os.Exit(1) } - if cfg.enableNewSDManager { - { - discMgr := discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("scrape")) - if discMgr == nil { - level.Error(logger).Log("msg", "failed to create a discovery manager scrape") - os.Exit(1) - } - discoveryManagerScrape = discMgr - } - - { - discMgr := discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("notify")) - if discMgr == nil { - level.Error(logger).Log("msg", "failed to create a discovery manager notify") - os.Exit(1) - } - discoveryManagerNotify = discMgr - } - } else { - { - discMgr := legacymanager.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, legacymanager.Name("scrape")) - if discMgr == nil { - level.Error(logger).Log("msg", "failed to create a discovery manager scrape") - os.Exit(1) - } - discoveryManagerScrape = discMgr - } + discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("scrape")) + if discoveryManagerScrape == nil { + level.Error(logger).Log("msg", "failed to create a discovery manager scrape") + os.Exit(1) + } - { - discMgr := legacymanager.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), prometheus.DefaultRegisterer, sdMetrics, legacymanager.Name("notify")) - if discMgr == nil { - level.Error(logger).Log("msg", "failed to create a discovery manager notify") - os.Exit(1) - } - discoveryManagerNotify = discMgr - } + discoveryManagerNotify = discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("notify")) + if discoveryManagerNotify == nil { + level.Error(logger).Log("msg", "failed to create a discovery manager notify") + os.Exit(1) } scrapeManager, err := scrape.NewManager( @@ -1765,15 +1733,6 @@ func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Option } } -// discoveryManager interfaces the discovery manager. This is used to keep using -// the manager that restarts SD's on reload for a few releases until we feel -// the new manager can be enabled for all users. -type discoveryManager interface { - ApplyConfig(cfg map[string]discovery.Configs) error - Run() error - SyncCh() <-chan map[string][]*targetgroup.Group -} - // rwProtoMsgFlagParser is a custom parser for config.RemoteWriteProtoMsg enum. type rwProtoMsgFlagParser struct { msgs *[]config.RemoteWriteProtoMsg diff --git a/discovery/legacymanager/manager.go b/discovery/legacymanager/manager.go deleted file mode 100644 index 6fc61485d..000000000 --- a/discovery/legacymanager/manager.go +++ /dev/null @@ -1,332 +0,0 @@ -// Copyright 2016 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package legacymanager - -import ( - "context" - "fmt" - "reflect" - "sync" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/prometheus/client_golang/prometheus" - - "github.com/prometheus/prometheus/discovery" - "github.com/prometheus/prometheus/discovery/targetgroup" -) - -type poolKey struct { - setName string - provider string -} - -// provider holds a Discoverer instance, its configuration and its subscribers. -type provider struct { - name string - d discovery.Discoverer - subs []string - config interface{} -} - -// NewManager is the Discovery Manager constructor. -func NewManager(ctx context.Context, logger log.Logger, registerer prometheus.Registerer, sdMetrics map[string]discovery.DiscovererMetrics, options ...func(*Manager)) *Manager { - if logger == nil { - logger = log.NewNopLogger() - } - mgr := &Manager{ - logger: logger, - syncCh: make(chan map[string][]*targetgroup.Group), - targets: make(map[poolKey]map[string]*targetgroup.Group), - discoverCancel: []context.CancelFunc{}, - ctx: ctx, - updatert: 5 * time.Second, - triggerSend: make(chan struct{}, 1), - registerer: registerer, - sdMetrics: sdMetrics, - } - for _, option := range options { - option(mgr) - } - - // Register the metrics. - // We have to do this after setting all options, so that the name of the Manager is set. - if metrics, err := discovery.NewManagerMetrics(registerer, mgr.name); err == nil { - mgr.metrics = metrics - } else { - level.Error(logger).Log("msg", "Failed to create discovery manager metrics", "manager", mgr.name, "err", err) - return nil - } - - return mgr -} - -// Name sets the name of the manager. -func Name(n string) func(*Manager) { - return func(m *Manager) { - m.mtx.Lock() - defer m.mtx.Unlock() - m.name = n - } -} - -// Manager maintains a set of discovery providers and sends each update to a map channel. -// Targets are grouped by the target set name. -type Manager struct { - logger log.Logger - name string - mtx sync.RWMutex - ctx context.Context - discoverCancel []context.CancelFunc - - // Some Discoverers(eg. k8s) send only the updates for a given target group - // so we use map[tg.Source]*targetgroup.Group to know which group to update. - targets map[poolKey]map[string]*targetgroup.Group - // providers keeps track of SD providers. - providers []*provider - // The sync channel sends the updates as a map where the key is the job value from the scrape config. - syncCh chan map[string][]*targetgroup.Group - - // How long to wait before sending updates to the channel. The variable - // should only be modified in unit tests. - updatert time.Duration - - // The triggerSend channel signals to the manager that new updates have been received from providers. - triggerSend chan struct{} - - // A registerer for all service discovery metrics. - registerer prometheus.Registerer - - metrics *discovery.Metrics - sdMetrics map[string]discovery.DiscovererMetrics -} - -// Run starts the background processing. -func (m *Manager) Run() error { - go m.sender() - <-m.ctx.Done() - m.cancelDiscoverers() - return m.ctx.Err() -} - -// SyncCh returns a read only channel used by all the clients to receive target updates. -func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group { - return m.syncCh -} - -// ApplyConfig removes all running discovery providers and starts new ones using the provided config. -func (m *Manager) ApplyConfig(cfg map[string]discovery.Configs) error { - m.mtx.Lock() - defer m.mtx.Unlock() - - for pk := range m.targets { - if _, ok := cfg[pk.setName]; !ok { - m.metrics.DiscoveredTargets.DeleteLabelValues(m.name, pk.setName) - } - } - m.cancelDiscoverers() - m.targets = make(map[poolKey]map[string]*targetgroup.Group) - m.providers = nil - m.discoverCancel = nil - - failedCount := 0 - for name, scfg := range cfg { - failedCount += m.registerProviders(scfg, name) - m.metrics.DiscoveredTargets.WithLabelValues(name).Set(0) - } - m.metrics.FailedConfigs.Set(float64(failedCount)) - - for _, prov := range m.providers { - m.startProvider(m.ctx, prov) - } - - return nil -} - -// StartCustomProvider is used for sdtool. Only use this if you know what you're doing. -func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker discovery.Discoverer) { - p := &provider{ - name: name, - d: worker, - subs: []string{name}, - } - m.providers = append(m.providers, p) - m.startProvider(ctx, p) -} - -func (m *Manager) startProvider(ctx context.Context, p *provider) { - level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs)) - ctx, cancel := context.WithCancel(ctx) - updates := make(chan []*targetgroup.Group) - - m.discoverCancel = append(m.discoverCancel, cancel) - - go p.d.Run(ctx, updates) - go m.updater(ctx, p, updates) -} - -func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) { - for { - select { - case <-ctx.Done(): - return - case tgs, ok := <-updates: - m.metrics.ReceivedUpdates.Inc() - if !ok { - level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name) - return - } - - for _, s := range p.subs { - m.updateGroup(poolKey{setName: s, provider: p.name}, tgs) - } - - select { - case m.triggerSend <- struct{}{}: - default: - } - } - } -} - -func (m *Manager) sender() { - ticker := time.NewTicker(m.updatert) - defer ticker.Stop() - - for { - select { - case <-m.ctx.Done(): - return - case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker. - select { - case <-m.triggerSend: - m.metrics.SentUpdates.Inc() - select { - case m.syncCh <- m.allGroups(): - default: - m.metrics.DelayedUpdates.Inc() - level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle") - select { - case m.triggerSend <- struct{}{}: - default: - } - } - default: - } - } - } -} - -func (m *Manager) cancelDiscoverers() { - for _, c := range m.discoverCancel { - c() - } -} - -func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) { - m.mtx.Lock() - defer m.mtx.Unlock() - - if _, ok := m.targets[poolKey]; !ok { - m.targets[poolKey] = make(map[string]*targetgroup.Group) - } - for _, tg := range tgs { - if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics. - m.targets[poolKey][tg.Source] = tg - } - } -} - -func (m *Manager) allGroups() map[string][]*targetgroup.Group { - m.mtx.RLock() - defer m.mtx.RUnlock() - - tSets := map[string][]*targetgroup.Group{} - n := map[string]int{} - for pkey, tsets := range m.targets { - for _, tg := range tsets { - // Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager' - // to signal that it needs to stop all scrape loops for this target set. - tSets[pkey.setName] = append(tSets[pkey.setName], tg) - n[pkey.setName] += len(tg.Targets) - } - } - for setName, v := range n { - m.metrics.DiscoveredTargets.WithLabelValues(setName).Set(float64(v)) - } - return tSets -} - -// registerProviders returns a number of failed SD config. -func (m *Manager) registerProviders(cfgs discovery.Configs, setName string) int { - var ( - failed int - added bool - ) - add := func(cfg discovery.Config) { - for _, p := range m.providers { - if reflect.DeepEqual(cfg, p.config) { - p.subs = append(p.subs, setName) - added = true - return - } - } - typ := cfg.Name() - d, err := cfg.NewDiscoverer(discovery.DiscovererOptions{ - Logger: log.With(m.logger, "discovery", typ, "config", setName), - Metrics: m.sdMetrics[typ], - }) - if err != nil { - level.Error(m.logger).Log("msg", "Cannot create service discovery", "err", err, "type", typ, "config", setName) - failed++ - return - } - m.providers = append(m.providers, &provider{ - name: fmt.Sprintf("%s/%d", typ, len(m.providers)), - d: d, - config: cfg, - subs: []string{setName}, - }) - added = true - } - for _, cfg := range cfgs { - add(cfg) - } - if !added { - // Add an empty target group to force the refresh of the corresponding - // scrape pool and to notify the receiver that this target set has no - // current targets. - // It can happen because the combined set of SD configurations is empty - // or because we fail to instantiate all the SD configurations. - add(discovery.StaticConfig{{}}) - } - return failed -} - -// StaticProvider holds a list of target groups that never change. -type StaticProvider struct { - TargetGroups []*targetgroup.Group -} - -// Run implements the Worker interface. -func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { - // We still have to consider that the consumer exits right away in which case - // the context will be canceled. - select { - case ch <- sd.TargetGroups: - case <-ctx.Done(): - } - close(ch) -} diff --git a/discovery/legacymanager/manager_test.go b/discovery/legacymanager/manager_test.go deleted file mode 100644 index f1be96311..000000000 --- a/discovery/legacymanager/manager_test.go +++ /dev/null @@ -1,1185 +0,0 @@ -// Copyright 2016 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package legacymanager - -import ( - "context" - "fmt" - "sort" - "strconv" - "testing" - "time" - - "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus" - client_testutil "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/prometheus/common/model" - "github.com/stretchr/testify/require" - - "github.com/prometheus/prometheus/discovery" - "github.com/prometheus/prometheus/discovery/targetgroup" - "github.com/prometheus/prometheus/util/testutil" -) - -func TestMain(m *testing.M) { - testutil.TolerantVerifyLeak(m) -} - -func newTestMetrics(t *testing.T, reg prometheus.Registerer) (*discovery.RefreshMetricsManager, map[string]discovery.DiscovererMetrics) { - refreshMetrics := discovery.NewRefreshMetrics(reg) - sdMetrics, err := discovery.RegisterSDMetrics(reg, refreshMetrics) - require.NoError(t, err) - return &refreshMetrics, sdMetrics -} - -// TestTargetUpdatesOrder checks that the target updates are received in the expected order. -func TestTargetUpdatesOrder(t *testing.T) { - // The order by which the updates are send is determined by the interval passed to the mock discovery adapter - // Final targets array is ordered alphabetically by the name of the discoverer. - // For example discoverer "A" with targets "t2,t3" and discoverer "B" with targets "t1,t2" will result in "t2,t3,t1,t2" after the merge. - testCases := []struct { - title string - updates map[string][]update - expectedTargets [][]*targetgroup.Group - }{ - { - title: "Single TP no updates", - updates: map[string][]update{ - "tp1": {}, - }, - expectedTargets: nil, - }, - { - title: "Multiple TPs no updates", - updates: map[string][]update{ - "tp1": {}, - "tp2": {}, - "tp3": {}, - }, - expectedTargets: nil, - }, - { - title: "Single TP empty initials", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []targetgroup.Group{}, - interval: 5 * time.Millisecond, - }, - }, - }, - expectedTargets: [][]*targetgroup.Group{ - {}, - }, - }, - { - title: "Multiple TPs empty initials", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []targetgroup.Group{}, - interval: 5 * time.Millisecond, - }, - }, - "tp2": { - { - targetGroups: []targetgroup.Group{}, - interval: 200 * time.Millisecond, - }, - }, - "tp3": { - { - targetGroups: []targetgroup.Group{}, - interval: 100 * time.Millisecond, - }, - }, - }, - expectedTargets: [][]*targetgroup.Group{ - {}, - {}, - {}, - }, - }, - { - title: "Single TP initials only", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []targetgroup.Group{ - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, - }, - }, - }, - expectedTargets: [][]*targetgroup.Group{ - { - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, - }, - }, - { - title: "Multiple TPs initials only", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []targetgroup.Group{ - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, - }, - }, - "tp2": { - { - targetGroups: []targetgroup.Group{ - { - Source: "tp2_group1", - Targets: []model.LabelSet{{"__instance__": "3"}}, - }, - }, - interval: 10 * time.Millisecond, - }, - }, - }, - expectedTargets: [][]*targetgroup.Group{ - { - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, { - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - { - Source: "tp2_group1", - Targets: []model.LabelSet{{"__instance__": "3"}}, - }, - }, - }, - }, - { - title: "Single TP initials followed by empty updates", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []targetgroup.Group{ - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, - interval: 0, - }, - { - targetGroups: []targetgroup.Group{ - { - Source: "tp1_group1", - Targets: []model.LabelSet{}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{}, - }, - }, - interval: 10 * time.Millisecond, - }, - }, - }, - expectedTargets: [][]*targetgroup.Group{ - { - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, - { - { - Source: "tp1_group1", - Targets: []model.LabelSet{}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{}, - }, - }, - }, - }, - { - title: "Single TP initials and new groups", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []targetgroup.Group{ - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, - interval: 0, - }, - { - targetGroups: []targetgroup.Group{ - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "3"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "4"}}, - }, - { - Source: "tp1_group3", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - }, - interval: 10 * time.Millisecond, - }, - }, - }, - expectedTargets: [][]*targetgroup.Group{ - { - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, - { - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "3"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "4"}}, - }, - { - Source: "tp1_group3", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - }, - }, - }, - { - title: "Multiple TPs initials and new groups", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []targetgroup.Group{ - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, - interval: 10 * time.Millisecond, - }, - { - targetGroups: []targetgroup.Group{ - { - Source: "tp1_group3", - Targets: []model.LabelSet{{"__instance__": "3"}}, - }, - { - Source: "tp1_group4", - Targets: []model.LabelSet{{"__instance__": "4"}}, - }, - }, - interval: 500 * time.Millisecond, - }, - }, - "tp2": { - { - targetGroups: []targetgroup.Group{ - { - Source: "tp2_group1", - Targets: []model.LabelSet{{"__instance__": "5"}}, - }, - { - Source: "tp2_group2", - Targets: []model.LabelSet{{"__instance__": "6"}}, - }, - }, - interval: 100 * time.Millisecond, - }, - { - targetGroups: []targetgroup.Group{ - { - Source: "tp2_group3", - Targets: []model.LabelSet{{"__instance__": "7"}}, - }, - { - Source: "tp2_group4", - Targets: []model.LabelSet{{"__instance__": "8"}}, - }, - }, - interval: 10 * time.Millisecond, - }, - }, - }, - expectedTargets: [][]*targetgroup.Group{ - { - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, - { - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - { - Source: "tp2_group1", - Targets: []model.LabelSet{{"__instance__": "5"}}, - }, - { - Source: "tp2_group2", - Targets: []model.LabelSet{{"__instance__": "6"}}, - }, - }, - { - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - { - Source: "tp2_group1", - Targets: []model.LabelSet{{"__instance__": "5"}}, - }, - { - Source: "tp2_group2", - Targets: []model.LabelSet{{"__instance__": "6"}}, - }, - { - Source: "tp2_group3", - Targets: []model.LabelSet{{"__instance__": "7"}}, - }, - { - Source: "tp2_group4", - Targets: []model.LabelSet{{"__instance__": "8"}}, - }, - }, - { - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - { - Source: "tp1_group3", - Targets: []model.LabelSet{{"__instance__": "3"}}, - }, - { - Source: "tp1_group4", - Targets: []model.LabelSet{{"__instance__": "4"}}, - }, - { - Source: "tp2_group1", - Targets: []model.LabelSet{{"__instance__": "5"}}, - }, - { - Source: "tp2_group2", - Targets: []model.LabelSet{{"__instance__": "6"}}, - }, - { - Source: "tp2_group3", - Targets: []model.LabelSet{{"__instance__": "7"}}, - }, - { - Source: "tp2_group4", - Targets: []model.LabelSet{{"__instance__": "8"}}, - }, - }, - }, - }, - { - title: "One TP initials arrive after other TP updates.", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []targetgroup.Group{ - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, - interval: 10 * time.Millisecond, - }, - { - targetGroups: []targetgroup.Group{ - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "3"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "4"}}, - }, - }, - interval: 150 * time.Millisecond, - }, - }, - "tp2": { - { - targetGroups: []targetgroup.Group{ - { - Source: "tp2_group1", - Targets: []model.LabelSet{{"__instance__": "5"}}, - }, - { - Source: "tp2_group2", - Targets: []model.LabelSet{{"__instance__": "6"}}, - }, - }, - interval: 200 * time.Millisecond, - }, - { - targetGroups: []targetgroup.Group{ - { - Source: "tp2_group1", - Targets: []model.LabelSet{{"__instance__": "7"}}, - }, - { - Source: "tp2_group2", - Targets: []model.LabelSet{{"__instance__": "8"}}, - }, - }, - interval: 100 * time.Millisecond, - }, - }, - }, - expectedTargets: [][]*targetgroup.Group{ - { - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, - { - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "3"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "4"}}, - }, - }, - { - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "3"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "4"}}, - }, - { - Source: "tp2_group1", - Targets: []model.LabelSet{{"__instance__": "5"}}, - }, - { - Source: "tp2_group2", - Targets: []model.LabelSet{{"__instance__": "6"}}, - }, - }, - { - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "3"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "4"}}, - }, - { - Source: "tp2_group1", - Targets: []model.LabelSet{{"__instance__": "7"}}, - }, - { - Source: "tp2_group2", - Targets: []model.LabelSet{{"__instance__": "8"}}, - }, - }, - }, - }, - - { - title: "Single TP empty update in between", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []targetgroup.Group{ - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, - interval: 30 * time.Millisecond, - }, - { - targetGroups: []targetgroup.Group{ - { - Source: "tp1_group1", - Targets: []model.LabelSet{}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{}, - }, - }, - interval: 10 * time.Millisecond, - }, - { - targetGroups: []targetgroup.Group{ - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "3"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "4"}}, - }, - }, - interval: 300 * time.Millisecond, - }, - }, - }, - expectedTargets: [][]*targetgroup.Group{ - { - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, - { - { - Source: "tp1_group1", - Targets: []model.LabelSet{}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{}, - }, - }, - { - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "3"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "4"}}, - }, - }, - }, - }, - } - - for i, tc := range testCases { - tc := tc - t.Run(tc.title, func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - reg := prometheus.NewRegistry() - _, sdMetrics := newTestMetrics(t, reg) - - discoveryManager := NewManager(ctx, log.NewNopLogger(), reg, sdMetrics) - require.NotNil(t, discoveryManager) - discoveryManager.updatert = 100 * time.Millisecond - - var totalUpdatesCount int - for _, up := range tc.updates { - if len(up) > 0 { - totalUpdatesCount += len(up) - } - } - provUpdates := make(chan []*targetgroup.Group, totalUpdatesCount) - - for _, up := range tc.updates { - go newMockDiscoveryProvider(up...).Run(ctx, provUpdates) - } - - for x := 0; x < totalUpdatesCount; x++ { - select { - case <-ctx.Done(): - t.Fatalf("%d: no update arrived within the timeout limit", x) - case tgs := <-provUpdates: - discoveryManager.updateGroup(poolKey{setName: strconv.Itoa(i), provider: tc.title}, tgs) - for _, got := range discoveryManager.allGroups() { - assertEqualGroups(t, got, tc.expectedTargets[x]) - } - } - } - }) - } -} - -func assertEqualGroups(t *testing.T, got, expected []*targetgroup.Group) { - t.Helper() - - // Need to sort by the groups's source as the received order is not guaranteed. - sort.Sort(byGroupSource(got)) - sort.Sort(byGroupSource(expected)) - - require.Equal(t, expected, got) -} - -func staticConfig(addrs ...string) discovery.StaticConfig { - var cfg discovery.StaticConfig - for i, addr := range addrs { - cfg = append(cfg, &targetgroup.Group{ - Source: strconv.Itoa(i), - Targets: []model.LabelSet{ - {model.AddressLabel: model.LabelValue(addr)}, - }, - }) - } - return cfg -} - -func verifyPresence(t *testing.T, tSets map[poolKey]map[string]*targetgroup.Group, poolKey poolKey, label string, present bool) { - t.Helper() - if _, ok := tSets[poolKey]; !ok { - t.Fatalf("'%s' should be present in Pool keys: %v", poolKey, tSets) - } - - match := false - var mergedTargets string - for _, targetGroup := range tSets[poolKey] { - for _, l := range targetGroup.Targets { - mergedTargets = mergedTargets + " " + l.String() - if l.String() == label { - match = true - } - } - } - if match != present { - msg := "" - if !present { - msg = "not" - } - t.Fatalf("%q should %s be present in Targets labels: %q", label, msg, mergedTargets) - } -} - -func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - reg := prometheus.NewRegistry() - _, sdMetrics := newTestMetrics(t, reg) - - discoveryManager := NewManager(ctx, log.NewNopLogger(), reg, sdMetrics) - require.NotNil(t, discoveryManager) - discoveryManager.updatert = 100 * time.Millisecond - go discoveryManager.Run() - - c := map[string]discovery.Configs{ - "prometheus": { - staticConfig("foo:9090", "bar:9090"), - }, - } - discoveryManager.ApplyConfig(c) - - <-discoveryManager.SyncCh() - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", true) - - c["prometheus"] = discovery.Configs{ - staticConfig("foo:9090"), - } - discoveryManager.ApplyConfig(c) - - <-discoveryManager.SyncCh() - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", false) -} - -func TestDiscovererConfigs(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - reg := prometheus.NewRegistry() - _, sdMetrics := newTestMetrics(t, reg) - - discoveryManager := NewManager(ctx, log.NewNopLogger(), reg, sdMetrics) - require.NotNil(t, discoveryManager) - discoveryManager.updatert = 100 * time.Millisecond - go discoveryManager.Run() - - c := map[string]discovery.Configs{ - "prometheus": { - staticConfig("foo:9090", "bar:9090"), - staticConfig("baz:9090"), - }, - } - discoveryManager.ApplyConfig(c) - - <-discoveryManager.SyncCh() - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", true) - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/1"}, "{__address__=\"baz:9090\"}", true) -} - -// TestTargetSetRecreatesEmptyStaticConfigs ensures that reloading a config file after -// removing all targets from the static_configs sends an update with empty targetGroups. -// This is required to signal the receiver that this target set has no current targets. -func TestTargetSetRecreatesEmptyStaticConfigs(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - reg := prometheus.NewRegistry() - _, sdMetrics := newTestMetrics(t, reg) - - discoveryManager := NewManager(ctx, log.NewNopLogger(), reg, sdMetrics) - require.NotNil(t, discoveryManager) - discoveryManager.updatert = 100 * time.Millisecond - go discoveryManager.Run() - - c := map[string]discovery.Configs{ - "prometheus": { - staticConfig("foo:9090"), - }, - } - discoveryManager.ApplyConfig(c) - - <-discoveryManager.SyncCh() - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) - - c["prometheus"] = discovery.Configs{ - discovery.StaticConfig{{}}, - } - discoveryManager.ApplyConfig(c) - - <-discoveryManager.SyncCh() - - pkey := poolKey{setName: "prometheus", provider: "static/0"} - targetGroups, ok := discoveryManager.targets[pkey] - if !ok { - t.Fatalf("'%v' should be present in target groups", pkey) - } - group, ok := targetGroups[""] - if !ok { - t.Fatalf("missing '' key in target groups %v", targetGroups) - } - - if len(group.Targets) != 0 { - t.Fatalf("Invalid number of targets: expected 0, got %d", len(group.Targets)) - } -} - -func TestIdenticalConfigurationsAreCoalesced(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - reg := prometheus.NewRegistry() - _, sdMetrics := newTestMetrics(t, reg) - - discoveryManager := NewManager(ctx, nil, reg, sdMetrics) - require.NotNil(t, discoveryManager) - discoveryManager.updatert = 100 * time.Millisecond - go discoveryManager.Run() - - c := map[string]discovery.Configs{ - "prometheus": { - staticConfig("foo:9090"), - }, - "prometheus2": { - staticConfig("foo:9090"), - }, - } - discoveryManager.ApplyConfig(c) - - <-discoveryManager.SyncCh() - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) - verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus2", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) - if len(discoveryManager.providers) != 1 { - t.Fatalf("Invalid number of providers: expected 1, got %d", len(discoveryManager.providers)) - } -} - -func TestApplyConfigDoesNotModifyStaticTargets(t *testing.T) { - originalConfig := discovery.Configs{ - staticConfig("foo:9090", "bar:9090", "baz:9090"), - } - processedConfig := discovery.Configs{ - staticConfig("foo:9090", "bar:9090", "baz:9090"), - } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - reg := prometheus.NewRegistry() - _, sdMetrics := newTestMetrics(t, reg) - - discoveryManager := NewManager(ctx, log.NewNopLogger(), reg, sdMetrics) - require.NotNil(t, discoveryManager) - discoveryManager.updatert = 100 * time.Millisecond - go discoveryManager.Run() - - cfgs := map[string]discovery.Configs{ - "prometheus": processedConfig, - } - discoveryManager.ApplyConfig(cfgs) - <-discoveryManager.SyncCh() - - for _, cfg := range cfgs { - require.Equal(t, originalConfig, cfg) - } -} - -type errorConfig struct{ err error } - -func (e errorConfig) Name() string { return "error" } -func (e errorConfig) NewDiscoverer(discovery.DiscovererOptions) (discovery.Discoverer, error) { - return nil, e.err -} - -// NewDiscovererMetrics implements discovery.Config. -func (errorConfig) NewDiscovererMetrics(reg prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics { - return &discovery.NoopDiscovererMetrics{} -} - -func TestGaugeFailedConfigs(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - reg := prometheus.NewRegistry() - _, sdMetrics := newTestMetrics(t, reg) - - discoveryManager := NewManager(ctx, log.NewNopLogger(), reg, sdMetrics) - require.NotNil(t, discoveryManager) - discoveryManager.updatert = 100 * time.Millisecond - go discoveryManager.Run() - - c := map[string]discovery.Configs{ - "prometheus": { - errorConfig{fmt.Errorf("tests error 0")}, - errorConfig{fmt.Errorf("tests error 1")}, - errorConfig{fmt.Errorf("tests error 2")}, - }, - } - discoveryManager.ApplyConfig(c) - <-discoveryManager.SyncCh() - - failedCount := client_testutil.ToFloat64(discoveryManager.metrics.FailedConfigs) - if failedCount != 3 { - t.Fatalf("Expected to have 3 failed configs, got: %v", failedCount) - } - - c["prometheus"] = discovery.Configs{ - staticConfig("foo:9090"), - } - discoveryManager.ApplyConfig(c) - <-discoveryManager.SyncCh() - - failedCount = client_testutil.ToFloat64(discoveryManager.metrics.FailedConfigs) - if failedCount != 0 { - t.Fatalf("Expected to get no failed config, got: %v", failedCount) - } -} - -func TestCoordinationWithReceiver(t *testing.T) { - updateDelay := 100 * time.Millisecond - - type expect struct { - delay time.Duration - tgs map[string][]*targetgroup.Group - } - - testCases := []struct { - title string - providers map[string]discovery.Discoverer - expected []expect - }{ - { - title: "Receiver should get all updates even when one provider closes its channel", - providers: map[string]discovery.Discoverer{ - "once1": &onceProvider{ - tgs: []*targetgroup.Group{ - { - Source: "tg1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - }, - }, - "mock1": newMockDiscoveryProvider( - update{ - interval: 2 * updateDelay, - targetGroups: []targetgroup.Group{ - { - Source: "tg2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, - }, - ), - }, - expected: []expect{ - { - tgs: map[string][]*targetgroup.Group{ - "once1": { - { - Source: "tg1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - }, - }, - }, - { - tgs: map[string][]*targetgroup.Group{ - "once1": { - { - Source: "tg1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - }, - "mock1": { - { - Source: "tg2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, - }, - }, - }, - }, - { - title: "Receiver should get all updates even when the channel is blocked", - providers: map[string]discovery.Discoverer{ - "mock1": newMockDiscoveryProvider( - update{ - targetGroups: []targetgroup.Group{ - { - Source: "tg1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - }, - }, - update{ - interval: 4 * updateDelay, - targetGroups: []targetgroup.Group{ - { - Source: "tg2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, - }, - ), - }, - expected: []expect{ - { - delay: 2 * updateDelay, - tgs: map[string][]*targetgroup.Group{ - "mock1": { - { - Source: "tg1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - }, - }, - }, - { - delay: 4 * updateDelay, - tgs: map[string][]*targetgroup.Group{ - "mock1": { - { - Source: "tg1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tg2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, - }, - }, - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.title, func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - reg := prometheus.NewRegistry() - _, sdMetrics := newTestMetrics(t, reg) - - mgr := NewManager(ctx, nil, reg, sdMetrics) - require.NotNil(t, mgr) - mgr.updatert = updateDelay - go mgr.Run() - - for name, p := range tc.providers { - mgr.StartCustomProvider(ctx, name, p) - } - - for i, expected := range tc.expected { - time.Sleep(expected.delay) - select { - case <-ctx.Done(): - t.Fatalf("step %d: no update received in the expected timeframe", i) - case tgs, ok := <-mgr.SyncCh(): - if !ok { - t.Fatalf("step %d: discovery manager channel is closed", i) - } - if len(tgs) != len(expected.tgs) { - t.Fatalf("step %d: target groups mismatch, got: %d, expected: %d\ngot: %#v\nexpected: %#v", - i, len(tgs), len(expected.tgs), tgs, expected.tgs) - } - for k := range expected.tgs { - if _, ok := tgs[k]; !ok { - t.Fatalf("step %d: target group not found: %s\ngot: %#v", i, k, tgs) - } - assertEqualGroups(t, tgs[k], expected.tgs[k]) - } - } - } - }) - } -} - -type update struct { - targetGroups []targetgroup.Group - interval time.Duration -} - -type mockdiscoveryProvider struct { - updates []update -} - -func newMockDiscoveryProvider(updates ...update) mockdiscoveryProvider { - tp := mockdiscoveryProvider{ - updates: updates, - } - return tp -} - -func (tp mockdiscoveryProvider) Run(ctx context.Context, upCh chan<- []*targetgroup.Group) { - for _, u := range tp.updates { - if u.interval > 0 { - select { - case <-ctx.Done(): - return - case <-time.After(u.interval): - } - } - tgs := make([]*targetgroup.Group, len(u.targetGroups)) - for i := range u.targetGroups { - tgs[i] = &u.targetGroups[i] - } - upCh <- tgs - } - <-ctx.Done() -} - -// byGroupSource implements sort.Interface so we can sort by the Source field. -type byGroupSource []*targetgroup.Group - -func (a byGroupSource) Len() int { return len(a) } -func (a byGroupSource) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a byGroupSource) Less(i, j int) bool { return a[i].Source < a[j].Source } - -// onceProvider sends updates once (if any) and closes the update channel. -type onceProvider struct { - tgs []*targetgroup.Group -} - -func (o onceProvider) Run(_ context.Context, ch chan<- []*targetgroup.Group) { - if len(o.tgs) > 0 { - ch <- o.tgs - } - close(ch) -} diff --git a/discovery/legacymanager/registry.go b/discovery/legacymanager/registry.go deleted file mode 100644 index 955705394..000000000 --- a/discovery/legacymanager/registry.go +++ /dev/null @@ -1,261 +0,0 @@ -// Copyright 2020 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package legacymanager - -import ( - "errors" - "fmt" - "reflect" - "sort" - "strconv" - "strings" - "sync" - - "gopkg.in/yaml.v2" - - "github.com/prometheus/prometheus/discovery" - "github.com/prometheus/prometheus/discovery/targetgroup" -) - -const ( - configFieldPrefix = "AUTO_DISCOVERY_" - staticConfigsKey = "static_configs" - staticConfigsFieldName = configFieldPrefix + staticConfigsKey -) - -var ( - configNames = make(map[string]discovery.Config) - configFieldNames = make(map[reflect.Type]string) - configFields []reflect.StructField - - configTypesMu sync.Mutex - configTypes = make(map[reflect.Type]reflect.Type) - - emptyStructType = reflect.TypeOf(struct{}{}) - configsType = reflect.TypeOf(discovery.Configs{}) -) - -// RegisterConfig registers the given Config type for YAML marshaling and unmarshaling. -func RegisterConfig(config discovery.Config) { - registerConfig(config.Name()+"_sd_configs", reflect.TypeOf(config), config) -} - -func init() { - // N.B.: static_configs is the only Config type implemented by default. - // All other types are registered at init by their implementing packages. - elemTyp := reflect.TypeOf(&targetgroup.Group{}) - registerConfig(staticConfigsKey, elemTyp, discovery.StaticConfig{}) -} - -func registerConfig(yamlKey string, elemType reflect.Type, config discovery.Config) { - name := config.Name() - if _, ok := configNames[name]; ok { - panic(fmt.Sprintf("discovery: Config named %q is already registered", name)) - } - configNames[name] = config - - fieldName := configFieldPrefix + yamlKey // Field must be exported. - configFieldNames[elemType] = fieldName - - // Insert fields in sorted order. - i := sort.Search(len(configFields), func(k int) bool { - return fieldName < configFields[k].Name - }) - configFields = append(configFields, reflect.StructField{}) // Add empty field at end. - copy(configFields[i+1:], configFields[i:]) // Shift fields to the right. - configFields[i] = reflect.StructField{ // Write new field in place. - Name: fieldName, - Type: reflect.SliceOf(elemType), - Tag: reflect.StructTag(`yaml:"` + yamlKey + `,omitempty"`), - } -} - -func getConfigType(out reflect.Type) reflect.Type { - configTypesMu.Lock() - defer configTypesMu.Unlock() - if typ, ok := configTypes[out]; ok { - return typ - } - // Initial exported fields map one-to-one. - var fields []reflect.StructField - for i, n := 0, out.NumField(); i < n; i++ { - switch field := out.Field(i); { - case field.PkgPath == "" && field.Type != configsType: - fields = append(fields, field) - default: - fields = append(fields, reflect.StructField{ - Name: "_" + field.Name, // Field must be unexported. - PkgPath: out.PkgPath(), - Type: emptyStructType, - }) - } - } - // Append extra config fields on the end. - fields = append(fields, configFields...) - typ := reflect.StructOf(fields) - configTypes[out] = typ - return typ -} - -// UnmarshalYAMLWithInlineConfigs helps implement yaml.Unmarshal for structs -// that have a Configs field that should be inlined. -func UnmarshalYAMLWithInlineConfigs(out interface{}, unmarshal func(interface{}) error) error { - outVal := reflect.ValueOf(out) - if outVal.Kind() != reflect.Ptr { - return fmt.Errorf("discovery: can only unmarshal into a struct pointer: %T", out) - } - outVal = outVal.Elem() - if outVal.Kind() != reflect.Struct { - return fmt.Errorf("discovery: can only unmarshal into a struct pointer: %T", out) - } - outTyp := outVal.Type() - - cfgTyp := getConfigType(outTyp) - cfgPtr := reflect.New(cfgTyp) - cfgVal := cfgPtr.Elem() - - // Copy shared fields (defaults) to dynamic value. - var configs *discovery.Configs - for i, n := 0, outVal.NumField(); i < n; i++ { - if outTyp.Field(i).Type == configsType { - configs = outVal.Field(i).Addr().Interface().(*discovery.Configs) - continue - } - if cfgTyp.Field(i).PkgPath != "" { - continue // Field is unexported: ignore. - } - cfgVal.Field(i).Set(outVal.Field(i)) - } - if configs == nil { - return fmt.Errorf("discovery: Configs field not found in type: %T", out) - } - - // Unmarshal into dynamic value. - if err := unmarshal(cfgPtr.Interface()); err != nil { - return replaceYAMLTypeError(err, cfgTyp, outTyp) - } - - // Copy shared fields from dynamic value. - for i, n := 0, outVal.NumField(); i < n; i++ { - if cfgTyp.Field(i).PkgPath != "" { - continue // Field is unexported: ignore. - } - outVal.Field(i).Set(cfgVal.Field(i)) - } - - var err error - *configs, err = readConfigs(cfgVal, outVal.NumField()) - return err -} - -func readConfigs(structVal reflect.Value, startField int) (discovery.Configs, error) { - var ( - configs discovery.Configs - targets []*targetgroup.Group - ) - for i, n := startField, structVal.NumField(); i < n; i++ { - field := structVal.Field(i) - if field.Kind() != reflect.Slice { - panic("discovery: internal error: field is not a slice") - } - for k := 0; k < field.Len(); k++ { - val := field.Index(k) - if val.IsZero() || (val.Kind() == reflect.Ptr && val.Elem().IsZero()) { - key := configFieldNames[field.Type().Elem()] - key = strings.TrimPrefix(key, configFieldPrefix) - return nil, fmt.Errorf("empty or null section in %s", key) - } - switch c := val.Interface().(type) { - case *targetgroup.Group: - // Add index to the static config target groups for unique identification - // within scrape pool. - c.Source = strconv.Itoa(len(targets)) - // Coalesce multiple static configs into a single static config. - targets = append(targets, c) - case discovery.Config: - configs = append(configs, c) - default: - panic("discovery: internal error: slice element is not a Config") - } - } - } - if len(targets) > 0 { - configs = append(configs, discovery.StaticConfig(targets)) - } - return configs, nil -} - -// MarshalYAMLWithInlineConfigs helps implement yaml.Marshal for structs -// that have a Configs field that should be inlined. -func MarshalYAMLWithInlineConfigs(in interface{}) (interface{}, error) { - inVal := reflect.ValueOf(in) - for inVal.Kind() == reflect.Ptr { - inVal = inVal.Elem() - } - inTyp := inVal.Type() - - cfgTyp := getConfigType(inTyp) - cfgPtr := reflect.New(cfgTyp) - cfgVal := cfgPtr.Elem() - - // Copy shared fields to dynamic value. - var configs *discovery.Configs - for i, n := 0, inTyp.NumField(); i < n; i++ { - if inTyp.Field(i).Type == configsType { - configs = inVal.Field(i).Addr().Interface().(*discovery.Configs) - } - if cfgTyp.Field(i).PkgPath != "" { - continue // Field is unexported: ignore. - } - cfgVal.Field(i).Set(inVal.Field(i)) - } - if configs == nil { - return nil, fmt.Errorf("discovery: Configs field not found in type: %T", in) - } - - if err := writeConfigs(cfgVal, *configs); err != nil { - return nil, err - } - - return cfgPtr.Interface(), nil -} - -func writeConfigs(structVal reflect.Value, configs discovery.Configs) error { - targets := structVal.FieldByName(staticConfigsFieldName).Addr().Interface().(*[]*targetgroup.Group) - for _, c := range configs { - if sc, ok := c.(discovery.StaticConfig); ok { - *targets = append(*targets, sc...) - continue - } - fieldName, ok := configFieldNames[reflect.TypeOf(c)] - if !ok { - return fmt.Errorf("discovery: cannot marshal unregistered Config type: %T", c) - } - field := structVal.FieldByName(fieldName) - field.Set(reflect.Append(field, reflect.ValueOf(c))) - } - return nil -} - -func replaceYAMLTypeError(err error, oldTyp, newTyp reflect.Type) error { - var e *yaml.TypeError - if errors.As(err, &e) { - oldStr := oldTyp.String() - newStr := newTyp.String() - for i, s := range e.Errors { - e.Errors[i] = strings.ReplaceAll(s, oldStr, newStr) - } - } - return err -} diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index d637312a3..f16fcbd7b 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -55,7 +55,7 @@ The Prometheus monitoring server | --query.timeout | Maximum time a query may take before being aborted. Use with server mode only. | `2m` | | --query.max-concurrency | Maximum number of queries executed concurrently. Use with server mode only. | `20` | | --query.max-samples | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` | -| --enable-feature ... | Comma separated feature names to enable. Valid options: auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | +| --enable-feature ... | Comma separated feature names to enable. Valid options: auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | | --agent | Run Prometheus in 'Agent mode'. | | | --log.level | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` | | --log.format | Output format of log messages. One of: [logfmt, json] | `logfmt` | diff --git a/docs/feature_flags.md b/docs/feature_flags.md index 1a8908548..f027ffce6 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -47,20 +47,6 @@ When enabled, for each instance scrape, Prometheus stores a sample in the follow to find out how close they are to reaching the limit with `scrape_samples_post_metric_relabeling / scrape_sample_limit`. Note that `scrape_sample_limit` can be zero if there is no limit configured, which means that the query above can return `+Inf` for targets with no limit (as we divide by zero). If you want to query only for targets that do have a sample limit use this query: `scrape_samples_post_metric_relabeling / (scrape_sample_limit > 0)`. - `scrape_body_size_bytes`. The uncompressed size of the most recent scrape response, if successful. Scrapes failing because `body_size_limit` is exceeded report `-1`, other scrape failures report `0`. -## New service discovery manager - -`--enable-feature=new-service-discovery-manager` - -When enabled, Prometheus uses a new service discovery manager that does not -restart unchanged discoveries upon reloading. This makes reloads faster and reduces -pressure on service discoveries' sources. - -Users are encouraged to test the new service discovery manager and report any -issues upstream. - -In future releases, this new service discovery manager will become the default and -this feature flag will be ignored. - ## Prometheus agent `--enable-feature=agent`