diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go index 427a7990b..02fc2f981 100644 --- a/discovery/discovery_test.go +++ b/discovery/discovery_test.go @@ -18,11 +18,137 @@ import ( "sync" "sync/atomic" "testing" + "time" "github.com/prometheus/prometheus/config" yaml "gopkg.in/yaml.v2" ) +func TestSingleTargetSetWithSingleProviderOnlySendsNewTargetGroups(t *testing.T) { + + testCases := [][]update{ + []update{}, // No updates. + + []update{{[]string{}, 0}}, // Empty initials. + + []update{{[]string{}, 6000}}, // Empty initials with a delay. + + []update{{[]string{"initial1", "initial2"}, 0}}, // Initials only. + + []update{{[]string{"initial1", "initial2"}, 6000}}, // Initials only but after a delay. + + []update{ // Initials and new groups. + {[]string{"initial1", "initial2"}, 0}, + {[]string{"update1", "update2"}, 0}, + }, + + []update{ // Initials and new groups after a delay. + {[]string{"initial1", "initial2"}, 6000}, + {[]string{"update1", "update2"}, 500}, + }, + + []update{ + {[]string{"initial1", "initial2"}, 100}, + {[]string{"update1", "update2", "update3", "update4", "update5", "update6", "update7", "update8", "update9", "update10", "update11"}, 100}, + }, + + []update{ + {[]string{"initial1"}, 10}, + {[]string{"update1"}, 45}, + {[]string{"update2", "update3", "update4"}, 0}, + {[]string{"update5"}, 10}, + {[]string{"update6", "update7", "update8", "update9"}, 70}, + }, + + []update{ + {[]string{"initial1", "initial2"}, 5}, + {[]string{}, 100}, + {[]string{"update1", "update2"}, 100}, + {[]string{"update3", "update4", "update5"}, 70}, + }, + } + + for i, updates := range testCases { + + expectedGroups := make(map[string]struct{}) + for _, update := range updates { + for _, target := range update.targets { + expectedGroups[target] = struct{}{} + } + } + + var wg sync.WaitGroup + wg.Add(1) + + isFirstSyncCall := true + var initialGroups []*config.TargetGroup + var syncedGroups []*config.TargetGroup + + targetSet := NewTargetSet(&mockSyncer{ + sync: func(tgs []*config.TargetGroup) { + syncedGroups = tgs + + if isFirstSyncCall { + isFirstSyncCall = false + initialGroups = tgs + } + + if len(tgs) == len(expectedGroups) { + // All the groups are sent, we can start asserting. + wg.Done() + } + }, + }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tp := newMockTargetProvider(updates) + targetProviders := map[string]TargetProvider{} + targetProviders["testProvider"] = tp + + go targetSet.Run(ctx) + targetSet.UpdateProviders(targetProviders) + + finalize := make(chan struct{}) + go func() { + defer close(finalize) + wg.Wait() + }() + + select { + case <-time.After(20000 * time.Millisecond): + t.Errorf("In test case %v: Test timed out after 20000 millisecond. All targets should be sent within the timeout", i) + + case <-finalize: + + if *tp.callCount != 1 { + t.Errorf("In test case %v: TargetProvider Run should be called once only, was called %v times", i, *tp.callCount) + } + + if len(updates) > 0 && updates[0].interval > 5000 { + // If the initial set of targets never arrive or arrive after 5 seconds. + // The first sync call should receive empty set of targets. + if len(initialGroups) != 0 { + t.Errorf("In test case %v: Expecting 0 initial target groups, received %v", i, len(initialGroups)) + } + } + + if len(syncedGroups) != len(expectedGroups) { + t.Errorf("In test case %v: Expecting %v target groups in total, received %v", i, len(expectedGroups), len(syncedGroups)) + } + + for _, tg := range syncedGroups { + if _, ok := expectedGroups[tg.Source]; ok == false { + t.Errorf("In test case %v: '%s' does not exist in expected target groups: %s", i, tg.Source, expectedGroups) + } else { + delete(expectedGroups, tg.Source) // Remove used targets from the map. + } + } + } + } +} + func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) { verifyPresence := func(tgroups map[string]*config.TargetGroup, name string, present bool) { @@ -76,25 +202,6 @@ static_configs: verifyPresence(ts.tgroups, "static/0/1", false) } -type mockSyncer struct { - sync func(tgs []*config.TargetGroup) -} - -func (s *mockSyncer) Sync(tgs []*config.TargetGroup) { - if s.sync != nil { - s.sync(tgs) - } -} - -type mockTargetProvider struct { - callCount *uint32 -} - -func (tp mockTargetProvider) Run(ctx context.Context, up chan<- []*config.TargetGroup) { - atomic.AddUint32(tp.callCount, 1) - up <- []*config.TargetGroup{{Source: "dummySource"}} -} - func TestTargetSetRunsSameTargetProviderMultipleTimes(t *testing.T) { var wg sync.WaitGroup @@ -112,10 +219,7 @@ func TestTargetSetRunsSameTargetProviderMultipleTimes(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tp := mockTargetProvider{} - var callCount uint32 - tp.callCount = &callCount - + tp := newMockTargetProvider([]update{{[]string{"initial1", "initial2"}, 10}}) targetProviders := map[string]TargetProvider{} targetProviders["testProvider"] = tp @@ -126,7 +230,59 @@ func TestTargetSetRunsSameTargetProviderMultipleTimes(t *testing.T) { ts2.UpdateProviders(targetProviders) wg.Wait() - if callCount != 2 { - t.Errorf("Was expecting 2 calls received %v", callCount) + if *tp.callCount != 2 { + t.Errorf("Was expecting 2 calls received %v", tp.callCount) + } +} + +type mockSyncer struct { + sync func(tgs []*config.TargetGroup) +} + +func (s *mockSyncer) Sync(tgs []*config.TargetGroup) { + if s.sync != nil { + s.sync(tgs) + } +} + +type mockTargetProvider struct { + callCount *uint32 + updates []update + up chan<- []*config.TargetGroup +} + +type update struct { + targets []string + interval time.Duration +} + +func newMockTargetProvider(updates []update) mockTargetProvider { + var callCount uint32 + + tp := mockTargetProvider{ + callCount: &callCount, + updates: updates, + } + + return tp +} + +func (tp mockTargetProvider) Run(ctx context.Context, up chan<- []*config.TargetGroup) { + atomic.AddUint32(tp.callCount, 1) + tp.up = up + tp.sendUpdates() +} + +func (tp mockTargetProvider) sendUpdates() { + for _, update := range tp.updates { + + time.Sleep(update.interval * time.Millisecond) + + tgs := make([]*config.TargetGroup, len(update.targets)) + for i, tg := range update.targets { + tgs[i] = &config.TargetGroup{Source: tg} + } + + tp.up <- tgs } }