Merge pull request #13593 from machine424/no-index

discovery: kubernetes: Avoid creating unnecessary Kubernetes indexers…
pull/13655/head
Björn Rabenstein 9 months ago committed by GitHub
commit dbb4055633
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -18,6 +18,7 @@ import (
"testing"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/discovery/v1"
"k8s.io/api/discovery/v1beta1"
@ -1405,3 +1406,41 @@ func TestEndpointSliceDiscoveryEmptyPodStatus(t *testing.T) {
expectedRes: map[string]*targetgroup.Group{},
}.Run(t)
}
// TestEndpointSliceInfIndexersCount makes sure that RoleEndpointSlice discovery
// sets up indexing for the main Kube informer only when needed.
// See: https://github.com/prometheus/prometheus/pull/13554#discussion_r1490965817
func TestEndpointSliceInfIndexersCount(t *testing.T) {
tests := []struct {
name string
withNodeMetadata bool
}{
{"with node metadata", true},
{"without node metadata", false},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var (
n *Discovery
mainInfIndexersCount int
)
if tc.withNodeMetadata {
mainInfIndexersCount = 1
n, _ = makeDiscoveryWithMetadata(RoleEndpointSlice, NamespaceDiscovery{}, AttachMetadataConfig{Node: true})
} else {
n, _ = makeDiscovery(RoleEndpointSlice, NamespaceDiscovery{})
}
k8sDiscoveryTest{
discovery: n,
afterStart: func() {
n.RLock()
defer n.RUnlock()
require.Len(t, n.discoverers, 1)
require.Len(t, n.discoverers[0].(*EndpointSlice).endpointSliceInf.GetIndexer().GetIndexers(), mainInfIndexersCount)
},
}.Run(t)
})
}
}

@ -815,7 +815,7 @@ func (d *Discovery) newEndpointsByNodeInformer(plw *cache.ListWatch) cache.Share
func (d *Discovery) newEndpointSlicesByNodeInformer(plw *cache.ListWatch, object runtime.Object) cache.SharedIndexInformer {
indexers := make(map[string]cache.IndexFunc)
if !d.attachMetadata.Node {
cache.NewSharedIndexInformer(plw, &disv1.EndpointSlice{}, resyncDisabled, indexers)
return cache.NewSharedIndexInformer(plw, object, resyncDisabled, indexers)
}
indexers[nodeIndex] = func(obj interface{}) ([]string, error) {

@ -128,7 +128,7 @@ func (d k8sDiscoveryTest) Run(t *testing.T) {
}
resChan := make(chan map[string]*targetgroup.Group)
go readResultWithTimeout(t, ch, d.expectedMaxItems, time.Second, resChan)
go readResultWithTimeout(t, ctx, ch, d.expectedMaxItems, time.Second, resChan)
dd, ok := d.discovery.(hasSynced)
require.True(t, ok, "discoverer does not implement hasSynced interface")
@ -141,13 +141,18 @@ func (d k8sDiscoveryTest) Run(t *testing.T) {
if d.expectedRes != nil {
res := <-resChan
requireTargetGroups(t, d.expectedRes, res)
} else {
// Stop readResultWithTimeout and wait for it.
cancel()
<-resChan
}
}
// readResultWithTimeout reads all targetgroups from channel with timeout.
// It merges targetgroups by source and sends the result to result channel.
func readResultWithTimeout(t *testing.T, ch <-chan []*targetgroup.Group, max int, timeout time.Duration, resChan chan<- map[string]*targetgroup.Group) {
func readResultWithTimeout(t *testing.T, ctx context.Context, ch <-chan []*targetgroup.Group, max int, stopAfter time.Duration, resChan chan<- map[string]*targetgroup.Group) {
res := make(map[string]*targetgroup.Group)
timeout := time.After(stopAfter)
Loop:
for {
select {
@ -162,12 +167,15 @@ Loop:
// Reached max target groups we may get, break fast.
break Loop
}
case <-time.After(timeout):
case <-timeout:
// Because we use queue, an object that is created then
// deleted or updated may be processed only once.
// So possibly we may skip events, timed out here.
t.Logf("timed out, got %d (max: %d) items, some events are skipped", len(res), max)
break Loop
case <-ctx.Done():
t.Logf("stopped, got %d (max: %d) items", len(res), max)
break Loop
}
}

Loading…
Cancel
Save