From 3f9a9d1e62f1491ddf6b10a18b57dbcaa3e46ddc Mon Sep 17 00:00:00 2001 From: Sebastian Poxhofer Date: Mon, 13 Jun 2022 21:06:15 +0200 Subject: [PATCH] chore(discoveryManager): expose Discoverer refresh function (#10531) Signed-off-by: secustor --- discovery/http/http.go | 4 ++-- discovery/http/http_test.go | 8 ++++---- discovery/manager.go | 34 ++++++++++++++++++++++++---------- 3 files changed, 30 insertions(+), 16 deletions(-) diff --git a/discovery/http/http.go b/discovery/http/http.go index b09146183..ec958c614 100644 --- a/discovery/http/http.go +++ b/discovery/http/http.go @@ -136,12 +136,12 @@ func NewDiscovery(conf *SDConfig, logger log.Logger, clientOpts []config.HTTPCli logger, "http", time.Duration(conf.RefreshInterval), - d.refresh, + d.Refresh, ) return d, nil } -func (d *Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { +func (d *Discovery) Refresh(ctx context.Context) ([]*targetgroup.Group, error) { req, err := http.NewRequest("GET", d.url, nil) if err != nil { return nil, err diff --git a/discovery/http/http_test.go b/discovery/http/http_test.go index e79f27a7a..a284e7f36 100644 --- a/discovery/http/http_test.go +++ b/discovery/http/http_test.go @@ -45,7 +45,7 @@ func TestHTTPValidRefresh(t *testing.T) { require.NoError(t, err) ctx := context.Background() - tgs, err := d.refresh(ctx) + tgs, err := d.Refresh(ctx) require.NoError(t, err) expectedTargets := []*targetgroup.Group{ @@ -83,7 +83,7 @@ func TestHTTPInvalidCode(t *testing.T) { require.NoError(t, err) ctx := context.Background() - _, err = d.refresh(ctx) + _, err = d.Refresh(ctx) require.EqualError(t, err, "server returned HTTP status 400 Bad Request") require.Equal(t, 1.0, getFailureCount()) } @@ -105,7 +105,7 @@ func TestHTTPInvalidFormat(t *testing.T) { require.NoError(t, err) ctx := context.Background() - _, err = d.refresh(ctx) + _, err = d.Refresh(ctx) require.EqualError(t, err, `unsupported content type "text/plain; charset=utf-8"`) require.Equal(t, 1.0, getFailureCount()) } @@ -423,7 +423,7 @@ func TestSourceDisappeared(t *testing.T) { ctx := context.Background() for i, res := range test.responses { stubResponse = res - tgs, err := d.refresh(ctx) + tgs, err := d.Refresh(ctx) require.NoError(t, err) require.Equal(t, test.expectedTargets[i], tgs) } diff --git a/discovery/manager.go b/discovery/manager.go index 088b9722c..b7357fa6c 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -75,8 +75,8 @@ type poolKey struct { provider string } -// provider holds a Discoverer instance, its configuration, cancel func and its subscribers. -type provider struct { +// Provider holds a Discoverer instance, its configuration, cancel func and its subscribers. +type Provider struct { name string d Discoverer config interface{} @@ -92,11 +92,20 @@ type provider struct { newSubs map[string]struct{} } +// Discoverer return the Discoverer of the provider +func (p *Provider) Discoverer() Discoverer { + return p.d +} + // IsStarted return true if Discoverer is started. -func (p *provider) IsStarted() bool { +func (p *Provider) IsStarted() bool { return p.cancel != nil } +func (p *Provider) Config() interface{} { + return p.config +} + // NewManager is the Discovery Manager constructor. func NewManager(ctx context.Context, logger log.Logger, options ...func(*Manager)) *Manager { if logger == nil { @@ -148,7 +157,7 @@ type Manager struct { targetsMtx sync.Mutex // providers keeps track of SD providers. - providers []*provider + providers []*Provider // The sync channel sends the updates as a map where the key is the job value from the scrape config. syncCh chan map[string][]*targetgroup.Group @@ -163,6 +172,11 @@ type Manager struct { lastProvider uint } +// Providers returns the currently configured SD providers. +func (m *Manager) Providers() []*Provider { + return m.providers +} + // Run starts the background processing. func (m *Manager) Run() error { go m.sender() @@ -194,7 +208,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error { wg sync.WaitGroup // keep shows if we keep any providers after reload. keep bool - newProviders []*provider + newProviders []*Provider ) for _, prov := range m.providers { // Cancel obsolete providers. @@ -260,7 +274,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error { // StartCustomProvider is used for sdtool. Only use this if you know what you're doing. func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker Discoverer) { - p := &provider{ + p := &Provider{ name: name, d: worker, subs: map[string]struct{}{ @@ -271,7 +285,7 @@ func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker D m.startProvider(ctx, p) } -func (m *Manager) startProvider(ctx context.Context, p *provider) { +func (m *Manager) startProvider(ctx context.Context, p *Provider) { level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs)) ctx, cancel := context.WithCancel(ctx) updates := make(chan []*targetgroup.Group) @@ -283,7 +297,7 @@ func (m *Manager) startProvider(ctx context.Context, p *provider) { } // cleaner cleans resources associated with provider. -func (m *Manager) cleaner(p *provider) { +func (m *Manager) cleaner(p *Provider) { m.targetsMtx.Lock() p.mu.RLock() for s := range p.subs { @@ -296,7 +310,7 @@ func (m *Manager) cleaner(p *provider) { } } -func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) { +func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targetgroup.Group) { // Ensure targets from this provider are cleaned up. defer m.cleaner(p) for { @@ -422,7 +436,7 @@ func (m *Manager) registerProviders(cfgs Configs, setName string) int { failed++ return } - m.providers = append(m.providers, &provider{ + m.providers = append(m.providers, &Provider{ name: fmt.Sprintf("%s/%d", typ, m.lastProvider), d: d, config: cfg,