Simplify SD update throttling (#4523)

* simplified SD updates throttling

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>

* add default to catch cases when we don't have new updates.

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
pull/4544/head
Krasi Georgiev 6 years ago committed by Tobias Schmidt
parent a188693015
commit 53691ae261

@ -82,10 +82,6 @@ type Manager struct {
targets map[poolKey]map[string]*targetgroup.Group targets map[poolKey]map[string]*targetgroup.Group
// The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config. // The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config.
syncCh chan map[string][]*targetgroup.Group syncCh chan map[string][]*targetgroup.Group
// True if updates were received in the last 5 seconds.
recentlyUpdated bool
// Protects recentlyUpdated.
recentlyUpdatedMtx sync.Mutex
} }
// Run starts the background processing // Run starts the background processing
@ -132,43 +128,47 @@ func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Dis
go worker.Run(ctx, updates) go worker.Run(ctx, updates)
go m.runProvider(ctx, poolKey, updates) go m.runProvider(ctx, poolKey, updates)
go m.runUpdater(ctx)
} }
func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan []*targetgroup.Group) { func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan []*targetgroup.Group) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
updateReceived := make(chan struct{}, 1)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case tgs, ok := <-updates: case tgs, ok := <-updates:
// Handle the case that a target provider exits and closes the channel // Handle the case that a target provider(E.g. StaticProvider) exits and
// before the context is done. // closes the channel before the context is done.
// This will prevent sending the updates to the receiver so we send them before exiting.
if !ok { if !ok {
select {
case m.syncCh <- m.allGroups():
default:
level.Debug(m.logger).Log("msg", "discovery receiver's channel was full")
}
return return
} }
m.updateGroup(poolKey, tgs) m.updateGroup(poolKey, tgs)
m.recentlyUpdatedMtx.Lock()
m.recentlyUpdated = true
m.recentlyUpdatedMtx.Unlock()
}
}
}
func (m *Manager) runUpdater(ctx context.Context) { // Signal that there was an update.
ticker := time.NewTicker(5 * time.Second) select {
defer ticker.Stop() case updateReceived <- struct{}{}:
default:
for { }
select { case <-ticker.C: // Some discoverers send updates too often so we send these to the receiver once every 5 seconds.
case <-ctx.Done(): select {
return case <-updateReceived: // Send only when there is a new update.
case <-ticker.C: select {
m.recentlyUpdatedMtx.Lock() case m.syncCh <- m.allGroups():
if m.recentlyUpdated { default:
m.syncCh <- m.allGroups() level.Debug(m.logger).Log("msg", "discovery receiver's channel was full")
m.recentlyUpdated = false }
default:
} }
m.recentlyUpdatedMtx.Unlock()
} }
} }
} }

Loading…
Cancel
Save