From 7e28397a2c8a24da347efd79195b8ed3f08beaf3 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Thu, 4 Jan 2018 13:14:22 +0000 Subject: [PATCH 01/13] discovery - handle Discoverers that send only target Group updates rather than all Targets on every update. Signed-off-by: Krasi Georgiev --- discovery/manager.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/discovery/manager.go b/discovery/manager.go index 0a3d107a3..d3cab05c4 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -73,7 +73,7 @@ func NewManager(logger log.Logger) *Manager { logger: logger, actionCh: make(chan func(context.Context)), syncCh: make(chan map[string][]*targetgroup.Group), - targets: make(map[poolKey][]*targetgroup.Group), + targets: make(map[poolKey]map[string]*targetgroup.Group), discoverCancel: []context.CancelFunc{}, } } @@ -83,7 +83,8 @@ type Manager struct { logger log.Logger actionCh chan func(context.Context) discoverCancel []context.CancelFunc - targets map[poolKey][]*targetgroup.Group + // We use map[string]*targetgroup.Group to handle Discoverers that send only updates instead of all targets on every update. + targets map[poolKey]map[string]*targetgroup.Group // The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config. syncCh chan map[string][]*targetgroup.Group } @@ -143,8 +144,8 @@ func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan if !ok { return } - m.addGroup(poolKey, tgs) - m.syncCh <- m.allGroups(poolKey) + m.updateGroup(poolKey, tgs) + m.syncCh <- m.allGroups() } } } @@ -153,16 +154,21 @@ func (m *Manager) cancelDiscoverers() { for _, c := range m.discoverCancel { c() } - m.targets = make(map[poolKey][]*targetgroup.Group) + m.targets = make(map[poolKey]map[string]*targetgroup.Group) m.discoverCancel = nil } -func (m *Manager) addGroup(poolKey poolKey, tg []*targetgroup.Group) { +func (m *Manager) updateGroup(poolKey poolKey, tg []*targetgroup.Group) { done := make(chan struct{}) m.actionCh <- func(ctx context.Context) { if tg != nil { - m.targets[poolKey] = tg + for _, t := range tg { + if _, ok := m.targets[poolKey]; !ok { + m.targets[poolKey] = make(map[string]*targetgroup.Group) + } + m.targets[poolKey][t.Source] = t + } } close(done) @@ -170,7 +176,7 @@ func (m *Manager) addGroup(poolKey poolKey, tg []*targetgroup.Group) { <-done } -func (m *Manager) allGroups(pk poolKey) map[string][]*targetgroup.Group { +func (m *Manager) allGroups() map[string][]*targetgroup.Group { tSets := make(chan map[string][]*targetgroup.Group) m.actionCh <- func(ctx context.Context) { @@ -185,7 +191,9 @@ func (m *Manager) allGroups(pk poolKey) map[string][]*targetgroup.Group { tSetsAll := map[string][]*targetgroup.Group{} for _, pk := range pKeys { for _, tg := range m.targets[pk] { - if tg.Source != "" { // Don't add empty targets. + // Don't add empty targets. + // Some Discoverers(eg. k8s) send only the updates so removed targets will be updated with an empty Source value. + if tg.Source != "" { tSetsAll[pk.setName] = append(tSetsAll[pk.setName], tg) } } From 638818a974d528e7554600515570a235c5657e36 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Thu, 4 Jan 2018 13:57:34 +0000 Subject: [PATCH 02/13] some Discoverers send nil targetgroup so need to check for it when updating a group --- discovery/manager.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/discovery/manager.go b/discovery/manager.go index d3cab05c4..48e2ba3d0 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -158,16 +158,18 @@ func (m *Manager) cancelDiscoverers() { m.discoverCancel = nil } -func (m *Manager) updateGroup(poolKey poolKey, tg []*targetgroup.Group) { +func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) { done := make(chan struct{}) m.actionCh <- func(ctx context.Context) { - if tg != nil { - for _, t := range tg { - if _, ok := m.targets[poolKey]; !ok { - m.targets[poolKey] = make(map[string]*targetgroup.Group) + if tgs != nil { + for _, tg := range tgs { + if tg != nil { // Some Discoverers send nil targetgroup so need to check for it to avoid panics. + if _, ok := m.targets[poolKey]; !ok { + m.targets[poolKey] = make(map[string]*targetgroup.Group) + } + m.targets[poolKey][tg.Source] = tg } - m.targets[poolKey][t.Source] = t } } close(done) @@ -191,11 +193,7 @@ func (m *Manager) allGroups() map[string][]*targetgroup.Group { tSetsAll := map[string][]*targetgroup.Group{} for _, pk := range pKeys { for _, tg := range m.targets[pk] { - // Don't add empty targets. - // Some Discoverers(eg. k8s) send only the updates so removed targets will be updated with an empty Source value. - if tg.Source != "" { - tSetsAll[pk.setName] = append(tSetsAll[pk.setName], tg) - } + tSetsAll[pk.setName] = append(tSetsAll[pk.setName], tg) } } tSets <- tSetsAll From 135ea0f793fa2a349d0737530a96127c86b3e701 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Thu, 4 Jan 2018 21:41:54 +0000 Subject: [PATCH 03/13] discovery manager - doesn't need sorting of the target groups so move it in the discovery manager tests as we only need it there. discovery manager - refactor the discovery tests. Signed-off-by: Krasi Georgiev --- discovery/manager.go | 23 +--- discovery/manager_test.go | 244 +++++++++++++++++++++++++------------- 2 files changed, 164 insertions(+), 103 deletions(-) diff --git a/discovery/manager.go b/discovery/manager.go index 48e2ba3d0..1bc952fee 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -16,7 +16,6 @@ package discovery import ( "context" "fmt" - "sort" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -59,14 +58,6 @@ type poolKey struct { provider string } -// byProvider implements sort.Interface for []poolKey based on the provider field. -// Sorting is needed so that we can have predictable tests. -type byProvider []poolKey - -func (a byProvider) Len() int { return len(a) } -func (a byProvider) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a byProvider) Less(i, j int) bool { return a[i].provider < a[j].provider } - // NewManager is the Discovery Manager constructor func NewManager(logger log.Logger) *Manager { return &Manager{ @@ -182,18 +173,10 @@ func (m *Manager) allGroups() map[string][]*targetgroup.Group { tSets := make(chan map[string][]*targetgroup.Group) m.actionCh <- func(ctx context.Context) { - - // Sorting by the poolKey is needed so that we can have predictable tests. - var pKeys []poolKey - for pk := range m.targets { - pKeys = append(pKeys, pk) - } - sort.Sort(byProvider(pKeys)) - tSetsAll := map[string][]*targetgroup.Group{} - for _, pk := range pKeys { - for _, tg := range m.targets[pk] { - tSetsAll[pk.setName] = append(tSetsAll[pk.setName], tg) + for pkey, tsets := range m.targets { + for _, tg := range tsets { + tSetsAll[pkey.setName] = append(tSetsAll[pkey.setName], tg) } } tSets <- tSetsAll diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 2e99f097c..f3ecf784e 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "reflect" + "sort" "strconv" "testing" "time" @@ -103,11 +104,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }}, }, @@ -116,11 +117,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { expectedTargets: [][]*targetgroup.Group{ { { - Source: "initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, @@ -133,11 +134,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "tp1-initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "tp1-initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, @@ -147,7 +148,7 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "tp2-initial1", + Source: "tp2_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, }, @@ -158,24 +159,24 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { expectedTargets: [][]*targetgroup.Group{ { { - Source: "tp1-initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "tp1-initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, { { - Source: "tp1-initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "tp1-initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, { - Source: "tp2-initial1", + Source: "tp2_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, }, @@ -188,34 +189,52 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, interval: 0, }, { - targetGroups: []targetgroup.Group{}, - interval: 10, + targetGroups: []targetgroup.Group{ + { + Source: "tp1_group1", + Targets: []model.LabelSet{}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{}, + }, + }, + interval: 10, }, }, }, expectedTargets: [][]*targetgroup.Group{ { { - Source: "initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, - {}, + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{}, + }, + }, }, }, { @@ -225,11 +244,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, @@ -238,13 +257,17 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "update1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "update2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "4"}}, }, + { + Source: "tp1_group3", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, }, interval: 10, }, @@ -253,23 +276,27 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { expectedTargets: [][]*targetgroup.Group{ { { - Source: "initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, { { - Source: "update1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "update2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "4"}}, }, + { + Source: "tp1_group3", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, }, }, }, @@ -280,11 +307,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "tp1-initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "tp1-initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, @@ -293,11 +320,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "tp1-update1", + Source: "tp1_group3", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "tp1-update2", + Source: "tp1_group4", Targets: []model.LabelSet{{"__instance__": "4"}}, }, }, @@ -308,11 +335,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "tp2-initial1", + Source: "tp2_group1", Targets: []model.LabelSet{{"__instance__": "5"}}, }, { - Source: "tp2-initial2", + Source: "tp2_group2", Targets: []model.LabelSet{{"__instance__": "6"}}, }, }, @@ -321,11 +348,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "tp2-update1", + Source: "tp2_group3", Targets: []model.LabelSet{{"__instance__": "7"}}, }, { - Source: "tp2-update2", + Source: "tp2_group4", Targets: []model.LabelSet{{"__instance__": "8"}}, }, }, @@ -336,82 +363,106 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { expectedTargets: [][]*targetgroup.Group{ { { - Source: "tp1-initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "tp1-initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, { { - Source: "tp1-initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "tp1-initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, { - Source: "tp2-initial1", + Source: "tp2_group1", Targets: []model.LabelSet{{"__instance__": "5"}}, }, { - Source: "tp2-initial2", + Source: "tp2_group2", Targets: []model.LabelSet{{"__instance__": "6"}}, }, }, { { - Source: "tp1-initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "tp1-initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, { - Source: "tp2-update1", + Source: "tp2_group1", + Targets: []model.LabelSet{{"__instance__": "5"}}, + }, + { + Source: "tp2_group2", + Targets: []model.LabelSet{{"__instance__": "6"}}, + }, + { + Source: "tp2_group3", Targets: []model.LabelSet{{"__instance__": "7"}}, }, { - Source: "tp2-update2", + Source: "tp2_group4", Targets: []model.LabelSet{{"__instance__": "8"}}, }, }, { { - Source: "tp1-update1", + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + { + Source: "tp1_group3", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "tp1-update2", + Source: "tp1_group4", Targets: []model.LabelSet{{"__instance__": "4"}}, }, { - Source: "tp2-update1", + Source: "tp2_group1", + Targets: []model.LabelSet{{"__instance__": "5"}}, + }, + { + Source: "tp2_group2", + Targets: []model.LabelSet{{"__instance__": "6"}}, + }, + { + Source: "tp2_group3", Targets: []model.LabelSet{{"__instance__": "7"}}, }, { - Source: "tp2-update2", + Source: "tp2_group4", Targets: []model.LabelSet{{"__instance__": "8"}}, }, }, }, }, { - title: "One tp initials arrive after other tp updates.", + title: "One TP initials arrive after other TP updates.", updates: map[string][]update{ "tp1": { { targetGroups: []targetgroup.Group{ { - Source: "tp1-initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "tp1-initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, @@ -420,11 +471,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "tp1-update1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "tp1-update2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "4"}}, }, }, @@ -435,11 +486,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "tp2-initial1", + Source: "tp2_group1", Targets: []model.LabelSet{{"__instance__": "5"}}, }, { - Source: "tp2-initial2", + Source: "tp2_group2", Targets: []model.LabelSet{{"__instance__": "6"}}, }, }, @@ -448,11 +499,11 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { { targetGroups: []targetgroup.Group{ { - Source: "tp2-update1", + Source: "tp2_group1", Targets: []model.LabelSet{{"__instance__": "7"}}, }, { - Source: "tp2-update2", + Source: "tp2_group2", Targets: []model.LabelSet{{"__instance__": "8"}}, }, }, @@ -463,57 +514,57 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { expectedTargets: [][]*targetgroup.Group{ { { - Source: "tp1-initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "tp1-initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, { { - Source: "tp1-update1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "tp1-update2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "4"}}, }, }, { { - Source: "tp1-update1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "tp1-update2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "4"}}, }, { - Source: "tp2-initial1", + Source: "tp2_group1", Targets: []model.LabelSet{{"__instance__": "5"}}, }, { - Source: "tp2-initial2", + Source: "tp2_group2", Targets: []model.LabelSet{{"__instance__": "6"}}, }, }, { { - Source: "tp1-update1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "tp1-update2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "4"}}, }, { - Source: "tp2-update1", + Source: "tp2_group1", Targets: []model.LabelSet{{"__instance__": "7"}}, }, { - Source: "tp2-update2", + Source: "tp2_group2", Targets: []model.LabelSet{{"__instance__": "8"}}, }, }, @@ -521,34 +572,43 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { }, { - title: "Single TP Single provider empty update in between", + title: "Single TP empty update in between", updates: map[string][]update{ "tp1": { { targetGroups: []targetgroup.Group{ { - Source: "initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, interval: 30, }, { - targetGroups: []targetgroup.Group{}, - interval: 10, + targetGroups: []targetgroup.Group{ + { + Source: "tp1_group1", + Targets: []model.LabelSet{}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{}, + }, + }, + interval: 10, }, { targetGroups: []targetgroup.Group{ { - Source: "update1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "update2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "4"}}, }, }, @@ -559,22 +619,31 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { expectedTargets: [][]*targetgroup.Group{ { { - Source: "initial1", + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "1"}}, }, { - Source: "initial2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, - {}, { { - Source: "update1", + Source: "tp1_group1", + Targets: []model.LabelSet{}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{}, + }, + }, + { + { + Source: "tp1_group1", Targets: []model.LabelSet{{"__instance__": "3"}}, }, { - Source: "update2", + Source: "tp1_group2", Targets: []model.LabelSet{{"__instance__": "4"}}, }, }, @@ -606,6 +675,8 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { break Loop case tsetMap := <-discoveryManager.SyncCh(): for _, received := range tsetMap { + // Need to sort by the Groups source as the Discovery manager doesn't guarantee the order. + sort.Sort(byGroupSource(received)) if !reflect.DeepEqual(received, testCase.expectedTargets[x]) { var receivedFormated string for _, receivedTargets := range received { @@ -628,7 +699,7 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { } func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) { - verifyPresence := func(tSets map[poolKey][]*targetgroup.Group, poolKey poolKey, label string, present bool) { + verifyPresence := func(tSets map[poolKey]map[string]*targetgroup.Group, poolKey poolKey, label string, present bool) { if _, ok := tSets[poolKey]; !ok { t.Fatalf("'%s' should be present in Pool keys: %v", poolKey, tSets) return @@ -729,3 +800,10 @@ func (tp mockdiscoveryProvider) sendUpdates() { tp.up <- tgs } } + +// byGroupSource implements sort.Interface so we can sort by the Source field. +type byGroupSource []*targetgroup.Group + +func (a byGroupSource) Len() int { return len(a) } +func (a byGroupSource) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byGroupSource) Less(i, j int) bool { return a[i].Source < a[j].Source } From 77bf6bece0366ddcadf082c5e33b240875cbb3ef Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Thu, 4 Jan 2018 21:57:28 +0000 Subject: [PATCH 04/13] discovery-manager comment update --- discovery/manager.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/discovery/manager.go b/discovery/manager.go index 1bc952fee..a8d556a75 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -74,7 +74,8 @@ type Manager struct { logger log.Logger actionCh chan func(context.Context) discoverCancel []context.CancelFunc - // We use map[string]*targetgroup.Group to handle Discoverers that send only updates instead of all targets on every update. + // Some Discoverers(eg. k8s) send only the updates for a given target group + // so we use map[tg.Source]*targetgroup.Group to know which group to update. targets map[poolKey]map[string]*targetgroup.Group // The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config. syncCh chan map[string][]*targetgroup.Group From 546c29af5b896f37cc91e8f67a4dc14b1ed611fe Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 5 Jan 2018 10:56:56 +0000 Subject: [PATCH 05/13] return early for nil target groups --- discovery/manager.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/discovery/manager.go b/discovery/manager.go index a8d556a75..8625614d4 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -154,14 +154,17 @@ func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) { done := make(chan struct{}) m.actionCh <- func(ctx context.Context) { - if tgs != nil { - for _, tg := range tgs { - if tg != nil { // Some Discoverers send nil targetgroup so need to check for it to avoid panics. - if _, ok := m.targets[poolKey]; !ok { - m.targets[poolKey] = make(map[string]*targetgroup.Group) - } - m.targets[poolKey][tg.Source] = tg + if tgs == nil { + close(done) + return + } + + for _, tg := range tgs { + if tg != nil { // Some Discoverers send nil targetgroup so need to check for it to avoid panics. + if _, ok := m.targets[poolKey]; !ok { + m.targets[poolKey] = make(map[string]*targetgroup.Group) } + m.targets[poolKey][tg.Source] = tg } } close(done) From abfd9f19209975d20d162ed509111c47589a7b3f Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 12 Jan 2018 12:19:52 +0000 Subject: [PATCH 06/13] nits --- discovery/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/discovery/manager.go b/discovery/manager.go index 8625614d4..8110963dd 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -75,7 +75,7 @@ type Manager struct { actionCh chan func(context.Context) discoverCancel []context.CancelFunc // Some Discoverers(eg. k8s) send only the updates for a given target group - // so we use map[tg.Source]*targetgroup.Group to know which group to update. + // so we use map[tg.Source]*targetgroup.Group to know which group to update. targets map[poolKey]map[string]*targetgroup.Group // The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config. syncCh chan map[string][]*targetgroup.Group From cabce21b705669cacd356c42ad90465a0d39ff6e Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 12 Jan 2018 13:10:59 +0000 Subject: [PATCH 07/13] delete empty targets sets to avoid memory leaks --- discovery/manager.go | 10 +++++++- discovery/manager_test.go | 51 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/discovery/manager.go b/discovery/manager.go index 8110963dd..4e583dcde 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -179,8 +179,16 @@ func (m *Manager) allGroups() map[string][]*targetgroup.Group { m.actionCh <- func(ctx context.Context) { tSetsAll := map[string][]*targetgroup.Group{} for pkey, tsets := range m.targets { + del := true for _, tg := range tsets { - tSetsAll[pkey.setName] = append(tSetsAll[pkey.setName], tg) + if len(tg.Targets) != 0 { + tSetsAll[pkey.setName] = append(tSetsAll[pkey.setName], tg) + del = false + } + } + // Delete the empty map for this target set to avoid memory leaks. + if del { + delete(m.targets, pkey) } } tSets <- tSetsAll diff --git a/discovery/manager_test.go b/discovery/manager_test.go index f3ecf784e..b7ea2457f 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -649,6 +649,57 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { }, }, }, + { + title: "Single TP update with an empty group to check for memory leaks", + updates: map[string][]update{ + "tp1": { + { + targetGroups: []targetgroup.Group{ + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + interval: 30, + }, + { + targetGroups: []targetgroup.Group{ + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1_group2", + Targets: nil, + }, + }, + interval: 10, + }, + }, + }, + expectedTargets: [][]*targetgroup.Group{ + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + }, + }, + }, } for testIndex, testCase := range testCases { From 78ba5e62a611ca50427352dc6362ffaedd6324eb Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 12 Jan 2018 13:58:23 +0000 Subject: [PATCH 08/13] few mote usefull comments --- discovery/manager.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/discovery/manager.go b/discovery/manager.go index 4e583dcde..5bf8b26fa 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -70,6 +70,8 @@ func NewManager(logger log.Logger) *Manager { } // Manager maintains a set of discovery providers and sends each update to a channel used by other packages. +// Targets are grouped by target set names. +// When a given target set doesn't include any targets the manager doesn't send any updates for this target set. type Manager struct { logger log.Logger actionCh chan func(context.Context) @@ -160,7 +162,7 @@ func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) { } for _, tg := range tgs { - if tg != nil { // Some Discoverers send nil targetgroup so need to check for it to avoid panics. + if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics. if _, ok := m.targets[poolKey]; !ok { m.targets[poolKey] = make(map[string]*targetgroup.Group) } @@ -181,6 +183,8 @@ func (m *Manager) allGroups() map[string][]*targetgroup.Group { for pkey, tsets := range m.targets { del := true for _, tg := range tsets { + // Don't add a target set if the target group has no targets. + // This happens when a Discoverer sends an empty targets array for a group to indicate that these targets have been removed. if len(tg.Targets) != 0 { tSetsAll[pkey.setName] = append(tSetsAll[pkey.setName], tg) del = false From febebcd49ac6e5ba75007ec24a024dcad67f80c3 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 12 Jan 2018 21:45:47 +0000 Subject: [PATCH 09/13] more comments for the future ME, and reverted the Discovery manager execution changes as these were correct in the first place --- discovery/manager.go | 17 +++---------- discovery/manager_test.go | 51 --------------------------------------- 2 files changed, 4 insertions(+), 64 deletions(-) diff --git a/discovery/manager.go b/discovery/manager.go index 5bf8b26fa..427a35251 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -70,8 +70,7 @@ func NewManager(logger log.Logger) *Manager { } // Manager maintains a set of discovery providers and sends each update to a channel used by other packages. -// Targets are grouped by target set names. -// When a given target set doesn't include any targets the manager doesn't send any updates for this target set. +// Targets sent to the channel are grouped by the target set name. type Manager struct { logger log.Logger actionCh chan func(context.Context) @@ -181,18 +180,10 @@ func (m *Manager) allGroups() map[string][]*targetgroup.Group { m.actionCh <- func(ctx context.Context) { tSetsAll := map[string][]*targetgroup.Group{} for pkey, tsets := range m.targets { - del := true for _, tg := range tsets { - // Don't add a target set if the target group has no targets. - // This happens when a Discoverer sends an empty targets array for a group to indicate that these targets have been removed. - if len(tg.Targets) != 0 { - tSetsAll[pkey.setName] = append(tSetsAll[pkey.setName], tg) - del = false - } - } - // Delete the empty map for this target set to avoid memory leaks. - if del { - delete(m.targets, pkey) + // Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager' + // to singal that is needs to stop all scrape loops for this target set. + tSetsAll[pkey.setName] = append(tSetsAll[pkey.setName], tg) } } tSets <- tSetsAll diff --git a/discovery/manager_test.go b/discovery/manager_test.go index b7ea2457f..f3ecf784e 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -649,57 +649,6 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) { }, }, }, - { - title: "Single TP update with an empty group to check for memory leaks", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []targetgroup.Group{ - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, - interval: 30, - }, - { - targetGroups: []targetgroup.Group{ - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "3"}}, - }, - { - Source: "tp1_group2", - Targets: nil, - }, - }, - interval: 10, - }, - }, - }, - expectedTargets: [][]*targetgroup.Group{ - { - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "1"}}, - }, - { - Source: "tp1_group2", - Targets: []model.LabelSet{{"__instance__": "2"}}, - }, - }, - { - { - Source: "tp1_group1", - Targets: []model.LabelSet{{"__instance__": "3"}}, - }, - }, - }, - }, } for testIndex, testCase := range testCases { From a981b519002bb364f7d736f9788867118cf6ca10 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Sun, 14 Jan 2018 19:41:53 +0000 Subject: [PATCH 10/13] The config map was never reset on applying a new config --- retrieval/manager.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/retrieval/manager.go b/retrieval/manager.go index adf936022..c27bd9507 100644 --- a/retrieval/manager.go +++ b/retrieval/manager.go @@ -83,9 +83,11 @@ func (m *ScrapeManager) Stop() { func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error { done := make(chan struct{}) m.actionCh <- func() { + c := make(map[string]*config.ScrapeConfig) for _, scfg := range cfg.ScrapeConfigs { - m.scrapeConfigs[scfg.JobName] = scfg + c[scfg.JobName] = scfg } + m.scrapeConfigs = c close(done) } <-done From a535c8d1b4089a58b5513f3aa50afd17be37833c Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Sun, 14 Jan 2018 19:42:31 +0000 Subject: [PATCH 11/13] simplify the pool cleanup --- retrieval/manager.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/retrieval/manager.go b/retrieval/manager.go index c27bd9507..5cf05b79b 100644 --- a/retrieval/manager.go +++ b/retrieval/manager.go @@ -146,20 +146,15 @@ func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) error { } else { existing.Sync(tgroup) } + } - // Cleanup - check the config and cancel the scrape loops if it don't exist in the scrape config. - jobs := make(map[string]struct{}) - - for k := range m.scrapeConfigs { - jobs[k] = struct{}{} - } - - for name, sp := range m.scrapePools { - if _, ok := jobs[name]; !ok { - sp.stop() - delete(m.scrapePools, name) - } + // Cleanup - check the config and cancel the scrape loops if it don't exist in the scrape config. + for name, sp := range m.scrapePools { + if _, ok := m.scrapeConfigs[name]; !ok { + sp.stop() + delete(m.scrapePools, name) } } + return nil } From 38938ba493e81edda7454fce4e5ca4dc711febf6 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 15 Jan 2018 11:42:09 +0000 Subject: [PATCH 12/13] comment nits --- discovery/manager.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/discovery/manager.go b/discovery/manager.go index 427a35251..bed5c50e0 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -69,8 +69,8 @@ func NewManager(logger log.Logger) *Manager { } } -// Manager maintains a set of discovery providers and sends each update to a channel used by other packages. -// Targets sent to the channel are grouped by the target set name. +// Manager maintains a set of discovery providers and sends each update to a map channel. +// Targets are grouped by the target set name. type Manager struct { logger log.Logger actionCh chan func(context.Context) @@ -182,7 +182,7 @@ func (m *Manager) allGroups() map[string][]*targetgroup.Group { for pkey, tsets := range m.targets { for _, tg := range tsets { // Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager' - // to singal that is needs to stop all scrape loops for this target set. + // to signal that it needs to stop all scrape loops for this target set. tSetsAll[pkey.setName] = append(tSetsAll[pkey.setName], tg) } } From 790cf30fcb64a9ac914f4861a7bbc22f0174a399 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 15 Jan 2018 11:52:20 +0000 Subject: [PATCH 13/13] remove uneeded check --- discovery/manager.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/discovery/manager.go b/discovery/manager.go index bed5c50e0..0ce6b9fc5 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -155,11 +155,6 @@ func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) { done := make(chan struct{}) m.actionCh <- func(ctx context.Context) { - if tgs == nil { - close(done) - return - } - for _, tg := range tgs { if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics. if _, ok := m.targets[poolKey]; !ok {