diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index f043dafe56..f7aeb1bece 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -178,6 +178,7 @@ func startEndpointController(ctx ControllerContext) (bool, error) { go endpointcontroller.NewEndpointController( ctx.InformerFactory.Core().V1().Pods(), ctx.InformerFactory.Core().V1().Services(), + ctx.InformerFactory.Core().V1().Endpoints(), ctx.ClientBuilder.ClientOrDie("endpoint-controller"), ).Run(int(ctx.Options.ConcurrentEndpointSyncs), ctx.Stop) return true, nil diff --git a/pkg/controller/endpoint/BUILD b/pkg/controller/endpoint/BUILD index 583bb1c2c7..73c8831871 100644 --- a/pkg/controller/endpoint/BUILD +++ b/pkg/controller/endpoint/BUILD @@ -16,6 +16,7 @@ go_library( ], tags = ["automanaged"], deps = [ + "//pkg/api:go_default_library", "//pkg/api/v1/endpoints:go_default_library", "//pkg/api/v1/pod:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", @@ -53,6 +54,7 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/util/testing:go_default_library", diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 589698fea7..68c2c1378e 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1/endpoints" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -69,13 +70,15 @@ var ( ) // NewEndpointController returns a new *EndpointController. -func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer, client clientset.Interface) *EndpointController { +func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer, + endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface) *EndpointController { if client != nil && client.Core().RESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.Core().RESTClient().GetRateLimiter()) } e := &EndpointController{ - client: client, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"), + client: client, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"), + workerLoopPeriod: time.Second, } serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -96,6 +99,9 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme e.podLister = podInformer.Lister() e.podsSynced = podInformer.Informer().HasSynced + e.endpointsLister = endpointsInformer.Lister() + e.endpointsSynced = endpointsInformer.Informer().HasSynced + return e } @@ -117,12 +123,22 @@ type EndpointController struct { // Added as a member to the struct to allow injection for testing. podsSynced cache.InformerSynced + // endpointsLister is able to list/get endpoints and is populated by the shared informer passed to + // NewEndpointController. + endpointsLister corelisters.EndpointsLister + // endpointsSynced returns true if the endpoints shared informer has been synced at least once. + // Added as a member to the struct to allow injection for testing. + endpointsSynced cache.InformerSynced + // Services that need to be updated. A channel is inappropriate here, // because it allows services with lots of pods to be serviced much // more often than services with few pods; it also would cause a // service that's inserted multiple times to be processed more than // necessary. queue workqueue.RateLimitingInterface + + // workerLoopPeriod is the time between worker runs. The workers process the queue of service and pod changes. + workerLoopPeriod time.Duration } // Runs e; will not return until stopCh is closed. workers determines how many @@ -134,12 +150,12 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) { glog.Infof("Starting endpoint controller") defer glog.Infof("Shutting down endpoint controller") - if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced) { + if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) { return } for i := 0; i < workers; i++ { - go wait.Until(e.worker, time.Second, stopCh) + go wait.Until(e.worker, e.workerLoopPeriod, stopCh) } go func() { @@ -413,7 +429,7 @@ func (e *EndpointController) syncService(key string) error { subsets = endpoints.RepackSubsets(subsets) // See if there's actually an update here. - currentEndpoints, err := e.client.Core().Endpoints(service.Namespace).Get(service.Name, metav1.GetOptions{}) + currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name) if err != nil { if errors.IsNotFound(err) { currentEndpoints = &v1.Endpoints{ @@ -432,7 +448,11 @@ func (e *EndpointController) syncService(key string) error { glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) return nil } - newEndpoints := currentEndpoints + copy, err := api.Scheme.DeepCopy(currentEndpoints) + if err != nil { + return err + } + newEndpoints := copy.(*v1.Endpoints) newEndpoints.Subsets = subsets newEndpoints.Labels = service.Labels if newEndpoints.Annotations == nil { @@ -468,13 +488,12 @@ func (e *EndpointController) syncService(key string) error { // some stragglers could have been left behind if the endpoint controller // reboots). func (e *EndpointController) checkLeftoverEndpoints() { - list, err := e.client.Core().Endpoints(metav1.NamespaceAll).List(metav1.ListOptions{}) + list, err := e.endpointsLister.List(labels.Everything()) if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err)) return } - for i := range list.Items { - ep := &list.Items[i] + for _, ep := range list { if _, ok := ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]; ok { // when there are multiple controller-manager instances, // we observe that it will delete leader-election endpoints after 5min diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 061668dfd8..bd3a81a60b 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -21,11 +21,13 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/wait" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" utiltesting "k8s.io/client-go/util/testing" @@ -38,6 +40,7 @@ import ( ) var alwaysReady = func() bool { return true } +var neverReady = func() bool { return false } var emptyNodeName string func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) { @@ -78,10 +81,10 @@ type serverResponse struct { obj interface{} } -func makeTestServer(t *testing.T, namespace string, endpointsResponse serverResponse) (*httptest.Server, *utiltesting.FakeHandler) { +func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltesting.FakeHandler) { fakeEndpointsHandler := utiltesting.FakeHandler{ - StatusCode: endpointsResponse.statusCode, - ResponseBody: runtime.EncodeOrDie(testapi.Default.Codec(), endpointsResponse.obj.(runtime.Object)), + StatusCode: http.StatusOK, + ResponseBody: runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{}), } mux := http.NewServeMux() mux.Handle(testapi.Default.ResourcePath("endpoints", namespace, ""), &fakeEndpointsHandler) @@ -95,39 +98,43 @@ func makeTestServer(t *testing.T, namespace string, endpointsResponse serverResp type endpointController struct { *EndpointController - podStore cache.Store - serviceStore cache.Store + podStore cache.Store + serviceStore cache.Store + endpointsStore cache.Store } func newController(url string) *endpointController { client := clientset.NewForConfigOrDie(&restclient.Config{Host: url, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) - endpoints := NewEndpointController(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(), client) + endpoints := NewEndpointController(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(), + informerFactory.Core().V1().Endpoints(), client) endpoints.podsSynced = alwaysReady endpoints.servicesSynced = alwaysReady + endpoints.endpointsSynced = alwaysReady return &endpointController{ endpoints, informerFactory.Core().V1().Pods().Informer().GetStore(), informerFactory.Core().V1().Services().Informer().GetStore(), + informerFactory.Core().V1().Endpoints().Informer().GetStore(), } } func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { ns := metav1.NamespaceDefault - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: ns, - ResourceVersion: "1", - }, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, - Ports: []v1.EndpointPort{{Port: 1000}}, - }}, - }}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, + Ports: []v1.EndpointPort{{Port: 1000}}, + }}, + }) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 80}}}, @@ -138,29 +145,21 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { func TestCheckLeftoverEndpoints(t *testing.T) { ns := metav1.NamespaceDefault - // Note that this requests *all* endpoints, therefore metav1.NamespaceAll - // below. - testServer, _ := makeTestServer(t, metav1.NamespaceAll, - serverResponse{http.StatusOK, &v1.EndpointsList{ - ListMeta: metav1.ListMeta{ - ResourceVersion: "1", - }, - Items: []v1.Endpoints{{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: ns, - ResourceVersion: "1", - }, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, - Ports: []v1.EndpointPort{{Port: 1000}}, - }}, - }}, - }}) + testServer, _ := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, + Ports: []v1.EndpointPort{{Port: 1000}}, + }}, + }) endpoints.checkLeftoverEndpoints() - if e, a := 1, endpoints.queue.Len(); e != a { t.Fatalf("Expected %v, got %v", e, a) } @@ -172,21 +171,20 @@ func TestCheckLeftoverEndpoints(t *testing.T) { func TestSyncEndpointsProtocolTCP(t *testing.T) { ns := "other" - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: ns, - ResourceVersion: "1", - }, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, - Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, - }}, - }}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) - + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, + Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, + }}, + }) addPods(endpoints.podStore, ns, 1, 1, 0) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, @@ -196,7 +194,8 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { }, }) endpoints.syncService(ns + "/foo") - endpointsHandler.ValidateRequestCount(t, 2) + + endpointsHandler.ValidateRequestCount(t, 1) data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -213,20 +212,20 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { func TestSyncEndpointsProtocolUDP(t *testing.T) { ns := "other" - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: ns, - ResourceVersion: "1", - }, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, - Ports: []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}}, - }}, - }}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, + Ports: []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}}, + }}, + }) addPods(endpoints.podStore, ns, 1, 1, 0) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, @@ -236,7 +235,8 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { }, }) endpoints.syncService(ns + "/foo") - endpointsHandler.ValidateRequestCount(t, 2) + + endpointsHandler.ValidateRequestCount(t, 1) data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -253,17 +253,17 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { ns := "other" - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: ns, - ResourceVersion: "1", - }, - Subsets: []v1.EndpointSubset{}, - }}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{}, + }) addPods(endpoints.podStore, ns, 1, 1, 0) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, @@ -273,6 +273,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { }, }) endpoints.syncService(ns + "/foo") + data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -289,17 +290,17 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { ns := "other" - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: ns, - ResourceVersion: "1", - }, - Subsets: []v1.EndpointSubset{}, - }}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{}, + }) addPods(endpoints.podStore, ns, 0, 1, 1) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, @@ -309,6 +310,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { }, }) endpoints.syncService(ns + "/foo") + data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -325,17 +327,17 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { ns := "other" - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: ns, - ResourceVersion: "1", - }, - Subsets: []v1.EndpointSubset{}, - }}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{}, + }) addPods(endpoints.podStore, ns, 1, 1, 1) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, @@ -345,6 +347,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { }, }) endpoints.syncService(ns + "/foo") + data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -362,20 +365,20 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { func TestSyncEndpointsItemsPreexisting(t *testing.T) { ns := "bar" - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: ns, - ResourceVersion: "1", - }, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, - Ports: []v1.EndpointPort{{Port: 1000}}, - }}, - }}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, + Ports: []v1.EndpointPort{{Port: 1000}}, + }}, + }) addPods(endpoints.podStore, ns, 1, 1, 0) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, @@ -385,6 +388,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { }, }) endpoints.syncService(ns + "/foo") + data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -401,20 +405,20 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { ns := metav1.NamespaceDefault - testServer, endpointsHandler := makeTestServer(t, metav1.NamespaceDefault, - serverResponse{http.StatusOK, &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "1", - Name: "foo", - Namespace: ns, - }, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, - Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, - }}, - }}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "1", + Name: "foo", + Namespace: ns, + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, + Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + }) addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault}, @@ -424,13 +428,12 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { }, }) endpoints.syncService(ns + "/foo") - endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", metav1.NamespaceDefault, "foo"), "GET", nil) + endpointsHandler.ValidateRequestCount(t, 0) } func TestSyncEndpointsItems(t *testing.T) { ns := "other" - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{}}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) addPods(endpoints.podStore, ns, 3, 2, 0) @@ -446,6 +449,7 @@ func TestSyncEndpointsItems(t *testing.T) { }, }) endpoints.syncService("other/foo") + expectedSubsets := []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{ {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}, @@ -460,18 +464,17 @@ func TestSyncEndpointsItems(t *testing.T) { data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ ResourceVersion: "", + Name: "foo", }, Subsets: endptspkg.SortSubsets(expectedSubsets), }) - // endpointsHandler should get 2 requests - one for "GET" and the next for "POST". - endpointsHandler.ValidateRequestCount(t, 2) + endpointsHandler.ValidateRequestCount(t, 1) endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data) } func TestSyncEndpointsItemsWithLabels(t *testing.T) { ns := "other" - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{}}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) addPods(endpoints.podStore, ns, 3, 2, 0) @@ -491,6 +494,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { }, }) endpoints.syncService(ns + "/foo") + expectedSubsets := []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{ {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}, @@ -505,34 +509,34 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ ResourceVersion: "", + Name: "foo", Labels: serviceLabels, }, Subsets: endptspkg.SortSubsets(expectedSubsets), }) - // endpointsHandler should get 2 requests - one for "GET" and the next for "POST". - endpointsHandler.ValidateRequestCount(t, 2) + endpointsHandler.ValidateRequestCount(t, 1) endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data) } func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { ns := "bar" - testServer, endpointsHandler := makeTestServer(t, ns, - serverResponse{http.StatusOK, &v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: ns, - ResourceVersion: "1", - Labels: map[string]string{ - "foo": "bar", - }, - }, - Subsets: []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, - Ports: []v1.EndpointPort{{Port: 1000}}, - }}, - }}) + testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + Labels: map[string]string{ + "foo": "bar", + }, + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, + Ports: []v1.EndpointPort{{Port: 1000}}, + }}, + }) addPods(endpoints.podStore, ns, 1, 1, 0) serviceLabels := map[string]string{"baz": "blah"} endpoints.serviceStore.Add(&v1.Service{ @@ -547,6 +551,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { }, }) endpoints.syncService(ns + "/foo") + data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -561,3 +566,57 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { }) endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data) } + +func TestWaitsForAllInformersToBeSynced2(t *testing.T) { + var tests = []struct { + podsSynced func() bool + servicesSynced func() bool + endpointsSynced func() bool + shouldUpdateEndpoints bool + }{ + {neverReady, alwaysReady, alwaysReady, false}, + {alwaysReady, neverReady, alwaysReady, false}, + {alwaysReady, alwaysReady, neverReady, false}, + {alwaysReady, alwaysReady, alwaysReady, true}, + } + + for _, test := range tests { + func() { + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns) + defer testServer.Close() + endpoints := newController(testServer.URL) + addPods(endpoints.podStore, ns, 1, 1, 0) + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{}, + Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}}, + }, + } + endpoints.serviceStore.Add(service) + endpoints.enqueueService(service) + endpoints.podsSynced = test.podsSynced + endpoints.servicesSynced = test.servicesSynced + endpoints.endpointsSynced = test.endpointsSynced + endpoints.workerLoopPeriod = 10 * time.Millisecond + stopCh := make(chan struct{}) + defer close(stopCh) + go endpoints.Run(1, stopCh) + + // cache.WaitForCacheSync has a 100ms poll period, and the endpoints worker has a 10ms period. + // To ensure we get all updates, including unexpected ones, we need to wait at least as long as + // a single cache sync period and worker period, with some fudge room. + time.Sleep(150 * time.Millisecond) + if test.shouldUpdateEndpoints { + // Ensure the work queue has been processed by looping for up to a second to prevent flakes. + wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) { + return endpoints.queue.Len() == 0, nil + }) + endpointsHandler.ValidateRequestCount(t, 1) + } else { + endpointsHandler.ValidateRequestCount(t, 0) + } + }() + } +}