Merge pull request #3654 from krasi-georgiev/discovery-handle-discoverer-updates

discovery - handle Discoverers that send only target Group updates.
pull/3593/merge
Goutham Veeramachaneni 7 years ago committed by GitHub
commit b20a1b1b1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -16,7 +16,6 @@ package discovery
import (
"context"
"fmt"
"sort"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
@ -59,31 +58,26 @@ type poolKey struct {
provider string
}
// byProvider implements sort.Interface for []poolKey based on the provider field.
// Sorting is needed so that we can have predictable tests.
type byProvider []poolKey
func (a byProvider) Len() int { return len(a) }
func (a byProvider) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byProvider) Less(i, j int) bool { return a[i].provider < a[j].provider }
// NewManager is the Discovery Manager constructor
func NewManager(logger log.Logger) *Manager {
return &Manager{
logger: logger,
actionCh: make(chan func(context.Context)),
syncCh: make(chan map[string][]*targetgroup.Group),
targets: make(map[poolKey][]*targetgroup.Group),
targets: make(map[poolKey]map[string]*targetgroup.Group),
discoverCancel: []context.CancelFunc{},
}
}
// Manager maintains a set of discovery providers and sends each update to a channel used by other packages.
// Manager maintains a set of discovery providers and sends each update to a map channel.
// Targets are grouped by the target set name.
type Manager struct {
logger log.Logger
actionCh chan func(context.Context)
discoverCancel []context.CancelFunc
targets map[poolKey][]*targetgroup.Group
// Some Discoverers(eg. k8s) send only the updates for a given target group
// so we use map[tg.Source]*targetgroup.Group to know which group to update.
targets map[poolKey]map[string]*targetgroup.Group
// The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config.
syncCh chan map[string][]*targetgroup.Group
}
@ -143,8 +137,8 @@ func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan
if !ok {
return
}
m.addGroup(poolKey, tgs)
m.syncCh <- m.allGroups(poolKey)
m.updateGroup(poolKey, tgs)
m.syncCh <- m.allGroups()
}
}
}
@ -153,16 +147,21 @@ func (m *Manager) cancelDiscoverers() {
for _, c := range m.discoverCancel {
c()
}
m.targets = make(map[poolKey][]*targetgroup.Group)
m.targets = make(map[poolKey]map[string]*targetgroup.Group)
m.discoverCancel = nil
}
func (m *Manager) addGroup(poolKey poolKey, tg []*targetgroup.Group) {
func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) {
done := make(chan struct{})
m.actionCh <- func(ctx context.Context) {
if tg != nil {
m.targets[poolKey] = tg
for _, tg := range tgs {
if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics.
if _, ok := m.targets[poolKey]; !ok {
m.targets[poolKey] = make(map[string]*targetgroup.Group)
}
m.targets[poolKey][tg.Source] = tg
}
}
close(done)
@ -170,24 +169,16 @@ func (m *Manager) addGroup(poolKey poolKey, tg []*targetgroup.Group) {
<-done
}
func (m *Manager) allGroups(pk poolKey) map[string][]*targetgroup.Group {
func (m *Manager) allGroups() map[string][]*targetgroup.Group {
tSets := make(chan map[string][]*targetgroup.Group)
m.actionCh <- func(ctx context.Context) {
// Sorting by the poolKey is needed so that we can have predictable tests.
var pKeys []poolKey
for pk := range m.targets {
pKeys = append(pKeys, pk)
}
sort.Sort(byProvider(pKeys))
tSetsAll := map[string][]*targetgroup.Group{}
for _, pk := range pKeys {
for _, tg := range m.targets[pk] {
if tg.Source != "" { // Don't add empty targets.
tSetsAll[pk.setName] = append(tSetsAll[pk.setName], tg)
}
for pkey, tsets := range m.targets {
for _, tg := range tsets {
// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
// to signal that it needs to stop all scrape loops for this target set.
tSetsAll[pkey.setName] = append(tSetsAll[pkey.setName], tg)
}
}
tSets <- tSetsAll

@ -17,6 +17,7 @@ import (
"context"
"fmt"
"reflect"
"sort"
"strconv"
"testing"
"time"
@ -103,11 +104,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
{
targetGroups: []targetgroup.Group{
{
Source: "initial1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
}},
},
@ -116,11 +117,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
expectedTargets: [][]*targetgroup.Group{
{
{
Source: "initial1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
@ -133,11 +134,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
{
targetGroups: []targetgroup.Group{
{
Source: "tp1-initial1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
@ -147,7 +148,7 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
{
targetGroups: []targetgroup.Group{
{
Source: "tp2-initial1",
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
},
@ -158,24 +159,24 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
expectedTargets: [][]*targetgroup.Group{
{
{
Source: "tp1-initial1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
}, {
{
Source: "tp1-initial1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
{
Source: "tp2-initial1",
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
},
@ -188,34 +189,52 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
{
targetGroups: []targetgroup.Group{
{
Source: "initial1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
interval: 0,
},
{
targetGroups: []targetgroup.Group{},
interval: 10,
targetGroups: []targetgroup.Group{
{
Source: "tp1_group1",
Targets: []model.LabelSet{},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{},
},
},
interval: 10,
},
},
},
expectedTargets: [][]*targetgroup.Group{
{
{
Source: "initial1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{},
{
{
Source: "tp1_group1",
Targets: []model.LabelSet{},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{},
},
},
},
},
{
@ -225,11 +244,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
{
targetGroups: []targetgroup.Group{
{
Source: "initial1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
@ -238,13 +257,17 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
{
targetGroups: []targetgroup.Group{
{
Source: "update1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "update2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
{
Source: "tp1_group3",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
},
interval: 10,
},
@ -253,23 +276,27 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
expectedTargets: [][]*targetgroup.Group{
{
{
Source: "initial1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{
{
Source: "update1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "update2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
{
Source: "tp1_group3",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
},
},
},
@ -280,11 +307,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
{
targetGroups: []targetgroup.Group{
{
Source: "tp1-initial1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
@ -293,11 +320,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
{
targetGroups: []targetgroup.Group{
{
Source: "tp1-update1",
Source: "tp1_group3",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1-update2",
Source: "tp1_group4",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
@ -308,11 +335,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
{
targetGroups: []targetgroup.Group{
{
Source: "tp2-initial1",
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2-initial2",
Source: "tp2_group2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
},
@ -321,11 +348,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
{
targetGroups: []targetgroup.Group{
{
Source: "tp2-update1",
Source: "tp2_group3",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2-update2",
Source: "tp2_group4",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
@ -336,82 +363,106 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
expectedTargets: [][]*targetgroup.Group{
{
{
Source: "tp1-initial1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{
{
Source: "tp1-initial1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
{
Source: "tp2-initial1",
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2-initial2",
Source: "tp2_group2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
},
{
{
Source: "tp1-initial1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
{
Source: "tp2-update1",
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2_group2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
{
Source: "tp2_group3",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2-update2",
Source: "tp2_group4",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
{
{
Source: "tp1-update1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
{
Source: "tp1_group3",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1-update2",
Source: "tp1_group4",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
{
Source: "tp2-update1",
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2_group2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
{
Source: "tp2_group3",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2-update2",
Source: "tp2_group4",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
},
},
{
title: "One tp initials arrive after other tp updates.",
title: "One TP initials arrive after other TP updates.",
updates: map[string][]update{
"tp1": {
{
targetGroups: []targetgroup.Group{
{
Source: "tp1-initial1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
@ -420,11 +471,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
{
targetGroups: []targetgroup.Group{
{
Source: "tp1-update1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1-update2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
@ -435,11 +486,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
{
targetGroups: []targetgroup.Group{
{
Source: "tp2-initial1",
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2-initial2",
Source: "tp2_group2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
},
@ -448,11 +499,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
{
targetGroups: []targetgroup.Group{
{
Source: "tp2-update1",
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2-update2",
Source: "tp2_group2",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
@ -463,57 +514,57 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
expectedTargets: [][]*targetgroup.Group{
{
{
Source: "tp1-initial1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{
{
Source: "tp1-update1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1-update2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
{
{
Source: "tp1-update1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1-update2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
{
Source: "tp2-initial1",
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2-initial2",
Source: "tp2_group2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
},
{
{
Source: "tp1-update1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1-update2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
{
Source: "tp2-update1",
Source: "tp2_group1",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2-update2",
Source: "tp2_group2",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
@ -521,34 +572,43 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
},
{
title: "Single TP Single provider empty update in between",
title: "Single TP empty update in between",
updates: map[string][]update{
"tp1": {
{
targetGroups: []targetgroup.Group{
{
Source: "initial1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
interval: 30,
},
{
targetGroups: []targetgroup.Group{},
interval: 10,
targetGroups: []targetgroup.Group{
{
Source: "tp1_group1",
Targets: []model.LabelSet{},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{},
},
},
interval: 10,
},
{
targetGroups: []targetgroup.Group{
{
Source: "update1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "update2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
@ -559,22 +619,31 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
expectedTargets: [][]*targetgroup.Group{
{
{
Source: "initial1",
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{},
{
{
Source: "update1",
Source: "tp1_group1",
Targets: []model.LabelSet{},
},
{
Source: "tp1_group2",
Targets: []model.LabelSet{},
},
},
{
{
Source: "tp1_group1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "update2",
Source: "tp1_group2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
@ -606,6 +675,8 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
break Loop
case tsetMap := <-discoveryManager.SyncCh():
for _, received := range tsetMap {
// Need to sort by the Groups source as the Discovery manager doesn't guarantee the order.
sort.Sort(byGroupSource(received))
if !reflect.DeepEqual(received, testCase.expectedTargets[x]) {
var receivedFormated string
for _, receivedTargets := range received {
@ -628,7 +699,7 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
}
func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) {
verifyPresence := func(tSets map[poolKey][]*targetgroup.Group, poolKey poolKey, label string, present bool) {
verifyPresence := func(tSets map[poolKey]map[string]*targetgroup.Group, poolKey poolKey, label string, present bool) {
if _, ok := tSets[poolKey]; !ok {
t.Fatalf("'%s' should be present in Pool keys: %v", poolKey, tSets)
return
@ -729,3 +800,10 @@ func (tp mockdiscoveryProvider) sendUpdates() {
tp.up <- tgs
}
}
// byGroupSource implements sort.Interface so we can sort by the Source field.
type byGroupSource []*targetgroup.Group
func (a byGroupSource) Len() int { return len(a) }
func (a byGroupSource) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byGroupSource) Less(i, j int) bool { return a[i].Source < a[j].Source }

@ -83,9 +83,11 @@ func (m *ScrapeManager) Stop() {
func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error {
done := make(chan struct{})
m.actionCh <- func() {
c := make(map[string]*config.ScrapeConfig)
for _, scfg := range cfg.ScrapeConfigs {
m.scrapeConfigs[scfg.JobName] = scfg
c[scfg.JobName] = scfg
}
m.scrapeConfigs = c
close(done)
}
<-done
@ -144,20 +146,15 @@ func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) error {
} else {
existing.Sync(tgroup)
}
}
// Cleanup - check the config and cancel the scrape loops if it don't exist in the scrape config.
jobs := make(map[string]struct{})
for k := range m.scrapeConfigs {
jobs[k] = struct{}{}
}
for name, sp := range m.scrapePools {
if _, ok := jobs[name]; !ok {
sp.stop()
delete(m.scrapePools, name)
}
// Cleanup - check the config and cancel the scrape loops if it don't exist in the scrape config.
for name, sp := range m.scrapePools {
if _, ok := m.scrapeConfigs[name]; !ok {
sp.stop()
delete(m.scrapePools, name)
}
}
return nil
}

Loading…
Cancel
Save