Merge pull request #47731 from jsravn/use-endpoints-cache-for-endpoint-controller

Automatic merge from submit-queue

Use endpoints informer for the endpoint controller

This substantially reduces the number of API calls made by the endpoint
controller. Currently the controller makes an API call to fetch the
endpoints object of each service it syncs, so when the 30s resync is
triggered it issues an API call for every single endpoints object in the
cluster. This quickly exceeds the default qps/burst limit of 20/30 even
in small clusters, leading to delays in endpoint updates.
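
For context, the 20/30 ceiling mentioned above is client-go's client-side rate limiter, which every request from the controller's client draws against. Below is a minimal sketch of where those knobs live; the `rest.Config` fields are real, but the wiring is illustrative rather than the controller-manager's actual setup (the 20/30 defaults correspond to its `--kube-api-qps`/`--kube-api-burst` flags):

```go
package sketch

import restclient "k8s.io/client-go/rest"

// newConfig shows the client-side rate limit the numbers above refer to.
// Every API call made through a client built from this config consumes from
// a single shared token bucket, so a resync that GETs the endpoints of every
// service can crowd out the endpoint updates that actually matter.
func newConfig(host string) *restclient.Config {
	return &restclient.Config{
		Host:  host,
		QPS:   20, // steady-state requests per second
		Burst: 30, // short-term burst allowance
	}
}
```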

This change modifies the controller to use the endpoints informer cache
for all endpoints GETs, which means API calls are made only when
endpoints actually change. As a result, qps depends only on pod activity
in the cluster rather than on the number of services.
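
Concretely, the read in the sync path moves from the apiserver to the informer's local store. Both lines below are adapted from the `syncService` hunk further down; `namespace` and `name` stand in for the service's coordinates:

```go
// Before: one GET round trip to the apiserver on every service sync.
currentEndpoints, err := e.client.Core().Endpoints(namespace).Get(name, metav1.GetOptions{})

// After: a read from the shared informer's in-memory cache; the informer's
// single watch is now the only apiserver traffic on the read path.
currentEndpoints, err := e.endpointsLister.Endpoints(namespace).Get(name)
```

Because the lister returns a pointer into the shared cache, the object must be deep-copied before it is mutated; this is why the diff adds an `api.Scheme.DeepCopy` call in `syncService`.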



**What this PR does / why we need it**:

Address endpoint update delays as described in https://github.com/kubernetes/kubernetes/issues/47597.

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #47597

https://github.com/kubernetes/kubernetes/issues/47597

**Special notes for your reviewer**:

**Release note**:

```release-note
```
Kubernetes Submit Queue 2017-06-27 05:20:12 -07:00 committed by GitHub
commit 6d1da16456
4 changed files with 231 additions and 150 deletions


@@ -178,6 +178,7 @@ func startEndpointController(ctx ControllerContext) (bool, error) {
go endpointcontroller.NewEndpointController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Services(),
+ctx.InformerFactory.Core().V1().Endpoints(),
ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
).Run(int(ctx.Options.ConcurrentEndpointSyncs), ctx.Stop)
return true, nil


@@ -16,6 +16,7 @@ go_library(
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1/endpoints:go_default_library",
"//pkg/api/v1/pod:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
@@ -53,6 +54,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/testing:go_default_library",


@@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1/endpoints"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@@ -69,13 +70,15 @@ var (
)
// NewEndpointController returns a new *EndpointController.
-func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer, client clientset.Interface) *EndpointController {
+func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
+endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface) *EndpointController {
if client != nil && client.Core().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.Core().RESTClient().GetRateLimiter())
}
e := &EndpointController{
client: client,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
+workerLoopPeriod: time.Second,
}
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -96,6 +99,9 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme
e.podLister = podInformer.Lister()
e.podsSynced = podInformer.Informer().HasSynced
+e.endpointsLister = endpointsInformer.Lister()
+e.endpointsSynced = endpointsInformer.Informer().HasSynced
return e
}
@@ -117,12 +123,22 @@ type EndpointController struct {
// Added as a member to the struct to allow injection for testing.
podsSynced cache.InformerSynced
+// endpointsLister is able to list/get endpoints and is populated by the shared informer passed to
+// NewEndpointController.
+endpointsLister corelisters.EndpointsLister
+// endpointsSynced returns true if the endpoints shared informer has been synced at least once.
+// Added as a member to the struct to allow injection for testing.
+endpointsSynced cache.InformerSynced
// Services that need to be updated. A channel is inappropriate here,
// because it allows services with lots of pods to be serviced much
// more often than services with few pods; it also would cause a
// service that's inserted multiple times to be processed more than
// necessary.
queue workqueue.RateLimitingInterface
+// workerLoopPeriod is the time between worker runs. The workers process the queue of service and pod changes.
+workerLoopPeriod time.Duration
}
// Runs e; will not return until stopCh is closed. workers determines how many
@@ -134,12 +150,12 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
glog.Infof("Starting endpoint controller")
defer glog.Infof("Shutting down endpoint controller")
if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced) {
if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) {
return
}
for i := 0; i < workers; i++ {
-go wait.Until(e.worker, time.Second, stopCh)
+go wait.Until(e.worker, e.workerLoopPeriod, stopCh)
}
go func() {
@@ -413,7 +429,7 @@ func (e *EndpointController) syncService(key string) error {
subsets = endpoints.RepackSubsets(subsets)
// See if there's actually an update here.
-currentEndpoints, err := e.client.Core().Endpoints(service.Namespace).Get(service.Name, metav1.GetOptions{})
+currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
if err != nil {
if errors.IsNotFound(err) {
currentEndpoints = &v1.Endpoints{
@@ -432,7 +448,11 @@ func (e *EndpointController) syncService(key string) error {
glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
return nil
}
-newEndpoints := currentEndpoints
+copy, err := api.Scheme.DeepCopy(currentEndpoints)
+if err != nil {
+return err
+}
+newEndpoints := copy.(*v1.Endpoints)
newEndpoints.Subsets = subsets
newEndpoints.Labels = service.Labels
if newEndpoints.Annotations == nil {
@@ -468,13 +488,12 @@ func (e *EndpointController) syncService(key string) error {
// some stragglers could have been left behind if the endpoint controller
// reboots).
func (e *EndpointController) checkLeftoverEndpoints() {
-list, err := e.client.Core().Endpoints(metav1.NamespaceAll).List(metav1.ListOptions{})
+list, err := e.endpointsLister.List(labels.Everything())
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))
return
}
-for i := range list.Items {
-ep := &list.Items[i]
+for _, ep := range list {
if _, ok := ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]; ok {
// when there are multiple controller-manager instances,
// we observe that it will delete leader-election endpoints after 5min


@@ -21,11 +21,13 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
utiltesting "k8s.io/client-go/util/testing"
@@ -38,6 +40,7 @@ import (
)
var alwaysReady = func() bool { return true }
+var neverReady = func() bool { return false }
var emptyNodeName string
func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) {
@@ -78,10 +81,10 @@ type serverResponse struct {
obj interface{}
}
-func makeTestServer(t *testing.T, namespace string, endpointsResponse serverResponse) (*httptest.Server, *utiltesting.FakeHandler) {
+func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltesting.FakeHandler) {
fakeEndpointsHandler := utiltesting.FakeHandler{
-StatusCode: endpointsResponse.statusCode,
-ResponseBody: runtime.EncodeOrDie(testapi.Default.Codec(), endpointsResponse.obj.(runtime.Object)),
+StatusCode: http.StatusOK,
+ResponseBody: runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{}),
}
mux := http.NewServeMux()
mux.Handle(testapi.Default.ResourcePath("endpoints", namespace, ""), &fakeEndpointsHandler)
@@ -97,25 +100,31 @@ type endpointController struct {
*EndpointController
podStore cache.Store
serviceStore cache.Store
+endpointsStore cache.Store
}
func newController(url string) *endpointController {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: url, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
-endpoints := NewEndpointController(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(), client)
+endpoints := NewEndpointController(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(),
+informerFactory.Core().V1().Endpoints(), client)
endpoints.podsSynced = alwaysReady
endpoints.servicesSynced = alwaysReady
+endpoints.endpointsSynced = alwaysReady
return &endpointController{
endpoints,
informerFactory.Core().V1().Pods().Informer().GetStore(),
informerFactory.Core().V1().Services().Informer().GetStore(),
+informerFactory.Core().V1().Endpoints().Informer().GetStore(),
}
}
func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
ns := metav1.NamespaceDefault
-testServer, endpointsHandler := makeTestServer(t, ns,
-serverResponse{http.StatusOK, &v1.Endpoints{
+testServer, endpointsHandler := makeTestServer(t, ns)
+defer testServer.Close()
+endpoints := newController(testServer.URL)
+endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
@@ -125,9 +134,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000}},
}},
-}})
-defer testServer.Close()
-endpoints := newController(testServer.URL)
+})
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 80}}},
@@ -138,14 +145,10 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
func TestCheckLeftoverEndpoints(t *testing.T) {
ns := metav1.NamespaceDefault
-// Note that this requests *all* endpoints, therefore metav1.NamespaceAll
-// below.
-testServer, _ := makeTestServer(t, metav1.NamespaceAll,
-serverResponse{http.StatusOK, &v1.EndpointsList{
-ListMeta: metav1.ListMeta{
-ResourceVersion: "1",
-},
-Items: []v1.Endpoints{{
+testServer, _ := makeTestServer(t, ns)
+defer testServer.Close()
+endpoints := newController(testServer.URL)
+endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
@@ -155,12 +158,8 @@ func TestCheckLeftoverEndpoints(t *testing.T) {
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000}},
}},
-}},
-}})
-defer testServer.Close()
-endpoints := newController(testServer.URL)
+})
endpoints.checkLeftoverEndpoints()
if e, a := 1, endpoints.queue.Len(); e != a {
t.Fatalf("Expected %v, got %v", e, a)
}
@@ -172,8 +171,10 @@ func TestCheckLeftoverEndpoints(t *testing.T) {
func TestSyncEndpointsProtocolTCP(t *testing.T) {
ns := "other"
-testServer, endpointsHandler := makeTestServer(t, ns,
-serverResponse{http.StatusOK, &v1.Endpoints{
+testServer, endpointsHandler := makeTestServer(t, ns)
+defer testServer.Close()
+endpoints := newController(testServer.URL)
+endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
@@ -183,10 +184,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
}},
-}})
-defer testServer.Close()
-endpoints := newController(testServer.URL)
+})
addPods(endpoints.podStore, ns, 1, 1, 0)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
@@ -196,7 +194,8 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
},
})
endpoints.syncService(ns + "/foo")
-endpointsHandler.ValidateRequestCount(t, 2)
+endpointsHandler.ValidateRequestCount(t, 1)
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -213,8 +212,10 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
func TestSyncEndpointsProtocolUDP(t *testing.T) {
ns := "other"
-testServer, endpointsHandler := makeTestServer(t, ns,
-serverResponse{http.StatusOK, &v1.Endpoints{
+testServer, endpointsHandler := makeTestServer(t, ns)
+defer testServer.Close()
+endpoints := newController(testServer.URL)
+endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
@@ -224,9 +225,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}},
}},
-}})
-defer testServer.Close()
-endpoints := newController(testServer.URL)
+})
addPods(endpoints.podStore, ns, 1, 1, 0)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
@@ -236,7 +235,8 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
},
})
endpoints.syncService(ns + "/foo")
-endpointsHandler.ValidateRequestCount(t, 2)
+endpointsHandler.ValidateRequestCount(t, 1)
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -253,17 +253,17 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
ns := "other"
-testServer, endpointsHandler := makeTestServer(t, ns,
-serverResponse{http.StatusOK, &v1.Endpoints{
+testServer, endpointsHandler := makeTestServer(t, ns)
+defer testServer.Close()
+endpoints := newController(testServer.URL)
+endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{},
-}})
-defer testServer.Close()
-endpoints := newController(testServer.URL)
+})
addPods(endpoints.podStore, ns, 1, 1, 0)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
@@ -273,6 +273,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
},
})
endpoints.syncService(ns + "/foo")
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -289,17 +290,17 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
ns := "other"
-testServer, endpointsHandler := makeTestServer(t, ns,
-serverResponse{http.StatusOK, &v1.Endpoints{
+testServer, endpointsHandler := makeTestServer(t, ns)
+defer testServer.Close()
+endpoints := newController(testServer.URL)
+endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{},
-}})
-defer testServer.Close()
-endpoints := newController(testServer.URL)
+})
addPods(endpoints.podStore, ns, 0, 1, 1)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
@@ -309,6 +310,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
},
})
endpoints.syncService(ns + "/foo")
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -325,17 +327,17 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
ns := "other"
-testServer, endpointsHandler := makeTestServer(t, ns,
-serverResponse{http.StatusOK, &v1.Endpoints{
+testServer, endpointsHandler := makeTestServer(t, ns)
+defer testServer.Close()
+endpoints := newController(testServer.URL)
+endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{},
-}})
-defer testServer.Close()
-endpoints := newController(testServer.URL)
+})
addPods(endpoints.podStore, ns, 1, 1, 1)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
@@ -345,6 +347,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
},
})
endpoints.syncService(ns + "/foo")
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -362,8 +365,10 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
func TestSyncEndpointsItemsPreexisting(t *testing.T) {
ns := "bar"
-testServer, endpointsHandler := makeTestServer(t, ns,
-serverResponse{http.StatusOK, &v1.Endpoints{
+testServer, endpointsHandler := makeTestServer(t, ns)
+defer testServer.Close()
+endpoints := newController(testServer.URL)
+endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
@@ -373,9 +378,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000}},
}},
-}})
-defer testServer.Close()
-endpoints := newController(testServer.URL)
+})
addPods(endpoints.podStore, ns, 1, 1, 0)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
@@ -385,6 +388,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
},
})
endpoints.syncService(ns + "/foo")
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -401,8 +405,10 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
ns := metav1.NamespaceDefault
-testServer, endpointsHandler := makeTestServer(t, metav1.NamespaceDefault,
-serverResponse{http.StatusOK, &v1.Endpoints{
+testServer, endpointsHandler := makeTestServer(t, ns)
+defer testServer.Close()
+endpoints := newController(testServer.URL)
+endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
Name: "foo",
@@ -412,9 +418,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}},
-}})
-defer testServer.Close()
-endpoints := newController(testServer.URL)
+})
addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault},
@@ -424,13 +428,12 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
},
})
endpoints.syncService(ns + "/foo")
-endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", metav1.NamespaceDefault, "foo"), "GET", nil)
+endpointsHandler.ValidateRequestCount(t, 0)
}
func TestSyncEndpointsItems(t *testing.T) {
ns := "other"
-testServer, endpointsHandler := makeTestServer(t, ns,
-serverResponse{http.StatusOK, &v1.Endpoints{}})
+testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
addPods(endpoints.podStore, ns, 3, 2, 0)
@@ -446,6 +449,7 @@ func TestSyncEndpointsItems(t *testing.T) {
},
})
endpoints.syncService("other/foo")
expectedSubsets := []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{
{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
@@ -460,18 +464,17 @@ func TestSyncEndpointsItems(t *testing.T) {
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "",
Name: "foo",
},
Subsets: endptspkg.SortSubsets(expectedSubsets),
})
-// endpointsHandler should get 2 requests - one for "GET" and the next for "POST".
-endpointsHandler.ValidateRequestCount(t, 2)
+endpointsHandler.ValidateRequestCount(t, 1)
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data)
}
func TestSyncEndpointsItemsWithLabels(t *testing.T) {
ns := "other"
-testServer, endpointsHandler := makeTestServer(t, ns,
-serverResponse{http.StatusOK, &v1.Endpoints{}})
+testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
addPods(endpoints.podStore, ns, 3, 2, 0)
@@ -491,6 +494,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
},
})
endpoints.syncService(ns + "/foo")
expectedSubsets := []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{
{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
@@ -505,19 +509,21 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "",
Name: "foo",
Labels: serviceLabels,
},
Subsets: endptspkg.SortSubsets(expectedSubsets),
})
-// endpointsHandler should get 2 requests - one for "GET" and the next for "POST".
-endpointsHandler.ValidateRequestCount(t, 2)
+endpointsHandler.ValidateRequestCount(t, 1)
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data)
}
func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
ns := "bar"
-testServer, endpointsHandler := makeTestServer(t, ns,
-serverResponse{http.StatusOK, &v1.Endpoints{
+testServer, endpointsHandler := makeTestServer(t, ns)
+defer testServer.Close()
+endpoints := newController(testServer.URL)
+endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
@@ -530,9 +536,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000}},
}},
-}})
-defer testServer.Close()
-endpoints := newController(testServer.URL)
+})
addPods(endpoints.podStore, ns, 1, 1, 0)
serviceLabels := map[string]string{"baz": "blah"}
endpoints.serviceStore.Add(&v1.Service{
@@ -547,6 +551,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
},
})
endpoints.syncService(ns + "/foo")
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@@ -561,3 +566,57 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
})
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
}
+func TestWaitsForAllInformersToBeSynced2(t *testing.T) {
+var tests = []struct {
+podsSynced func() bool
+servicesSynced func() bool
+endpointsSynced func() bool
+shouldUpdateEndpoints bool
+}{
+{neverReady, alwaysReady, alwaysReady, false},
+{alwaysReady, neverReady, alwaysReady, false},
+{alwaysReady, alwaysReady, neverReady, false},
+{alwaysReady, alwaysReady, alwaysReady, true},
+}
+for _, test := range tests {
+func() {
+ns := "other"
+testServer, endpointsHandler := makeTestServer(t, ns)
+defer testServer.Close()
+endpoints := newController(testServer.URL)
+addPods(endpoints.podStore, ns, 1, 1, 0)
+service := &v1.Service{
+ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
+Spec: v1.ServiceSpec{
+Selector: map[string]string{},
+Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
+},
+}
+endpoints.serviceStore.Add(service)
+endpoints.enqueueService(service)
+endpoints.podsSynced = test.podsSynced
+endpoints.servicesSynced = test.servicesSynced
+endpoints.endpointsSynced = test.endpointsSynced
+endpoints.workerLoopPeriod = 10 * time.Millisecond
+stopCh := make(chan struct{})
+defer close(stopCh)
+go endpoints.Run(1, stopCh)
+// cache.WaitForCacheSync has a 100ms poll period, and the endpoints worker has a 10ms period.
+// To ensure we get all updates, including unexpected ones, we need to wait at least as long as
+// a single cache sync period and worker period, with some fudge room.
+time.Sleep(150 * time.Millisecond)
+if test.shouldUpdateEndpoints {
+// Ensure the work queue has been processed by looping for up to a second to prevent flakes.
+wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) {
+return endpoints.queue.Len() == 0, nil
+})
+endpointsHandler.ValidateRequestCount(t, 1)
+} else {
+endpointsHandler.ValidateRequestCount(t, 0)
+}
+}()
+}
+}