From e405e2f1ea2156f76a7e70f5471b88a590a8e24a Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Sat, 25 Nov 2017 13:13:54 +0000 Subject: [PATCH] refactored discovery --- cmd/prometheus/main.go | 45 ++- config/config.go | 2 +- discovery/discovery.go | 319 ------------------ discovery/manager.go | 293 ++++++++++++++++ .../{discovery_test.go => manager_test.go} | 0 notifier/notifier.go | 16 - retrieval/manager.go | 152 +++++++++ ...{targetmanager_test.go => manager_test.go} | 0 retrieval/targetmanager.go | 194 ----------- web/web.go | 12 +- 10 files changed, 481 insertions(+), 552 deletions(-) delete mode 100644 discovery/discovery.go create mode 100644 discovery/manager.go rename discovery/{discovery_test.go => manager_test.go} (100%) create mode 100644 retrieval/manager.go rename retrieval/{targetmanager_test.go => manager_test.go} (100%) delete mode 100644 retrieval/targetmanager.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index b03db5891..132605c36 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -43,6 +43,7 @@ import ( "github.com/prometheus/common/promlog" promlogflag "github.com/prometheus/common/promlog/flag" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/retrieval" @@ -230,12 +231,17 @@ func main() { cfg.queryEngine.Logger = log.With(logger, "component", "query engine") var ( - notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier")) - targetManager = retrieval.NewTargetManager(fanoutStorage, log.With(logger, "component", "target manager")) - queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) - ctx, cancelCtx = context.WithCancel(context.Background()) + notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier")) + ctxDiscovery, cancelDiscovery = context.WithCancel(context.Background()) + discoveryManager = discovery.NewDiscoveryManager(ctxDiscovery, log.With(logger, "component", "discovery manager")) + ctxScrape, cancelScrape = context.WithCancel(context.Background()) + scrapeManager = retrieval.NewScrapeManager(ctxScrape, log.With(logger, "component", "scrape manager"), fanoutStorage) + queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) + ctxWeb, cancelWeb = context.WithCancel(context.Background()) + webHandler = web.New(log.With(logger, "component", "web"), &cfg.web) ) + ctx := context.Background() ruleManager := rules.NewManager(&rules.ManagerOptions{ Appendable: fanoutStorage, QueryFunc: rules.EngineQueryFunc(queryEngine), @@ -250,7 +256,7 @@ func main() { cfg.web.TSDB = localStorage.Get cfg.web.Storage = fanoutStorage cfg.web.QueryEngine = queryEngine - cfg.web.TargetManager = targetManager + cfg.web.ScrapeManager = scrapeManager cfg.web.RuleManager = ruleManager cfg.web.Notifier = notifier @@ -268,8 +274,6 @@ func main() { cfg.web.Flags[f.Name] = f.Value.String() } - webHandler := web.New(log.With(logger, "component", "web"), &cfg.web) - // Monitor outgoing connections on default transport with conntrack. 
 	http.DefaultTransport.(*http.Transport).DialContext = conntrack.NewDialContextFunc(
 		conntrack.DialWithTracing(),
@@ -306,6 +310,17 @@ func main() {
 
 	var g group.Group
 	{
+		g.Add(
+			func() error {
+				err := discoveryManager.Run()
+				level.Info(logger).Log("msg", "Discovery manager stopped")
+				return err
+			},
+			func(err error) {
+				level.Info(logger).Log("msg", "Stopping discovery manager...")
+				cancelDiscovery()
+			},
+		)
 		term := make(chan os.Signal)
 		signal.Notify(term, os.Interrupt, syscall.SIGTERM)
 		cancel := make(chan struct{})
@@ -426,7 +441,7 @@ func main() {
 	{
 		g.Add(
 			func() error {
-				if err := webHandler.Run(ctx); err != nil {
+				if err := webHandler.Run(ctxWeb); err != nil {
 					return fmt.Errorf("Error starting web server: %s", err)
 				}
 				return nil
@@ -435,7 +450,7 @@ func main() {
 				// Keep this interrupt before the ruleManager.Stop().
 				// Shutting down the query engine before the rule manager will cause pending queries
 				// to be canceled and ensures a quick shutdown of the rule manager.
-				cancelCtx()
+				cancelWeb()
 			},
 		)
 	}
@@ -468,17 +483,15 @@ func main() {
 		)
 	}
 	{
-		// TODO(krasi) refactor targetManager.Run() to be blocking to avoid using an extra blocking channel.
-		cancel := make(chan struct{})
 		g.Add(
 			func() error {
-				targetManager.Run()
-				<-cancel
-				return nil
+				err := scrapeManager.Run(discoveryManager.SyncCh())
+				level.Info(logger).Log("msg", "Scrape manager stopped")
+				return err
 			},
 			func(err error) {
-				targetManager.Stop()
-				close(cancel)
+				level.Info(logger).Log("msg", "Stopping scrape manager...")
+				cancelScrape()
 			},
 		)
 	}
diff --git a/config/config.go b/config/config.go
index ab4208008..73a938bca 100644
--- a/config/config.go
+++ b/config/config.go
@@ -721,7 +721,7 @@ func (a *BasicAuth) UnmarshalYAML(unmarshal func(interface{}) error) error {
 	return checkOverflow(a.XXX, "basic_auth")
 }
 
-// TargetGroup is a set of targets with a common label set.
+// TargetGroup is a set of targets with a common label set (e.g. production, test, staging).
 type TargetGroup struct {
 	// Targets is a list of targets identified by a label set. Each target is
 	// uniquely identifiable in the group by its address label.
diff --git a/discovery/discovery.go b/discovery/discovery.go
deleted file mode 100644
index 9cf5f8190..000000000
--- a/discovery/discovery.go
+++ /dev/null
@@ -1,319 +0,0 @@
-// Copyright 2016 The Prometheus Authors
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
- -package discovery - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/discovery/azure" - "github.com/prometheus/prometheus/discovery/consul" - "github.com/prometheus/prometheus/discovery/dns" - "github.com/prometheus/prometheus/discovery/ec2" - "github.com/prometheus/prometheus/discovery/file" - "github.com/prometheus/prometheus/discovery/gce" - "github.com/prometheus/prometheus/discovery/kubernetes" - "github.com/prometheus/prometheus/discovery/marathon" - "github.com/prometheus/prometheus/discovery/openstack" - "github.com/prometheus/prometheus/discovery/triton" - "github.com/prometheus/prometheus/discovery/zookeeper" -) - -// A TargetProvider provides information about target groups. It maintains a set -// of sources from which TargetGroups can originate. Whenever a target provider -// detects a potential change, it sends the TargetGroup through its provided channel. -// -// The TargetProvider does not have to guarantee that an actual change happened. -// It does guarantee that it sends the new TargetGroup whenever a change happens. -// -// TargetProviders should initially send a full set of all discoverable TargetGroups. -type TargetProvider interface { - // Run hands a channel to the target provider through which it can send - // updated target groups. - // Must returns if the context gets canceled. It should not close the update - // channel on returning. - Run(ctx context.Context, up chan<- []*config.TargetGroup) -} - -// ProvidersFromConfig returns all TargetProviders configured in cfg. -func ProvidersFromConfig(cfg config.ServiceDiscoveryConfig, logger log.Logger) map[string]TargetProvider { - providers := map[string]TargetProvider{} - - app := func(mech string, i int, tp TargetProvider) { - providers[fmt.Sprintf("%s/%d", mech, i)] = tp - } - - for i, c := range cfg.DNSSDConfigs { - app("dns", i, dns.NewDiscovery(c, log.With(logger, "discovery", "dns"))) - } - for i, c := range cfg.FileSDConfigs { - app("file", i, file.NewDiscovery(c, log.With(logger, "discovery", "file"))) - } - for i, c := range cfg.ConsulSDConfigs { - k, err := consul.NewDiscovery(c, log.With(logger, "discovery", "consul")) - if err != nil { - level.Error(logger).Log("msg", "Cannot create Consul discovery", "err", err) - continue - } - app("consul", i, k) - } - for i, c := range cfg.MarathonSDConfigs { - m, err := marathon.NewDiscovery(c, log.With(logger, "discovery", "marathon")) - if err != nil { - level.Error(logger).Log("msg", "Cannot create Marathon discovery", "err", err) - continue - } - app("marathon", i, m) - } - for i, c := range cfg.KubernetesSDConfigs { - k, err := kubernetes.New(log.With(logger, "discovery", "k8s"), c) - if err != nil { - level.Error(logger).Log("msg", "Cannot create Kubernetes discovery", "err", err) - continue - } - app("kubernetes", i, k) - } - for i, c := range cfg.ServersetSDConfigs { - app("serverset", i, zookeeper.NewServersetDiscovery(c, log.With(logger, "discovery", "zookeeper"))) - } - for i, c := range cfg.NerveSDConfigs { - app("nerve", i, zookeeper.NewNerveDiscovery(c, log.With(logger, "discovery", "nerve"))) - } - for i, c := range cfg.EC2SDConfigs { - app("ec2", i, ec2.NewDiscovery(c, log.With(logger, "discovery", "ec2"))) - } - for i, c := range cfg.OpenstackSDConfigs { - openstackd, err := openstack.NewDiscovery(c, log.With(logger, "discovery", "openstack")) - if err != nil { - level.Error(logger).Log("msg", "Cannot 
initialize OpenStack discovery", "err", err) - continue - } - app("openstack", i, openstackd) - } - - for i, c := range cfg.GCESDConfigs { - gced, err := gce.NewDiscovery(c, log.With(logger, "discovery", "gce")) - if err != nil { - level.Error(logger).Log("msg", "Cannot initialize GCE discovery", "err", err) - continue - } - app("gce", i, gced) - } - for i, c := range cfg.AzureSDConfigs { - app("azure", i, azure.NewDiscovery(c, log.With(logger, "discovery", "azure"))) - } - for i, c := range cfg.TritonSDConfigs { - t, err := triton.New(log.With(logger, "discovery", "trition"), c) - if err != nil { - level.Error(logger).Log("msg", "Cannot create Triton discovery", "err", err) - continue - } - app("triton", i, t) - } - if len(cfg.StaticConfigs) > 0 { - app("static", 0, NewStaticProvider(cfg.StaticConfigs)) - } - - return providers -} - -// StaticProvider holds a list of target groups that never change. -type StaticProvider struct { - TargetGroups []*config.TargetGroup -} - -// NewStaticProvider returns a StaticProvider configured with the given -// target groups. -func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider { - for i, tg := range groups { - tg.Source = fmt.Sprintf("%d", i) - } - return &StaticProvider{groups} -} - -// Run implements the TargetProvider interface. -func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - // We still have to consider that the consumer exits right away in which case - // the context will be canceled. - select { - case ch <- sd.TargetGroups: - case <-ctx.Done(): - } - close(ch) -} - -// TargetSet handles multiple TargetProviders and sends a full overview of their -// discovered TargetGroups to a Syncer. -type TargetSet struct { - mtx sync.RWMutex - // Sets of targets by a source string that is unique across target providers. - tgroups map[string]*config.TargetGroup - - syncer Syncer - - syncCh chan struct{} - providerCh chan map[string]TargetProvider - cancelProviders func() -} - -// Syncer receives updates complete sets of TargetGroups. -type Syncer interface { - Sync([]*config.TargetGroup) -} - -// NewTargetSet returns a new target sending TargetGroups to the Syncer. -func NewTargetSet(s Syncer) *TargetSet { - return &TargetSet{ - syncCh: make(chan struct{}, 1), - providerCh: make(chan map[string]TargetProvider), - syncer: s, - } -} - -// Run starts the processing of target providers and their updates. -// It blocks until the context gets canceled. -func (ts *TargetSet) Run(ctx context.Context) { -Loop: - for { - // Throttle syncing to once per five seconds. - select { - case <-ctx.Done(): - break Loop - case p := <-ts.providerCh: - ts.updateProviders(ctx, p) - case <-time.After(5 * time.Second): - } - - select { - case <-ctx.Done(): - break Loop - case <-ts.syncCh: - ts.sync() - case p := <-ts.providerCh: - ts.updateProviders(ctx, p) - } - } -} - -func (ts *TargetSet) sync() { - ts.mtx.RLock() - var all []*config.TargetGroup - for _, tg := range ts.tgroups { - all = append(all, tg) - } - ts.mtx.RUnlock() - - ts.syncer.Sync(all) -} - -// UpdateProviders sets new target providers for the target set. -func (ts *TargetSet) UpdateProviders(p map[string]TargetProvider) { - ts.providerCh <- p -} - -func (ts *TargetSet) updateProviders(ctx context.Context, providers map[string]TargetProvider) { - - // Stop all previous target providers of the target set. 
- if ts.cancelProviders != nil { - ts.cancelProviders() - } - ctx, ts.cancelProviders = context.WithCancel(ctx) - - var wg sync.WaitGroup - // (Re-)create a fresh tgroups map to not keep stale targets around. We - // will retrieve all targets below anyway, so cleaning up everything is - // safe and doesn't inflict any additional cost. - ts.mtx.Lock() - ts.tgroups = map[string]*config.TargetGroup{} - ts.mtx.Unlock() - - for name, prov := range providers { - wg.Add(1) - - updates := make(chan []*config.TargetGroup) - go prov.Run(ctx, updates) - - go func(name string, prov TargetProvider) { - select { - case <-ctx.Done(): - case initial, ok := <-updates: - // Handle the case that a target provider exits and closes the channel - // before the context is done. - if !ok { - break - } - // First set of all targets the provider knows. - for _, tgroup := range initial { - ts.setTargetGroup(name, tgroup) - } - case <-time.After(5 * time.Second): - // Initial set didn't arrive. Act as if it was empty - // and wait for updates later on. - } - wg.Done() - - // Start listening for further updates. - for { - select { - case <-ctx.Done(): - return - case tgs, ok := <-updates: - // Handle the case that a target provider exits and closes the channel - // before the context is done. - if !ok { - return - } - for _, tg := range tgs { - ts.update(name, tg) - } - } - } - }(name, prov) - } - - // We wait for a full initial set of target groups before releasing the mutex - // to ensure the initial sync is complete and there are no races with subsequent updates. - wg.Wait() - // Just signal that there are initial sets to sync now. Actual syncing must only - // happen in the runScraping loop. - select { - case ts.syncCh <- struct{}{}: - default: - } -} - -// update handles a target group update from a target provider identified by the name. -func (ts *TargetSet) update(name string, tgroup *config.TargetGroup) { - ts.setTargetGroup(name, tgroup) - - select { - case ts.syncCh <- struct{}{}: - default: - } -} - -func (ts *TargetSet) setTargetGroup(name string, tg *config.TargetGroup) { - ts.mtx.Lock() - defer ts.mtx.Unlock() - - if tg == nil { - return - } - ts.tgroups[name+"/"+tg.Source] = tg -} diff --git a/discovery/manager.go b/discovery/manager.go new file mode 100644 index 000000000..7119c5470 --- /dev/null +++ b/discovery/manager.go @@ -0,0 +1,293 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package discovery
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+
+	"github.com/prometheus/prometheus/config"
+
+	"github.com/prometheus/prometheus/discovery/azure"
+	"github.com/prometheus/prometheus/discovery/consul"
+	"github.com/prometheus/prometheus/discovery/dns"
+	"github.com/prometheus/prometheus/discovery/ec2"
+	"github.com/prometheus/prometheus/discovery/file"
+	"github.com/prometheus/prometheus/discovery/gce"
+	"github.com/prometheus/prometheus/discovery/kubernetes"
+	"github.com/prometheus/prometheus/discovery/marathon"
+	"github.com/prometheus/prometheus/discovery/openstack"
+	"github.com/prometheus/prometheus/discovery/triton"
+	"github.com/prometheus/prometheus/discovery/zookeeper"
+)
+
+// DiscoveryProvider provides information about target groups. It maintains a set
+// of sources from which TargetGroups can originate. Whenever a discovery provider
+// detects a potential change, it sends the TargetGroup through its provided channel.
+//
+// The DiscoveryProvider does not have to guarantee that an actual change happened.
+// It does guarantee that it sends the new TargetGroup whenever a change happens.
+//
+// DiscoveryProviders should initially send a full set of all discoverable TargetGroups.
+type DiscoveryProvider interface {
+	// Run hands a channel to the discovery provider (e.g. consul, dns) through which
+	// it can send updated target groups.
+	// Must return if the context gets canceled. It should not close the update
+	// channel on returning.
+	Run(ctx context.Context, up chan<- []*config.TargetGroup)
+}
+
+type targetSetProvider struct {
+	cancel  func()
+	tgroups []*config.TargetGroup
+}
+
+// NewDiscoveryManager is the DiscoveryManager constructor.
+func NewDiscoveryManager(ctx context.Context, logger log.Logger) *DiscoveryManager {
+	return &DiscoveryManager{
+		ctx:                ctx,
+		logger:             logger,
+		actionCh:           make(chan func()),
+		syncCh:             make(chan map[string][]*config.TargetGroup),
+		targetSetProviders: make(map[string]map[string]*targetSetProvider),
+	}
+}
+
+// DiscoveryManager maintains a set of discovery providers and sends each update to a channel used by other packages.
+type DiscoveryManager struct {
+	ctx                context.Context
+	logger             log.Logger
+	syncCh             chan map[string][]*config.TargetGroup    // map[targetSetName]
+	actionCh           chan func()
+	targetSetProviders map[string]map[string]*targetSetProvider // map[targetSetName]map[providerName]
+}
+
+// Run starts the background processing.
+func (m *DiscoveryManager) Run() error {
+	for {
+		select {
+		case f := <-m.actionCh:
+			f()
+		case <-m.ctx.Done():
+			return m.ctx.Err()
+		}
+	}
+}
+
+// SyncCh returns a read-only channel over which the merged target set updates from all discovery providers are sent.
+func (m *DiscoveryManager) SyncCh() <-chan map[string][]*config.TargetGroup {
+	return m.syncCh
+}
+
+// ApplyConfig removes all running discovery providers and starts new ones using the provided config.
+func (m *DiscoveryManager) ApplyConfig(cfg *config.Config) error {
+	err := make(chan error)
+	m.actionCh <- func() {
+		m.cancelDiscoveryProviders()
+		for _, scfg := range cfg.ScrapeConfigs {
+			for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) {
+				ctx, cancel := context.WithCancel(m.ctx)
+				updates := make(chan []*config.TargetGroup)
+
+				m.createProvider(cancel, scfg.JobName, provName)
+
+				go prov.Run(ctx, updates)
+				go func(provName string) {
+					// Wait for the initial set of all targets the provider knows.
+					select {
+					case <-ctx.Done():
+ case tgs, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. + if !ok { + break + } + m.syncCh <- m.mergeGroups(scfg.JobName, provName, tgs) + case <-time.After(5 * time.Second): + // Initial set didn't arrive. Act as if it was empty + // and wait for updates later on. + } + + // Start listening for further updates. + for { + select { + case <-ctx.Done(): + return + case tgs, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. + if !ok { + return + } + m.syncCh <- m.mergeGroups(scfg.JobName, provName, tgs) + } + } + }(provName) + } + } + close(err) + } + + return <-err +} + +func (m *DiscoveryManager) cancelDiscoveryProviders() { + for targetSetName, targetSetProviders := range m.targetSetProviders { + for _, discoveryProvider := range targetSetProviders { + discoveryProvider.cancel() + } + delete(m.targetSetProviders, targetSetName) + } +} + +func (m *DiscoveryManager) createProvider(cancel context.CancelFunc, tsName, provName string) { + if m.targetSetProviders[tsName] == nil { + m.targetSetProviders[tsName] = make(map[string]*targetSetProvider) + } + m.targetSetProviders[tsName][provName] = &targetSetProvider{ + cancel: cancel, + tgroups: []*config.TargetGroup{}, + } +} + +// mergeGroups adds a new target group for a named discovery provider and returns all target groups for a given target set +func (m *DiscoveryManager) mergeGroups(tsName, provName string, tg []*config.TargetGroup) map[string][]*config.TargetGroup { + tset := make(chan map[string][]*config.TargetGroup) + m.actionCh <- func() { + if tg != nil { + m.targetSetProviders[tsName][provName].tgroups = tg + } + var tgAll []*config.TargetGroup + for _, prov := range m.targetSetProviders[tsName] { + for _, tg := range prov.tgroups { + tgAll = append(tgAll, tg) + } + } + t := make(map[string][]*config.TargetGroup) + t[tsName] = tgAll + tset <- t + } + return <-tset +} + +func (m *DiscoveryManager) providersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]DiscoveryProvider { + providers := map[string]DiscoveryProvider{} + + app := func(mech string, i int, tp DiscoveryProvider) { + providers[fmt.Sprintf("%s/%d", mech, i)] = tp + } + + for i, c := range cfg.DNSSDConfigs { + app("dns", i, dns.NewDiscovery(c, log.With(m.logger, "discovery", "dns"))) + } + for i, c := range cfg.FileSDConfigs { + app("file", i, file.NewDiscovery(c, log.With(m.logger, "discovery", "file"))) + } + for i, c := range cfg.ConsulSDConfigs { + k, err := consul.NewDiscovery(c, log.With(m.logger, "discovery", "consul")) + if err != nil { + level.Error(m.logger).Log("msg", "Cannot create Consul discovery", "err", err) + continue + } + app("consul", i, k) + } + for i, c := range cfg.MarathonSDConfigs { + t, err := marathon.NewDiscovery(c, log.With(m.logger, "discovery", "marathon")) + if err != nil { + level.Error(m.logger).Log("msg", "Cannot create Marathon discovery", "err", err) + continue + } + app("marathon", i, t) + } + for i, c := range cfg.KubernetesSDConfigs { + k, err := kubernetes.New(log.With(m.logger, "discovery", "k8s"), c) + if err != nil { + level.Error(m.logger).Log("msg", "Cannot create Kubernetes discovery", "err", err) + continue + } + app("kubernetes", i, k) + } + for i, c := range cfg.ServersetSDConfigs { + app("serverset", i, zookeeper.NewServersetDiscovery(c, log.With(m.logger, "discovery", "zookeeper"))) + } + for i, c := range cfg.NerveSDConfigs { + app("nerve", i, 
zookeeper.NewNerveDiscovery(c, log.With(m.logger, "discovery", "nerve"))) + } + for i, c := range cfg.EC2SDConfigs { + app("ec2", i, ec2.NewDiscovery(c, log.With(m.logger, "discovery", "ec2"))) + } + for i, c := range cfg.OpenstackSDConfigs { + openstackd, err := openstack.NewDiscovery(c, log.With(m.logger, "discovery", "openstack")) + if err != nil { + level.Error(m.logger).Log("msg", "Cannot initialize OpenStack discovery", "err", err) + continue + } + app("openstack", i, openstackd) + } + + for i, c := range cfg.GCESDConfigs { + gced, err := gce.NewDiscovery(c, log.With(m.logger, "discovery", "gce")) + if err != nil { + level.Error(m.logger).Log("msg", "Cannot initialize GCE discovery", "err", err) + continue + } + app("gce", i, gced) + } + for i, c := range cfg.AzureSDConfigs { + app("azure", i, azure.NewDiscovery(c, log.With(m.logger, "discovery", "azure"))) + } + for i, c := range cfg.TritonSDConfigs { + t, err := triton.New(log.With(m.logger, "discovery", "trition"), c) + if err != nil { + level.Error(m.logger).Log("msg", "Cannot create Triton discovery", "err", err) + continue + } + app("triton", i, t) + } + if len(cfg.StaticConfigs) > 0 { + app("static", 0, NewStaticProvider(cfg.StaticConfigs)) + } + + return providers +} + +// StaticProvider holds a list of target groups that never change. +type StaticProvider struct { + TargetGroups []*config.TargetGroup +} + +// NewStaticProvider returns a StaticProvider configured with the given +// target groups. +func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider { + for i, tg := range groups { + tg.Source = fmt.Sprintf("%d", i) + } + return &StaticProvider{groups} +} + +// Run implements the DiscoveryProvider interface. +func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + // We still have to consider that the consumer exits right away in which case + // the context will be canceled. + select { + case ch <- sd.TargetGroups: + case <-ctx.Done(): + } + close(ch) +} diff --git a/discovery/discovery_test.go b/discovery/manager_test.go similarity index 100% rename from discovery/discovery_test.go rename to discovery/manager_test.go diff --git a/notifier/notifier.go b/notifier/notifier.go index 36e63fd6e..d2c14b765 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -35,7 +35,6 @@ import ( "golang.org/x/net/context/ctxhttp" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/util/httputil" @@ -248,7 +247,6 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error { n.opts.RelabelConfigs = conf.AlertingConfig.AlertRelabelConfigs amSets := []*alertmanagerSet{} - ctx, cancel := context.WithCancel(n.ctx) for _, cfg := range conf.AlertingConfig.AlertmanagerConfigs { ams, err := newAlertmanagerSet(cfg, n.logger) @@ -261,17 +259,6 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error { amSets = append(amSets, ams) } - // After all sets were created successfully, start them and cancel the - // old ones. 
-	for _, ams := range amSets {
-		go ams.ts.Run(ctx)
-		ams.ts.UpdateProviders(discovery.ProvidersFromConfig(ams.cfg.ServiceDiscoveryConfig, n.logger))
-	}
-	if n.cancelDiscovery != nil {
-		n.cancelDiscovery()
-	}
-
-	n.cancelDiscovery = cancel
 	n.alertmanagers = amSets
 
 	return nil
@@ -504,7 +491,6 @@ func (a alertmanagerLabels) url() *url.URL {
 // alertmanagerSet contains a set of Alertmanagers discovered via a group of service
 // discovery definitions that have a common configuration on how alerts should be sent.
 type alertmanagerSet struct {
-	ts  *discovery.TargetSet
 	cfg *config.AlertmanagerConfig
 
 	client *http.Client
@@ -525,8 +511,6 @@ func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger log.Logger) (*ale
 		cfg:    cfg,
 		logger: logger,
 	}
-	s.ts = discovery.NewTargetSet(s)
-
 	return s, nil
 }
 
diff --git a/retrieval/manager.go b/retrieval/manager.go
new file mode 100644
index 000000000..694d14b97
--- /dev/null
+++ b/retrieval/manager.go
@@ -0,0 +1,152 @@
+// Copyright 2013 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package retrieval
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+
+	"github.com/prometheus/prometheus/config"
+	"github.com/prometheus/prometheus/storage"
+)
+
+// Appendable returns an Appender.
+type Appendable interface {
+	Appender() (storage.Appender, error)
+}
+
+// NewScrapeManager is the ScrapeManager constructor.
+func NewScrapeManager(ctx context.Context, logger log.Logger, app Appendable) *ScrapeManager {
+	return &ScrapeManager{
+		ctx:           ctx,
+		append:        app,
+		logger:        logger,
+		actionCh:      make(chan func()),
+		scrapeConfigs: make(map[string]*config.ScrapeConfig),
+		scrapePools:   make(map[string]*scrapePool),
+	}
+}
+
+// ScrapeManager maintains a set of scrape pools and manages start/stop cycles
+// when receiving new target groups from the discovery manager.
+type ScrapeManager struct {
+	ctx           context.Context
+	logger        log.Logger
+	append        Appendable
+	scrapeConfigs map[string]*config.ScrapeConfig
+	scrapePools   map[string]*scrapePool
+	actionCh      chan func()
+}
+
+// Run starts background processing to handle target updates and reload the scraping loops.
+func (m *ScrapeManager) Run(tsets <-chan map[string][]*config.TargetGroup) error {
+	level.Info(m.logger).Log("msg", "Starting scrape manager...")
+
+	for {
+		select {
+		case f := <-m.actionCh:
+			f()
+		case ts := <-tsets:
+			m.reload(ts)
+		case <-m.ctx.Done():
+			return m.ctx.Err()
+		}
+	}
+}
+
+// ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
+func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error {
+	done := make(chan struct{})
+	m.actionCh <- func() {
+		for _, scfg := range cfg.ScrapeConfigs {
+			m.scrapeConfigs[scfg.JobName] = scfg
+		}
+		close(done)
+	}
+	<-done
+	return nil
+}
+
+// TargetMap returns a map of active and dropped targets and their corresponding scrape config job name.
+func (m *ScrapeManager) TargetMap() map[string][]*Target {
+	targetsMap := make(chan map[string][]*Target)
+	m.actionCh <- func() {
+		targets := make(map[string][]*Target)
+		for jobName, sp := range m.scrapePools {
+			sp.mtx.RLock()
+			for _, t := range sp.targets {
+				targets[jobName] = append(targets[jobName], t)
+			}
+			targets[jobName] = append(targets[jobName], sp.droppedTargets...)
+			sp.mtx.RUnlock()
+		}
+		targetsMap <- targets
+	}
+	return <-targetsMap
+}
+
+// Targets returns the targets currently being scraped.
+func (m *ScrapeManager) Targets() []*Target {
+	targets := make(chan []*Target)
+	m.actionCh <- func() {
+		var t []*Target
+		for _, p := range m.scrapePools {
+			p.mtx.RLock()
+			for _, tt := range p.targets {
+				t = append(t, tt)
+			}
+			p.mtx.RUnlock()
+		}
+		targets <- t
+	}
+	return <-targets
+}
+
+func (m *ScrapeManager) reload(t map[string][]*config.TargetGroup) error {
+	for tsetName, tgroup := range t {
+		scrapeConfig, ok := m.scrapeConfigs[tsetName]
+		if !ok {
+			return fmt.Errorf("target set '%v' doesn't have valid config", tsetName)
+		}
+
+		// Start a new scrape pool for the target set if none exists yet.
+		existing, ok := m.scrapePools[tsetName]
+		if !ok {
+			sp := newScrapePool(m.ctx, scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName))
+			m.scrapePools[tsetName] = sp
+			sp.Sync(tgroup)
+
+		} else {
+			existing.Sync(tgroup)
+		}
+
+		// Cleanup - stop the scrape loops for jobs that no longer exist in the scrape config.
+		jobs := make(map[string]struct{})
+
+		for k := range m.scrapeConfigs {
+			jobs[k] = struct{}{}
+		}
+
+		for name, sp := range m.scrapePools {
+			if _, ok := jobs[name]; !ok {
+				sp.stop()
+				delete(m.scrapePools, name)
+			}
+		}
+	}
+	return nil
+}
diff --git a/retrieval/targetmanager_test.go b/retrieval/manager_test.go
similarity index 100%
rename from retrieval/targetmanager_test.go
rename to retrieval/manager_test.go
diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go
deleted file mode 100644
index f82640664..000000000
--- a/retrieval/targetmanager.go
+++ /dev/null
@@ -1,194 +0,0 @@
-// Copyright 2013 The Prometheus Authors
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package retrieval
-
-import (
-	"context"
-	"sync"
-
-	"github.com/go-kit/kit/log"
-	"github.com/go-kit/kit/log/level"
-
-	"github.com/prometheus/prometheus/config"
-	"github.com/prometheus/prometheus/discovery"
-	"github.com/prometheus/prometheus/storage"
-)
-
-// TargetManager maintains a set of targets, starts and stops their scraping and
-// creates the new targets based on the target groups it receives from various
-// target providers.
-type TargetManager struct {
-	append        Appendable
-	scrapeConfigs []*config.ScrapeConfig
-
-	mtx    sync.RWMutex
-	ctx    context.Context
-	cancel func()
-	wg     sync.WaitGroup
-
-	// Set of unqiue targets by scrape configuration.
-	targetSets map[string]*targetSet
-	logger     log.Logger
-	starting   chan struct{}
-}
-
-type targetSet struct {
-	ctx    context.Context
-	cancel func()
-
-	ts *discovery.TargetSet
-	sp *scrapePool
-}
-
-// Appendable returns an Appender.
-type Appendable interface { - Appender() (storage.Appender, error) -} - -// NewTargetManager creates a new TargetManager. -func NewTargetManager(app Appendable, logger log.Logger) *TargetManager { - return &TargetManager{ - append: app, - targetSets: map[string]*targetSet{}, - logger: logger, - starting: make(chan struct{}), - } -} - -// Run starts background processing to handle target updates. -func (tm *TargetManager) Run() { - level.Info(tm.logger).Log("msg", "Starting target manager...") - - tm.mtx.Lock() - - tm.ctx, tm.cancel = context.WithCancel(context.Background()) - tm.reload() - - tm.mtx.Unlock() - close(tm.starting) - - tm.wg.Wait() -} - -// Stop all background processing. -func (tm *TargetManager) Stop() { - <-tm.starting - level.Info(tm.logger).Log("msg", "Stopping target manager...") - - tm.mtx.Lock() - // Cancel the base context, this will cause all target providers to shut down - // and all in-flight scrapes to abort immmediately. - // Started inserts will be finished before terminating. - tm.cancel() - tm.mtx.Unlock() - - // Wait for all scrape inserts to complete. - tm.wg.Wait() - - level.Info(tm.logger).Log("msg", "Target manager stopped") -} - -func (tm *TargetManager) reload() { - jobs := map[string]struct{}{} - - // Start new target sets and update existing ones. - for _, scfg := range tm.scrapeConfigs { - jobs[scfg.JobName] = struct{}{} - - ts, ok := tm.targetSets[scfg.JobName] - if !ok { - ctx, cancel := context.WithCancel(tm.ctx) - ts = &targetSet{ - ctx: ctx, - cancel: cancel, - sp: newScrapePool(ctx, scfg, tm.append, log.With(tm.logger, "scrape_pool", scfg.JobName)), - } - ts.ts = discovery.NewTargetSet(ts.sp) - - tm.targetSets[scfg.JobName] = ts - - tm.wg.Add(1) - - go func(ts *targetSet) { - // Run target set, which blocks until its context is canceled. - // Gracefully shut down pending scrapes in the scrape pool afterwards. - ts.ts.Run(ctx) - ts.sp.stop() - tm.wg.Done() - }(ts) - } else { - ts.sp.reload(scfg) - } - ts.ts.UpdateProviders(discovery.ProvidersFromConfig(scfg.ServiceDiscoveryConfig, tm.logger)) - } - - // Remove old target sets. Waiting for scrape pools to complete pending - // scrape inserts is already guaranteed by the goroutine that started the target set. - for name, ts := range tm.targetSets { - if _, ok := jobs[name]; !ok { - ts.cancel() - delete(tm.targetSets, name) - } - } -} - -// TargetMap returns map of active and dropped targets and their corresponding scrape config job name. -func (tm *TargetManager) TargetMap() map[string][]*Target { - tm.mtx.RLock() - defer tm.mtx.RUnlock() - - targetsMap := make(map[string][]*Target) - for jobName, ps := range tm.targetSets { - ps.sp.mtx.RLock() - for _, t := range ps.sp.targets { - targetsMap[jobName] = append(targetsMap[jobName], t) - } - targetsMap[jobName] = append(targetsMap[jobName], ps.sp.droppedTargets...) - ps.sp.mtx.RUnlock() - } - return targetsMap -} - -// Targets returns the targets currently being scraped. -func (tm *TargetManager) Targets() []*Target { - tm.mtx.RLock() - defer tm.mtx.RUnlock() - - targets := []*Target{} - for _, ps := range tm.targetSets { - ps.sp.mtx.RLock() - - for _, t := range ps.sp.targets { - targets = append(targets, t) - } - - ps.sp.mtx.RUnlock() - } - - return targets -} - -// ApplyConfig resets the manager's target providers and job configurations as defined -// by the new cfg. The state of targets that are valid in the new configuration remains unchanged. 
-func (tm *TargetManager) ApplyConfig(cfg *config.Config) error { - tm.mtx.Lock() - defer tm.mtx.Unlock() - - tm.scrapeConfigs = cfg.ScrapeConfigs - - if tm.ctx != nil { - tm.reload() - } - return nil -} diff --git a/web/web.go b/web/web.go index d9875150e..6dfe67a39 100644 --- a/web/web.go +++ b/web/web.go @@ -71,7 +71,7 @@ var localhostRepresentations = []string{"127.0.0.1", "localhost"} type Handler struct { logger log.Logger - targetManager *retrieval.TargetManager + scrapeManager *retrieval.ScrapeManager ruleManager *rules.Manager queryEngine *promql.Engine context context.Context @@ -125,7 +125,7 @@ type Options struct { TSDB func() *tsdb.DB Storage storage.Storage QueryEngine *promql.Engine - TargetManager *retrieval.TargetManager + ScrapeManager *retrieval.ScrapeManager RuleManager *rules.Manager Notifier *notifier.Notifier Version *PrometheusVersion @@ -169,7 +169,7 @@ func New(logger log.Logger, o *Options) *Handler { flagsMap: o.Flags, context: o.Context, - targetManager: o.TargetManager, + scrapeManager: o.ScrapeManager, ruleManager: o.RuleManager, queryEngine: o.QueryEngine, tsdb: o.TSDB, @@ -181,7 +181,7 @@ func New(logger log.Logger, o *Options) *Handler { ready: 0, } - h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, h.targetManager, h.notifier, + h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, h.scrapeManager, h.notifier, func() config.Config { h.mtx.RLock() defer h.mtx.RUnlock() @@ -405,7 +405,7 @@ func (h *Handler) Run(ctx context.Context) error { h.options.QueryEngine, h.options.Storage.Querier, func() []*retrieval.Target { - return h.options.TargetManager.Targets() + return h.options.ScrapeManager.Targets() }, func() []*url.URL { return h.options.Notifier.Alertmanagers() @@ -605,7 +605,7 @@ func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) { func (h *Handler) targets(w http.ResponseWriter, r *http.Request) { // Bucket targets by job label tps := map[string][]*retrieval.Target{} - for _, t := range h.targetManager.Targets() { + for _, t := range h.scrapeManager.Targets() { job := t.Labels().Get(model.JobLabel) tps[job] = append(tps[job], t) }
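
Not part of the patch, but a minimal sketch for reviewers of how the two new managers are intended to be wired together, mirroring the cmd/prometheus/main.go changes above. The stub Appendable, the config file path, and the error handling are placeholders rather than code from this change. Note that both Run loops must be started before ApplyConfig is called, since ApplyConfig hands its work to the Run loop over each manager's action channel and would otherwise block.

package main

import (
	"context"
	"os"

	"github.com/go-kit/kit/log"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery"
	"github.com/prometheus/prometheus/retrieval"
	"github.com/prometheus/prometheus/storage"
)

// stubAppendable stands in for the fanout storage that cmd/prometheus passes in.
type stubAppendable struct{}

func (stubAppendable) Appender() (storage.Appender, error) { return nil, nil }

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	discoveryManager := discovery.NewDiscoveryManager(ctx, log.With(logger, "component", "discovery manager"))
	scrapeManager := retrieval.NewScrapeManager(ctx, log.With(logger, "component", "scrape manager"), stubAppendable{})

	// Start both Run loops first; the scrape manager consumes the merged
	// target group updates from the discovery manager's sync channel.
	go discoveryManager.Run()
	go scrapeManager.Run(discoveryManager.SyncCh())

	cfg, err := config.LoadFile("prometheus.yml") // placeholder path
	if err != nil {
		panic(err)
	}
	if err := discoveryManager.ApplyConfig(cfg); err != nil {
		panic(err)
	}
	if err := scrapeManager.ApplyConfig(cfg); err != nil {
		panic(err)
	}

	// Block forever; the real binary drives shutdown through the oklog run
	// group and the cancel functions set up in main().
	select {}
}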