diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 13653a6da..4b7297a2b 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -667,7 +667,7 @@ func TestTargetUpdatesOrder(t *testing.T) { provUpdates := make(chan []*targetgroup.Group) for _, up := range tc.updates { - go newMockDiscoveryProvider(up).Run(ctx, provUpdates) + go newMockDiscoveryProvider(up...).Run(ctx, provUpdates) if len(up) > 0 { totalUpdatesCount = totalUpdatesCount + len(up) } @@ -885,31 +885,6 @@ scrape_configs: } } -func TestCoordinationWithEmptyProvider(t *testing.T) { - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - mgr := NewManager(ctx, nil) - mgr.updatert = 100 * time.Millisecond - go mgr.Run() - - p := emptyProvider{} - mgr.StartCustomProvider(ctx, "empty", p) - - select { - case <-ctx.Done(): - t.Fatal("no update received in the expected timeframe") - case tgs, ok := <-mgr.SyncCh(): - if !ok { - t.Fatal("discovery manager channel is closed") - } - if len(tgs) != 0 { - t.Fatalf("target groups mismatch, got: %#v, expected: {}\n", tgs) - } - } -} - func TestCoordinationWithReceiver(t *testing.T) { updateDelay := 100 * time.Millisecond @@ -920,13 +895,76 @@ func TestCoordinationWithReceiver(t *testing.T) { testCases := []struct { title string - providers map[string][]update + providers map[string]Discoverer expected []expect }{ + { + title: "Receiver should get an empty map even when the provider sends nothing and closes the channel", + providers: map[string]Discoverer{ + "empty": &onceProvider{}, + }, + expected: []expect{ + { + tgs: map[string][]*targetgroup.Group{}, + }, + }, + }, + { + title: "Receiver should get updates even when one provider closes its channel", + providers: map[string]Discoverer{ + "once1": &onceProvider{ + tgs: []*targetgroup.Group{ + { + Source: "tg1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + }, + }, + "mock1": newMockDiscoveryProvider( + update{ + interval: 2 * updateDelay / time.Millisecond, + targetGroups: []targetgroup.Group{ + { + Source: "tg2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + }, + ), + }, + expected: []expect{ + { + tgs: map[string][]*targetgroup.Group{ + "once1": []*targetgroup.Group{ + { + Source: "tg1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + }, + }, + }, + { + tgs: map[string][]*targetgroup.Group{ + "once1": []*targetgroup.Group{ + { + Source: "tg1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + }, + "mock1": []*targetgroup.Group{ + { + Source: "tg2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + }, + }, + }, + }, { title: "Receiver should get updates even when the channel is blocked", - providers: map[string][]update{ - "mock1": []update{ + providers: map[string]Discoverer{ + "mock1": newMockDiscoveryProvider( update{ targetGroups: []targetgroup.Group{ { @@ -944,7 +982,7 @@ func TestCoordinationWithReceiver(t *testing.T) { }, }, }, - }, + ), }, expected: []expect{ { @@ -977,8 +1015,8 @@ func TestCoordinationWithReceiver(t *testing.T) { }, { title: "The receiver gets an update when a target group is gone", - providers: map[string][]update{ - "mock1": []update{ + providers: map[string]Discoverer{ + "mock1": newMockDiscoveryProvider( update{ targetGroups: []targetgroup.Group{ { @@ -996,7 +1034,7 @@ func TestCoordinationWithReceiver(t *testing.T) { }, }, }, - }, + ), }, expected: []expect{ { @@ -1023,8 +1061,8 @@ func TestCoordinationWithReceiver(t *testing.T) { }, { title: "The receiver gets merged updates", - providers: map[string][]update{ - "mock1": []update{ + providers: map[string]Discoverer{ + "mock1": newMockDiscoveryProvider( // This update should never be seen by the receiver because // it is overwritten by the next one. update{ @@ -1043,7 +1081,7 @@ func TestCoordinationWithReceiver(t *testing.T) { }, }, }, - }, + ), }, expected: []expect{ { @@ -1060,8 +1098,8 @@ func TestCoordinationWithReceiver(t *testing.T) { }, { title: "Discovery with multiple providers", - providers: map[string][]update{ - "mock1": []update{ + providers: map[string]Discoverer{ + "mock1": newMockDiscoveryProvider( // This update is available in the first receive. update{ targetGroups: []targetgroup.Group{ @@ -1071,8 +1109,8 @@ func TestCoordinationWithReceiver(t *testing.T) { }, }, }, - }, - "mock2": []update{ + ), + "mock2": newMockDiscoveryProvider( // This update should only arrive after the receiver has read from the channel once. update{ interval: 2 * updateDelay / time.Millisecond, @@ -1082,8 +1120,7 @@ func TestCoordinationWithReceiver(t *testing.T) { Targets: []model.LabelSet{{"__instance__": "2"}}, }, }, - }, - }, + }), }, expected: []expect{ { @@ -1127,8 +1164,7 @@ func TestCoordinationWithReceiver(t *testing.T) { mgr.updatert = updateDelay go mgr.Run() - for name := range tc.providers { - p := newMockDiscoveryProvider(tc.providers[name]) + for name, p := range tc.providers { mgr.StartCustomProvider(ctx, name, p) } @@ -1136,10 +1172,10 @@ func TestCoordinationWithReceiver(t *testing.T) { time.Sleep(expected.delay) select { case <-ctx.Done(): - t.Fatal("no update received in the expected timeframe") + t.Fatalf("step %d: no update received in the expected timeframe", i) case tgs, ok := <-mgr.SyncCh(): if !ok { - t.Fatal("discovery manager channel is closed") + t.Fatalf("step %d: discovery manager channel is closed", i) } if len(tgs) != len(expected.tgs) { t.Fatalf("step %d: target groups mismatch, got: %d, expected: %d\ngot: %#v\nexpected: %#v", @@ -1147,7 +1183,7 @@ func TestCoordinationWithReceiver(t *testing.T) { } for k := range expected.tgs { if _, ok := tgs[k]; !ok { - t.Fatalf("step %d: target group not found: %s", i, k) + t.Fatalf("step %d: target group not found: %s\ngot: %#v", i, k, tgs) } assertEqualGroups(t, tgs[k], expected.tgs[k], func(got, expected string) string { return fmt.Sprintf("step %d: targets mismatch \ngot: %q \nexpected: %q", i, got, expected) @@ -1168,7 +1204,7 @@ type mockdiscoveryProvider struct { updates []update } -func newMockDiscoveryProvider(updates []update) mockdiscoveryProvider { +func newMockDiscoveryProvider(updates ...update) mockdiscoveryProvider { tp := mockdiscoveryProvider{ updates: updates, } @@ -1206,9 +1242,14 @@ func (a byGroupSource) Len() int { return len(a) } func (a byGroupSource) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a byGroupSource) Less(i, j int) bool { return a[i].Source < a[j].Source } -// emptyProvider sends no updates and closes the update channel. -type emptyProvider struct{} +// onceProvider sends updates once (if any) and closes the update channel. +type onceProvider struct { + tgs []*targetgroup.Group +} -func (e emptyProvider) Run(_ context.Context, ch chan<- []*targetgroup.Group) { +func (o onceProvider) Run(_ context.Context, ch chan<- []*targetgroup.Group) { + if len(o.tgs) > 0 { + ch <- o.tgs + } close(ch) }