From 9fc5a547aeeb552f48a588f59c02eb76fc4d3262 Mon Sep 17 00:00:00 2001 From: James Ravn Date: Mon, 19 Jun 2017 16:47:29 +0100 Subject: [PATCH] Use endpoints informer for the endpoint controller This substantially reduces the number of API calls made by the endpoint controller. Currently the controller makes an API call per endpoint for each service that is synced. When the 30s resync is triggered, this results in an API call for every single endpoint in the cluster. This quickly exceeds the default qps/burst limit of 20/30 even in small clusters, leading to delays in endpoint updates. This change modifies the controller to use the endpoint informer cache for all endpoint GETs. This means we only make API calls for changes in endpoints. As a result, qps only depends on the pod activity in the cluster, rather than the number of services. --- cmd/kube-controller-manager/app/core.go | 1 + pkg/controller/endpoint/BUILD | 2 + .../endpoint/endpoints_controller.go | 39 +- .../endpoint/endpoints_controller_test.go | 339 ++++++++++-------- 4 files changed, 231 insertions(+), 150 deletions(-) diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index f043dafe56..f7aeb1bece 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -178,6 +178,7 @@ func startEndpointController(ctx ControllerContext) (bool, error) { go endpointcontroller.NewEndpointController( ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Core().V1().Services(), + ctx.InformerFactory.Core().V1().Endpoints(), ctx.ClientBuilder.ClientOrDie("endpoint-controller"), ).Run(int(ctx.Options.ConcurrentEndpointSyncs), ctx.Stop) return true, nil diff --git a/pkg/controller/endpoint/BUILD b/pkg/controller/endpoint/BUILD index 583bb1c2c7..73c8831871 100644 --- a/pkg/controller/endpoint/BUILD +++ b/pkg/controller/endpoint/BUILD @@ -16,6 +16,7 @@ go_library( ], tags = ["automanaged"], deps = [ + "//pkg/api:go_default_library", "//pkg/api/v1/endpoints:go_default_library", "//pkg/api/v1/pod:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", @@ -53,6 +54,7 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/util/testing:go_default_library", diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 589698fea7..68c2c1378e 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1/endpoints" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -69,13 +70,15 @@ var ( ) // NewEndpointController returns a new *EndpointController. -func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer, client clientset.Interface) *EndpointController { +func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer, + endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface) *EndpointController { if client != nil && client.Core().RESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.Core().RESTClient().GetRateLimiter()) } e := &EndpointController{ - client: client, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"), + client: client, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"), + workerLoopPeriod: time.Second, } serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -96,6 +99,9 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme e.podLister = podInformer.Lister() e.podsSynced = podInformer.Informer().HasSynced + e.endpointsLister = endpointsInformer.Lister() + e.endpointsSynced = endpointsInformer.Informer().HasSynced + return e } @@ -117,12 +123,22 @@ type EndpointController struct { // Added as a member to the struct to allow injection for testing. podsSynced cache.InformerSynced + // endpointsLister is able to list/get endpoints and is populated by the shared informer passed to + // NewEndpointController. + endpointsLister corelisters.EndpointsLister + // endpointsSynced returns true if the endpoints shared informer has been synced at least once. + // Added as a member to the struct to allow injection for testing. + endpointsSynced cache.InformerSynced + // Services that need to be updated. A channel is inappropriate here, // because it allows services with lots of pods to be serviced much // more often than services with few pods; it also would cause a // service that's inserted multiple times to be processed more than // necessary. queue workqueue.RateLimitingInterface + + // workerLoopPeriod is the time between worker runs. The workers process the queue of service and pod changes. + workerLoopPeriod time.Duration } // Runs e; will not return until stopCh is closed. workers determines how many @@ -134,12 +150,12 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) { glog.Infof("Starting endpoint controller") defer glog.Infof("Shutting down endpoint controller") - if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced) { + if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) { return } for i := 0; i < workers; i++ { - go wait.Until(e.worker, time.Second, stopCh) + go wait.Until(e.worker, e.workerLoopPeriod, stopCh) } go func() { @@ -413,7 +429,7 @@ func (e *EndpointController) syncService(key string) error { subsets = endpoints.RepackSubsets(subsets) // See if there's actually an update here. - currentEndpoints, err := e.client.Core().Endpoints(service.Namespace).Get(service.Name, metav1.GetOptions{}) + currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name) if err != nil { if errors.IsNotFound(err) { currentEndpoints = &v1.Endpoints{ @@ -432,7 +448,11 @@ func (e *EndpointController) syncService(key string) error { glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) return nil } - newEndpoints := currentEndpoints + copy, err := api.Scheme.DeepCopy(currentEndpoints) + if err != nil { + return err + } + newEndpoints := copy.(*v1.Endpoints) newEndpoints.Subsets = subsets newEndpoints.Labels = service.Labels if newEndpoints.Annotations == nil { @@ -468,13 +488,12 @@ func (e *EndpointController) syncService(key string) error { // some stragglers could have been left behind if the endpoint controller // reboots). func (e *EndpointController) checkLeftoverEndpoints() { - list, err := e.client.Core().Endpoints(metav1.NamespaceAll).List(metav1.ListOptions{}) + list, err := e.endpointsLister.List(labels.Everything()) if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err)) return } - for i := range list.Items { - ep := &list.Items[i] + for _, ep := range list { if _, ok := ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]; ok { // when there are multiple controller-manager instances, // we observe that it will delete leader-election endpoints after 5min diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 061668dfd8..bd3a81a60b 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -21,11 +21,13 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/wait" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" utiltesting "k8s.io/client-go/util/testing" @@ -38,6 +40,7 @@ import ( ) var alwaysReady = func() bool { return true } +var neverReady = func() bool { return false } var emptyNodeName string func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) { @@ -78,10 +81,10 @@ type serverResponse struct { obj interface{} } -func makeTestServer(t *testing.T, namespace string, endpointsResponse serverResponse) (*httptest.Server, *utiltesting.FakeHandler) { +func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltesting.FakeHandler) { fakeEndpointsHandler := utiltesting.FakeHandler{ - StatusCode: endpointsResponse.statusCode, - ResponseBody: runtime.EncodeOrDie(testapi.Default.Codec(), endpointsResponse.obj.(runtime.Object)), + StatusCode: http.StatusOK, + ResponseBody: runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{}), } mux := http.NewServeMux() mux.Handle(testapi.Default.ResourcePath("endpoints", namespace, ""), &fakeEndpointsHandler) @@ -95,39 +98,43 @@ func makeTestServer(t *testing.T, namespace string, endpointsResponse serverResp type endpointController struct { *EndpointController - podStore cache.Store - serviceStore cache.Store + podStore cache.Store + serviceStore cache.Store + endpointsStore cache.Store } func newController(url string) *endpointController { client := clientset.NewForConfigOrDie(&restclient.Config{Host: url, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) - endpoints := NewEndpointController(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(), client) + endpoints := NewEndpointController(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(), + informerFactory.Core().V1().Endpoints(), client) endpoints.podsSynced = alwaysReady endpoints.servicesSynced = alwaysReady + endpoints.endpointsSynced = alwaysReady return &endpointController{ endpoints, informerFactory.Core().V1().Pods().Informer().GetStore(), informerFactory.Core().V1().Services().Informer().GetStore(), + informerFactory.Core().V1().Endpoints().Informer().GetStore(), } } func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { ns := metav1.NamespaceDefault - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: ns, - ResourceVersion: "1", - }, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, - Ports: []v1.EndpointPort{{Port: 1000}}, - }}, - }}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, + Ports: []v1.EndpointPort{{Port: 1000}}, + }}, + }) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 80}}}, @@ -138,29 +145,21 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { func TestCheckLeftoverEndpoints(t *testing.T) { ns := metav1.NamespaceDefault - // Note that this requests *all* endpoints, therefore metav1.NamespaceAll - // below. - testServer, _ := makeTestServer(t, metav1.NamespaceAll, - serverResponse{http.StatusOK, &v1.EndpointsList{ - ListMeta: metav1.ListMeta{ - ResourceVersion: "1", - }, - Items: []v1.Endpoints{{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: ns, - ResourceVersion: "1", - }, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, - Ports: []v1.EndpointPort{{Port: 1000}}, - }}, - }}, - }}) + testServer, _ := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, + Ports: []v1.EndpointPort{{Port: 1000}}, + }}, + }) endpoints.checkLeftoverEndpoints() - if e, a := 1, endpoints.queue.Len(); e != a { t.Fatalf("Expected %v, got %v", e, a) } @@ -172,21 +171,20 @@ func TestCheckLeftoverEndpoints(t *testing.T) { func TestSyncEndpointsProtocolTCP(t *testing.T) { ns := "other" - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: ns, - ResourceVersion: "1", - }, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, - Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, - }}, - }}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) - + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, + Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, + }}, + }) addPods(endpoints.podStore, ns, 1, 1, 0) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, @@ -196,7 +194,8 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { }, }) endpoints.syncService(ns + "/foo") - endpointsHandler.ValidateRequestCount(t, 2) + + endpointsHandler.ValidateRequestCount(t, 1) data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -213,20 +212,20 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { func TestSyncEndpointsProtocolUDP(t *testing.T) { ns := "other" - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: ns, - ResourceVersion: "1", - }, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, - Ports: []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}}, - }}, - }}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, + Ports: []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}}, + }}, + }) addPods(endpoints.podStore, ns, 1, 1, 0) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, @@ -236,7 +235,8 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { }, }) endpoints.syncService(ns + "/foo") - endpointsHandler.ValidateRequestCount(t, 2) + + endpointsHandler.ValidateRequestCount(t, 1) data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -253,17 +253,17 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { ns := "other" - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: ns, - ResourceVersion: "1", - }, - Subsets: []v1.EndpointSubset{}, - }}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{}, + }) addPods(endpoints.podStore, ns, 1, 1, 0) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, @@ -273,6 +273,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { }, }) endpoints.syncService(ns + "/foo") + data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -289,17 +290,17 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { ns := "other" - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: ns, - ResourceVersion: "1", - }, - Subsets: []v1.EndpointSubset{}, - }}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{}, + }) addPods(endpoints.podStore, ns, 0, 1, 1) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, @@ -309,6 +310,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { }, }) endpoints.syncService(ns + "/foo") + data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -325,17 +327,17 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { ns := "other" - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: ns, - ResourceVersion: "1", - }, - Subsets: []v1.EndpointSubset{}, - }}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{}, + }) addPods(endpoints.podStore, ns, 1, 1, 1) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, @@ -345,6 +347,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { }, }) endpoints.syncService(ns + "/foo") + data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -362,20 +365,20 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { func TestSyncEndpointsItemsPreexisting(t *testing.T) { ns := "bar" - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: ns, - ResourceVersion: "1", - }, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, - Ports: []v1.EndpointPort{{Port: 1000}}, - }}, - }}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, + Ports: []v1.EndpointPort{{Port: 1000}}, + }}, + }) addPods(endpoints.podStore, ns, 1, 1, 0) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, @@ -385,6 +388,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { }, }) endpoints.syncService(ns + "/foo") + data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -401,20 +405,20 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { ns := metav1.NamespaceDefault - testServer, endpointsHandler := makeTestServer(t, metav1.NamespaceDefault, - serverResponse{http.StatusOK, &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "1", - Name: "foo", - Namespace: ns, - }, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, - Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, - }}, - }}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "1", + Name: "foo", + Namespace: ns, + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, + Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + }) addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault}, @@ -424,13 +428,12 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { }, }) endpoints.syncService(ns + "/foo") - endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", metav1.NamespaceDefault, "foo"), "GET", nil) + endpointsHandler.ValidateRequestCount(t, 0) } func TestSyncEndpointsItems(t *testing.T) { ns := "other" - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{}}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) addPods(endpoints.podStore, ns, 3, 2, 0) @@ -446,6 +449,7 @@ func TestSyncEndpointsItems(t *testing.T) { }, }) endpoints.syncService("other/foo") + expectedSubsets := []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{ {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}, @@ -460,18 +464,17 @@ func TestSyncEndpointsItems(t *testing.T) { data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ ResourceVersion: "", + Name: "foo", }, Subsets: endptspkg.SortSubsets(expectedSubsets), }) - // endpointsHandler should get 2 requests - one for "GET" and the next for "POST". - endpointsHandler.ValidateRequestCount(t, 2) + endpointsHandler.ValidateRequestCount(t, 1) endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data) } func TestSyncEndpointsItemsWithLabels(t *testing.T) { ns := "other" - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{}}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) addPods(endpoints.podStore, ns, 3, 2, 0) @@ -491,6 +494,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { }, }) endpoints.syncService(ns + "/foo") + expectedSubsets := []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{ {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}, @@ -505,34 +509,34 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ ResourceVersion: "", + Name: "foo", Labels: serviceLabels, }, Subsets: endptspkg.SortSubsets(expectedSubsets), }) - // endpointsHandler should get 2 requests - one for "GET" and the next for "POST". - endpointsHandler.ValidateRequestCount(t, 2) + endpointsHandler.ValidateRequestCount(t, 1) endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data) } func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { ns := "bar" - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: ns, - ResourceVersion: "1", - Labels: map[string]string{ - "foo": "bar", - }, - }, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, - Ports: []v1.EndpointPort{{Port: 1000}}, - }}, - }}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + Labels: map[string]string{ + "foo": "bar", + }, + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, + Ports: []v1.EndpointPort{{Port: 1000}}, + }}, + }) addPods(endpoints.podStore, ns, 1, 1, 0) serviceLabels := map[string]string{"baz": "blah"} endpoints.serviceStore.Add(&v1.Service{ @@ -547,6 +551,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { }, }) endpoints.syncService(ns + "/foo") + data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -561,3 +566,57 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { }) endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data) } + +func TestWaitsForAllInformersToBeSynced2(t *testing.T) { + var tests = []struct { + podsSynced func() bool + servicesSynced func() bool + endpointsSynced func() bool + shouldUpdateEndpoints bool + }{ + {neverReady, alwaysReady, alwaysReady, false}, + {alwaysReady, neverReady, alwaysReady, false}, + {alwaysReady, alwaysReady, neverReady, false}, + {alwaysReady, alwaysReady, alwaysReady, true}, + } + + for _, test := range tests { + func() { + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns) + defer testServer.Close() + endpoints := newController(testServer.URL) + addPods(endpoints.podStore, ns, 1, 1, 0) + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{}, + Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}}, + }, + } + endpoints.serviceStore.Add(service) + endpoints.enqueueService(service) + endpoints.podsSynced = test.podsSynced + endpoints.servicesSynced = test.servicesSynced + endpoints.endpointsSynced = test.endpointsSynced + endpoints.workerLoopPeriod = 10 * time.Millisecond + stopCh := make(chan struct{}) + defer close(stopCh) + go endpoints.Run(1, stopCh) + + // cache.WaitForCacheSync has a 100ms poll period, and the endpoints worker has a 10ms period. + // To ensure we get all updates, including unexpected ones, we need to wait at least as long as + // a single cache sync period and worker period, with some fudge room. + time.Sleep(150 * time.Millisecond) + if test.shouldUpdateEndpoints { + // Ensure the work queue has been processed by looping for up to a second to prevent flakes. + wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) { + return endpoints.queue.Len() == 0, nil + }) + endpointsHandler.ValidateRequestCount(t, 1) + } else { + endpointsHandler.ValidateRequestCount(t, 0) + } + }() + } +}