From 54b65013493e9f1f8757925425ab4d13052b4fa5 Mon Sep 17 00:00:00 2001 From: Prashanth Balasubramanian Date: Tue, 12 May 2015 14:39:23 -0700 Subject: [PATCH] Check expectations before filtering through active pods. --- pkg/controller/controller_utils.go | 11 ++++++ pkg/controller/replication_controller.go | 10 +++-- pkg/controller/replication_controller_test.go | 37 +++++++++++++++++++ 3 files changed, 55 insertions(+), 3 deletions(-) diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index 0231bc0735..d7ba7f8541 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -58,6 +58,17 @@ var expKeyFunc = func(obj interface{}) (string, error) { return "", fmt.Errorf("Could not find key for obj %#v", obj) } +// RCExpectationsManager is an interface that allows users to set and wait on expectations. +// Only abstracted out for testing. +type RCExpectationsManager interface { + GetExpectations(rc *api.ReplicationController) (*PodExpectations, bool, error) + SatisfiedExpectations(rc *api.ReplicationController) bool + ExpectCreations(rc *api.ReplicationController, adds int) error + ExpectDeletions(rc *api.ReplicationController, dels int) error + CreationObserved(rc *api.ReplicationController) + DeletionObserved(rc *api.ReplicationController) +} + // RCExpectations is a ttl cache mapping rcs to what they expect to see before being woken up for a sync. type RCExpectations struct { cache.Store diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index b23cd4882a..03a7f21266 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -81,7 +81,7 @@ type ReplicationManager struct { // To allow injection of syncReplicationController for testing. syncHandler func(rcKey string) error // A TTLCache of pod creates/deletes each rc expects to see - expectations *RCExpectations + expectations RCExpectationsManager // A store of controllers, populated by the rcController controllerStore cache.StoreToControllerLister // A store of pods, populated by the podController @@ -351,16 +351,20 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { } controller := *obj.(*api.ReplicationController) + // Check the expectations of the rc before counting active pods, otherwise a new pod can sneak in + // and update the expectations after we've retrieved active pods from the store. If a new pod enters + // the store after we've checked the expectation, the rc sync is just deferred till the next relist. + rcNeedsSync := rm.expectations.SatisfiedExpectations(&controller) podList, err := rm.podStore.Pods(controller.Namespace).List(labels.Set(controller.Spec.Selector).AsSelector()) if err != nil { glog.Errorf("Error getting pods for rc %q: %v", key, err) rm.queue.Add(key) return err } + // TODO: Do this in a single pass, or use an index. filteredPods := filterActivePods(podList.Items) - - if rm.expectations.SatisfiedExpectations(&controller) { + if rcNeedsSync { rm.manageReplicas(filteredPods, &controller) } diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index a401f2295f..5a4f16142b 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -953,3 +953,40 @@ func TestControllerBurstReplicas(t *testing.T) { doTestControllerBurstReplicas(t, 5, 12) doTestControllerBurstReplicas(t, 3, 2) } + +type FakeRCExpectations struct { + *RCExpectations + satisfied bool + expSatisfied func() +} + +func (fe FakeRCExpectations) SatisfiedExpectations(rc *api.ReplicationController) bool { + fe.expSatisfied() + return fe.satisfied +} + +// TestRCSyncExpectations tests that a pod cannot sneak in between counting active pods +// and checking expectations. +func TestRCSyncExpectations(t *testing.T) { + client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}) + fakePodControl := FakePodControl{} + manager := NewReplicationManager(client, 2) + manager.podControl = &fakePodControl + + controllerSpec := newReplicationController(2) + manager.controllerStore.Store.Add(controllerSpec) + pods := newPodList(nil, 2, api.PodPending, controllerSpec) + manager.podStore.Store.Add(&pods.Items[0]) + postExpectationsPod := pods.Items[1] + + manager.expectations = FakeRCExpectations{ + NewRCExpectations(), true, func() { + // If we check active pods before checking expectataions, the rc + // will create a new replica because it doesn't see this pod, but + // has fulfilled its expectations. + manager.podStore.Store.Add(&postExpectationsPod) + }, + } + manager.syncReplicationController(getKey(controllerSpec, t)) + validateSyncReplication(t, &fakePodControl, 0, 0) +}