|
|
|
@ -667,7 +667,7 @@ func TestTargetUpdatesOrder(t *testing.T) {
|
|
|
|
|
|
|
|
|
|
provUpdates := make(chan []*targetgroup.Group) |
|
|
|
|
for _, up := range tc.updates { |
|
|
|
|
go newMockDiscoveryProvider(up).Run(ctx, provUpdates) |
|
|
|
|
go newMockDiscoveryProvider(up...).Run(ctx, provUpdates) |
|
|
|
|
if len(up) > 0 { |
|
|
|
|
totalUpdatesCount = totalUpdatesCount + len(up) |
|
|
|
|
} |
|
|
|
@ -885,31 +885,6 @@ scrape_configs:
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestCoordinationWithEmptyProvider(t *testing.T) { |
|
|
|
|
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
|
|
|
|
defer cancel() |
|
|
|
|
|
|
|
|
|
mgr := NewManager(ctx, nil) |
|
|
|
|
mgr.updatert = 100 * time.Millisecond |
|
|
|
|
go mgr.Run() |
|
|
|
|
|
|
|
|
|
p := emptyProvider{} |
|
|
|
|
mgr.StartCustomProvider(ctx, "empty", p) |
|
|
|
|
|
|
|
|
|
select { |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
t.Fatal("no update received in the expected timeframe") |
|
|
|
|
case tgs, ok := <-mgr.SyncCh(): |
|
|
|
|
if !ok { |
|
|
|
|
t.Fatal("discovery manager channel is closed") |
|
|
|
|
} |
|
|
|
|
if len(tgs) != 0 { |
|
|
|
|
t.Fatalf("target groups mismatch, got: %#v, expected: {}\n", tgs) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestCoordinationWithReceiver(t *testing.T) { |
|
|
|
|
updateDelay := 100 * time.Millisecond |
|
|
|
|
|
|
|
|
@ -920,13 +895,76 @@ func TestCoordinationWithReceiver(t *testing.T) {
|
|
|
|
|
|
|
|
|
|
testCases := []struct { |
|
|
|
|
title string |
|
|
|
|
providers map[string][]update |
|
|
|
|
providers map[string]Discoverer |
|
|
|
|
expected []expect |
|
|
|
|
}{ |
|
|
|
|
{ |
|
|
|
|
title: "Receiver should get an empty map even when the provider sends nothing and closes the channel", |
|
|
|
|
providers: map[string]Discoverer{ |
|
|
|
|
"empty": &onceProvider{}, |
|
|
|
|
}, |
|
|
|
|
expected: []expect{ |
|
|
|
|
{ |
|
|
|
|
tgs: map[string][]*targetgroup.Group{}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
{ |
|
|
|
|
title: "Receiver should get updates even when one provider closes its channel", |
|
|
|
|
providers: map[string]Discoverer{ |
|
|
|
|
"once1": &onceProvider{ |
|
|
|
|
tgs: []*targetgroup.Group{ |
|
|
|
|
{ |
|
|
|
|
Source: "tg1", |
|
|
|
|
Targets: []model.LabelSet{{"__instance__": "1"}}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
"mock1": newMockDiscoveryProvider( |
|
|
|
|
update{ |
|
|
|
|
interval: 2 * updateDelay / time.Millisecond, |
|
|
|
|
targetGroups: []targetgroup.Group{ |
|
|
|
|
{ |
|
|
|
|
Source: "tg2", |
|
|
|
|
Targets: []model.LabelSet{{"__instance__": "2"}}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
), |
|
|
|
|
}, |
|
|
|
|
expected: []expect{ |
|
|
|
|
{ |
|
|
|
|
tgs: map[string][]*targetgroup.Group{ |
|
|
|
|
"once1": []*targetgroup.Group{ |
|
|
|
|
{ |
|
|
|
|
Source: "tg1", |
|
|
|
|
Targets: []model.LabelSet{{"__instance__": "1"}}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
{ |
|
|
|
|
tgs: map[string][]*targetgroup.Group{ |
|
|
|
|
"once1": []*targetgroup.Group{ |
|
|
|
|
{ |
|
|
|
|
Source: "tg1", |
|
|
|
|
Targets: []model.LabelSet{{"__instance__": "1"}}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
"mock1": []*targetgroup.Group{ |
|
|
|
|
{ |
|
|
|
|
Source: "tg2", |
|
|
|
|
Targets: []model.LabelSet{{"__instance__": "2"}}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
{ |
|
|
|
|
title: "Receiver should get updates even when the channel is blocked", |
|
|
|
|
providers: map[string][]update{ |
|
|
|
|
"mock1": []update{ |
|
|
|
|
providers: map[string]Discoverer{ |
|
|
|
|
"mock1": newMockDiscoveryProvider( |
|
|
|
|
update{ |
|
|
|
|
targetGroups: []targetgroup.Group{ |
|
|
|
|
{ |
|
|
|
@ -944,7 +982,7 @@ func TestCoordinationWithReceiver(t *testing.T) {
|
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
), |
|
|
|
|
}, |
|
|
|
|
expected: []expect{ |
|
|
|
|
{ |
|
|
|
@ -977,8 +1015,8 @@ func TestCoordinationWithReceiver(t *testing.T) {
|
|
|
|
|
}, |
|
|
|
|
{ |
|
|
|
|
title: "The receiver gets an update when a target group is gone", |
|
|
|
|
providers: map[string][]update{ |
|
|
|
|
"mock1": []update{ |
|
|
|
|
providers: map[string]Discoverer{ |
|
|
|
|
"mock1": newMockDiscoveryProvider( |
|
|
|
|
update{ |
|
|
|
|
targetGroups: []targetgroup.Group{ |
|
|
|
|
{ |
|
|
|
@ -996,7 +1034,7 @@ func TestCoordinationWithReceiver(t *testing.T) {
|
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
), |
|
|
|
|
}, |
|
|
|
|
expected: []expect{ |
|
|
|
|
{ |
|
|
|
@ -1023,8 +1061,8 @@ func TestCoordinationWithReceiver(t *testing.T) {
|
|
|
|
|
}, |
|
|
|
|
{ |
|
|
|
|
title: "The receiver gets merged updates", |
|
|
|
|
providers: map[string][]update{ |
|
|
|
|
"mock1": []update{ |
|
|
|
|
providers: map[string]Discoverer{ |
|
|
|
|
"mock1": newMockDiscoveryProvider( |
|
|
|
|
// This update should never be seen by the receiver because
|
|
|
|
|
// it is overwritten by the next one.
|
|
|
|
|
update{ |
|
|
|
@ -1043,7 +1081,7 @@ func TestCoordinationWithReceiver(t *testing.T) {
|
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
), |
|
|
|
|
}, |
|
|
|
|
expected: []expect{ |
|
|
|
|
{ |
|
|
|
@ -1060,8 +1098,8 @@ func TestCoordinationWithReceiver(t *testing.T) {
|
|
|
|
|
}, |
|
|
|
|
{ |
|
|
|
|
title: "Discovery with multiple providers", |
|
|
|
|
providers: map[string][]update{ |
|
|
|
|
"mock1": []update{ |
|
|
|
|
providers: map[string]Discoverer{ |
|
|
|
|
"mock1": newMockDiscoveryProvider( |
|
|
|
|
// This update is available in the first receive.
|
|
|
|
|
update{ |
|
|
|
|
targetGroups: []targetgroup.Group{ |
|
|
|
@ -1071,8 +1109,8 @@ func TestCoordinationWithReceiver(t *testing.T) {
|
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
"mock2": []update{ |
|
|
|
|
), |
|
|
|
|
"mock2": newMockDiscoveryProvider( |
|
|
|
|
// This update should only arrive after the receiver has read from the channel once.
|
|
|
|
|
update{ |
|
|
|
|
interval: 2 * updateDelay / time.Millisecond, |
|
|
|
@ -1082,8 +1120,7 @@ func TestCoordinationWithReceiver(t *testing.T) {
|
|
|
|
|
Targets: []model.LabelSet{{"__instance__": "2"}}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}), |
|
|
|
|
}, |
|
|
|
|
expected: []expect{ |
|
|
|
|
{ |
|
|
|
@ -1127,8 +1164,7 @@ func TestCoordinationWithReceiver(t *testing.T) {
|
|
|
|
|
mgr.updatert = updateDelay |
|
|
|
|
go mgr.Run() |
|
|
|
|
|
|
|
|
|
for name := range tc.providers { |
|
|
|
|
p := newMockDiscoveryProvider(tc.providers[name]) |
|
|
|
|
for name, p := range tc.providers { |
|
|
|
|
mgr.StartCustomProvider(ctx, name, p) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1136,10 +1172,10 @@ func TestCoordinationWithReceiver(t *testing.T) {
|
|
|
|
|
time.Sleep(expected.delay) |
|
|
|
|
select { |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
t.Fatal("no update received in the expected timeframe") |
|
|
|
|
t.Fatalf("step %d: no update received in the expected timeframe", i) |
|
|
|
|
case tgs, ok := <-mgr.SyncCh(): |
|
|
|
|
if !ok { |
|
|
|
|
t.Fatal("discovery manager channel is closed") |
|
|
|
|
t.Fatalf("step %d: discovery manager channel is closed", i) |
|
|
|
|
} |
|
|
|
|
if len(tgs) != len(expected.tgs) { |
|
|
|
|
t.Fatalf("step %d: target groups mismatch, got: %d, expected: %d\ngot: %#v\nexpected: %#v", |
|
|
|
@ -1147,7 +1183,7 @@ func TestCoordinationWithReceiver(t *testing.T) {
|
|
|
|
|
} |
|
|
|
|
for k := range expected.tgs { |
|
|
|
|
if _, ok := tgs[k]; !ok { |
|
|
|
|
t.Fatalf("step %d: target group not found: %s", i, k) |
|
|
|
|
t.Fatalf("step %d: target group not found: %s\ngot: %#v", i, k, tgs) |
|
|
|
|
} |
|
|
|
|
assertEqualGroups(t, tgs[k], expected.tgs[k], func(got, expected string) string { |
|
|
|
|
return fmt.Sprintf("step %d: targets mismatch \ngot: %q \nexpected: %q", i, got, expected) |
|
|
|
@ -1168,7 +1204,7 @@ type mockdiscoveryProvider struct {
|
|
|
|
|
updates []update |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newMockDiscoveryProvider(updates []update) mockdiscoveryProvider { |
|
|
|
|
func newMockDiscoveryProvider(updates ...update) mockdiscoveryProvider { |
|
|
|
|
tp := mockdiscoveryProvider{ |
|
|
|
|
updates: updates, |
|
|
|
|
} |
|
|
|
@ -1206,9 +1242,14 @@ func (a byGroupSource) Len() int { return len(a) }
|
|
|
|
|
func (a byGroupSource) Swap(i, j int) { a[i], a[j] = a[j], a[i] } |
|
|
|
|
func (a byGroupSource) Less(i, j int) bool { return a[i].Source < a[j].Source } |
|
|
|
|
|
|
|
|
|
// emptyProvider sends no updates and closes the update channel.
|
|
|
|
|
type emptyProvider struct{} |
|
|
|
|
// onceProvider sends updates once (if any) and closes the update channel.
|
|
|
|
|
type onceProvider struct { |
|
|
|
|
tgs []*targetgroup.Group |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e emptyProvider) Run(_ context.Context, ch chan<- []*targetgroup.Group) { |
|
|
|
|
func (o onceProvider) Run(_ context.Context, ch chan<- []*targetgroup.Group) { |
|
|
|
|
if len(o.tgs) > 0 { |
|
|
|
|
ch <- o.tgs |
|
|
|
|
} |
|
|
|
|
close(ch) |
|
|
|
|
} |
|
|
|
|