diff --git a/discovery/manager.go b/discovery/manager.go index 0a3d107a3..0ce6b9fc5 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -16,7 +16,6 @@ package discovery import ( "context" "fmt" - "sort" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -59,31 +58,26 @@ type poolKey struct { provider string } -// byProvider implements sort.Interface for []poolKey based on the provider field. -// Sorting is needed so that we can have predictable tests. -type byProvider []poolKey - -func (a byProvider) Len() int { return len(a) } -func (a byProvider) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a byProvider) Less(i, j int) bool { return a[i].provider < a[j].provider } - // NewManager is the Discovery Manager constructor func NewManager(logger log.Logger) *Manager { return &Manager{ logger: logger, actionCh: make(chan func(context.Context)), syncCh: make(chan map[string][]*targetgroup.Group), - targets: make(map[poolKey][]*targetgroup.Group), + targets: make(map[poolKey]map[string]*targetgroup.Group), discoverCancel: []context.CancelFunc{}, } } -// Manager maintains a set of discovery providers and sends each update to a channel used by other packages. +// Manager maintains a set of discovery providers and sends each update to a map channel. +// Targets are grouped by the target set name. type Manager struct { logger log.Logger actionCh chan func(context.Context) discoverCancel []context.CancelFunc - targets map[poolKey][]*targetgroup.Group + // Some Discoverers(eg. k8s) send only the updates for a given target group + // so we use map[tg.Source]*targetgroup.Group to know which group to update. + targets map[poolKey]map[string]*targetgroup.Group // The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config. syncCh chan map[string][]*targetgroup.Group } @@ -143,8 +137,8 @@ func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan if !ok { return } - m.addGroup(poolKey, tgs) - m.syncCh <- m.allGroups(poolKey) + m.updateGroup(poolKey, tgs) + m.syncCh <- m.allGroups() } } } @@ -153,16 +147,21 @@ func (m *Manager) cancelDiscoverers() { for _, c := range m.discoverCancel { c() } - m.targets = make(map[poolKey][]*targetgroup.Group) + m.targets = make(map[poolKey]map[string]*targetgroup.Group) m.discoverCancel = nil } -func (m *Manager) addGroup(poolKey poolKey, tg []*targetgroup.Group) { +func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) { done := make(chan struct{}) m.actionCh <- func(ctx context.Context) { - if tg != nil { - m.targets[poolKey] = tg + for _, tg := range tgs { + if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics. + if _, ok := m.targets[poolKey]; !ok { + m.targets[poolKey] = make(map[string]*targetgroup.Group) + } + m.targets[poolKey][tg.Source] = tg + } } close(done) @@ -170,24 +169,16 @@ func (m *Manager) addGroup(poolKey poolKey, tg []*targetgroup.Group) { <-done } -func (m *Manager) allGroups(pk poolKey) map[string][]*targetgroup.Group { +func (m *Manager) allGroups() map[string][]*targetgroup.Group { tSets := make(chan map[string][]*targetgroup.Group) m.actionCh <- func(ctx context.Context) { - - // Sorting by the poolKey is needed so that we can have predictable tests. - var pKeys []poolKey - for pk := range m.targets { - pKeys = append(pKeys, pk) - } - sort.Sort(byProvider(pKeys)) - tSetsAll := map[string][]*targetgroup.Group{} - for _, pk := range pKeys { - for _, tg := range m.targets[pk] { - if tg.Source != "" { // Don't add empty targets. - tSetsAll[pk.setName] = append(tSetsAll[pk.setName], tg) - } + for pkey, tsets := range m.targets { + for _, tg := range tsets { + // Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager' + // to signal that it needs to stop all scrape loops for this target set. + tSetsAll[pkey.setName] = append(tSetsAll[pkey.setName], tg) } } tSets <- tSetsAll diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 2e99f097c..f3ecf784e 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "reflect" + "sort" "strconv" "testing" "time" @@ -103,11 +104,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }}, }, @@ -116,11 +117,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { expectedTargets: [][]*targetgroup.Group{ { { - Source: "initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, @@ -133,11 +134,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "tp1-initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "tp1-initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, @@ -147,7 +148,7 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "tp2-initial1", + Source: "tp2_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, }, @@ -158,24 +159,24 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { expectedTargets: [][]*targetgroup.Group{ { { - Source: "tp1-initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "tp1-initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, { { - Source: "tp1-initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "tp1-initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, { - Source: "tp2-initial1", + Source: "tp2_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, }, @@ -188,34 +189,52 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, interval: 0, }, { - targetGroups: []targetgroup.Group{}, - interval: 10, + targetGroups: []targetgroup.Group{ + { + Source: "tp1_group1", + Targets: []model.LabelSet{}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{}, + }, + }, + interval: 10, }, }, }, expectedTargets: [][]*targetgroup.Group{ { { - Source: "initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, - {}, + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{}, + }, + }, }, }, { @@ -225,11 +244,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, @@ -238,13 +257,17 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "update1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "update2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "4"}}, }, + { + Source: "tp1_group3", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, }, interval: 10, }, @@ -253,23 +276,27 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { expectedTargets: [][]*targetgroup.Group{ { { - Source: "initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, { { - Source: "update1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "update2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "4"}}, }, + { + Source: "tp1_group3", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, }, }, }, @@ -280,11 +307,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "tp1-initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "tp1-initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, @@ -293,11 +320,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "tp1-update1", + Source: "tp1_group3", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "tp1-update2", + Source: "tp1_group4", Targets: []model.LabelSet{{"__instance__": "4"}}, }, }, @@ -308,11 +335,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "tp2-initial1", + Source: "tp2_group1", Targets: []model.LabelSet{{"__instance__": "5"}}, }, { - Source: "tp2-initial2", + Source: "tp2_group2", Targets: []model.LabelSet{{"__instance__": "6"}}, }, }, @@ -321,11 +348,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "tp2-update1", + Source: "tp2_group3", Targets: []model.LabelSet{{"__instance__": "7"}}, }, { - Source: "tp2-update2", + Source: "tp2_group4", Targets: []model.LabelSet{{"__instance__": "8"}}, }, }, @@ -336,82 +363,106 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { expectedTargets: [][]*targetgroup.Group{ { { - Source: "tp1-initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "tp1-initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, { { - Source: "tp1-initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "tp1-initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, { - Source: "tp2-initial1", + Source: "tp2_group1", Targets: []model.LabelSet{{"__instance__": "5"}}, }, { - Source: "tp2-initial2", + Source: "tp2_group2", Targets: []model.LabelSet{{"__instance__": "6"}}, }, }, { { - Source: "tp1-initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "tp1-initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, { - Source: "tp2-update1", + Source: "tp2_group1", + Targets: []model.LabelSet{{"__instance__": "5"}}, + }, + { + Source: "tp2_group2", + Targets: []model.LabelSet{{"__instance__": "6"}}, + }, + { + Source: "tp2_group3", Targets: []model.LabelSet{{"__instance__": "7"}}, }, { - Source: "tp2-update2", + Source: "tp2_group4", Targets: []model.LabelSet{{"__instance__": "8"}}, }, }, { { - Source: "tp1-update1", + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + { + Source: "tp1_group3", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "tp1-update2", + Source: "tp1_group4", Targets: []model.LabelSet{{"__instance__": "4"}}, }, { - Source: "tp2-update1", + Source: "tp2_group1", + Targets: []model.LabelSet{{"__instance__": "5"}}, + }, + { + Source: "tp2_group2", + Targets: []model.LabelSet{{"__instance__": "6"}}, + }, + { + Source: "tp2_group3", Targets: []model.LabelSet{{"__instance__": "7"}}, }, { - Source: "tp2-update2", + Source: "tp2_group4", Targets: []model.LabelSet{{"__instance__": "8"}}, }, }, }, }, { - title: "One tp initials arrive after other tp updates.", + title: "One TP initials arrive after other TP updates.", updates: map[string][]update{ "tp1": { { targetGroups: []targetgroup.Group{ { - Source: "tp1-initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "tp1-initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, @@ -420,11 +471,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "tp1-update1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "tp1-update2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "4"}}, }, }, @@ -435,11 +486,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "tp2-initial1", + Source: "tp2_group1", Targets: []model.LabelSet{{"__instance__": "5"}}, }, { - Source: "tp2-initial2", + Source: "tp2_group2", Targets: []model.LabelSet{{"__instance__": "6"}}, }, }, @@ -448,11 +499,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "tp2-update1", + Source: "tp2_group1", Targets: []model.LabelSet{{"__instance__": "7"}}, }, { - Source: "tp2-update2", + Source: "tp2_group2", Targets: []model.LabelSet{{"__instance__": "8"}}, }, }, @@ -463,57 +514,57 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { expectedTargets: [][]*targetgroup.Group{ { { - Source: "tp1-initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "tp1-initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, { { - Source: "tp1-update1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "tp1-update2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "4"}}, }, }, { { - Source: "tp1-update1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "tp1-update2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "4"}}, }, { - Source: "tp2-initial1", + Source: "tp2_group1", Targets: []model.LabelSet{{"__instance__": "5"}}, }, { - Source: "tp2-initial2", + Source: "tp2_group2", Targets: []model.LabelSet{{"__instance__": "6"}}, }, }, { { - Source: "tp1-update1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "tp1-update2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "4"}}, }, { - Source: "tp2-update1", + Source: "tp2_group1", Targets: []model.LabelSet{{"__instance__": "7"}}, }, { - Source: "tp2-update2", + Source: "tp2_group2", Targets: []model.LabelSet{{"__instance__": "8"}}, }, }, @@ -521,34 +572,43 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { }, { - title: "Single TP Single provider empty update in between", + title: "Single TP empty update in between", updates: map[string][]update{ "tp1": { { targetGroups: []targetgroup.Group{ { - Source: "initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, interval: 30, }, { - targetGroups: []targetgroup.Group{}, - interval: 10, + targetGroups: []targetgroup.Group{ + { + Source: "tp1_group1", + Targets: []model.LabelSet{}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{}, + }, + }, + interval: 10, }, { targetGroups: []targetgroup.Group{ { - Source: "update1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "update2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "4"}}, }, }, @@ -559,22 +619,31 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { expectedTargets: [][]*targetgroup.Group{ { { - Source: "initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, - {}, { { - Source: "update1", + Source: "tp1_group1", + Targets: []model.LabelSet{}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{}, + }, + }, + { + { + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "update2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "4"}}, }, }, @@ -606,6 +675,8 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { break Loop case tsetMap := <-discoveryManager.SyncCh(): for _, received := range tsetMap { + // Need to sort by the Groups source as the Discovery manager doesn't guarantee the order. + sort.Sort(byGroupSource(received)) if !reflect.DeepEqual(received, testCase.expectedTargets[x]) { var receivedFormated string for _, receivedTargets := range received { @@ -628,7 +699,7 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { } func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) { - verifyPresence := func(tSets map[poolKey][]*targetgroup.Group, poolKey poolKey, label string, present bool) { + verifyPresence := func(tSets map[poolKey]map[string]*targetgroup.Group, poolKey poolKey, label string, present bool) { if _, ok := tSets[poolKey]; !ok { t.Fatalf("'%s' should be present in Pool keys: %v", poolKey, tSets) return @@ -729,3 +800,10 @@ func (tp mockdiscoveryProvider) sendUpdates() { tp.up <- tgs } } + +// byGroupSource implements sort.Interface so we can sort by the Source field. +type byGroupSource []*targetgroup.Group + +func (a byGroupSource) Len() int { return len(a) } +func (a byGroupSource) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byGroupSource) Less(i, j int) bool { return a[i].Source < a[j].Source } diff --git a/retrieval/manager.go b/retrieval/manager.go index adf936022..5cf05b79b 100644 --- a/retrieval/manager.go +++ b/retrieval/manager.go @@ -83,9 +83,11 @@ func (m *ScrapeManager) Stop() { func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error { done := make(chan struct{}) m.actionCh <- func() { + c := make(map[string]*config.ScrapeConfig) for _, scfg := range cfg.ScrapeConfigs { - m.scrapeConfigs[scfg.JobName] = scfg + c[scfg.JobName] = scfg } + m.scrapeConfigs = c close(done) } <-done @@ -144,20 +146,15 @@ func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) error { } else { existing.Sync(tgroup) } + } - // Cleanup - check the config and cancel the scrape loops if it don't exist in the scrape config. - jobs := make(map[string]struct{}) - - for k := range m.scrapeConfigs { - jobs[k] = struct{}{} - } - - for name, sp := range m.scrapePools { - if _, ok := jobs[name]; !ok { - sp.stop() - delete(m.scrapePools, name) - } + // Cleanup - check the config and cancel the scrape loops if it don't exist in the scrape config. + for name, sp := range m.scrapePools { + if _, ok := m.scrapeConfigs[name]; !ok { + sp.stop() + delete(m.scrapePools, name) } } + return nil }