diff --git a/pkg/client/testclient/fake_replication_controllers.go b/pkg/client/testclient/fake_replication_controllers.go index 6e673b0ddb..08bbb502b8 100644 --- a/pkg/client/testclient/fake_replication_controllers.go +++ b/pkg/client/testclient/fake_replication_controllers.go @@ -30,32 +30,41 @@ type FakeReplicationControllers struct { Namespace string } +const ( + GetControllerAction = "get-replicationController" + UpdateControllerAction = "update-replicationController" + WatchControllerAction = "watch-replicationController" + DeleteControllerAction = "delete-replicationController" + ListControllerAction = "list-replicationControllers" + CreateControllerAction = "create-replicationController" +) + func (c *FakeReplicationControllers) List(selector labels.Selector) (*api.ReplicationControllerList, error) { - obj, err := c.Fake.Invokes(FakeAction{Action: "list-replicationControllers"}, &api.ReplicationControllerList{}) + obj, err := c.Fake.Invokes(FakeAction{Action: ListControllerAction}, &api.ReplicationControllerList{}) return obj.(*api.ReplicationControllerList), err } func (c *FakeReplicationControllers) Get(name string) (*api.ReplicationController, error) { - obj, err := c.Fake.Invokes(FakeAction{Action: "get-replicationController", Value: name}, &api.ReplicationController{}) + obj, err := c.Fake.Invokes(FakeAction{Action: GetControllerAction, Value: name}, &api.ReplicationController{}) return obj.(*api.ReplicationController), err } func (c *FakeReplicationControllers) Create(controller *api.ReplicationController) (*api.ReplicationController, error) { - obj, err := c.Fake.Invokes(FakeAction{Action: "create-replicationController", Value: controller}, &api.ReplicationController{}) + obj, err := c.Fake.Invokes(FakeAction{Action: CreateControllerAction, Value: controller}, &api.ReplicationController{}) return obj.(*api.ReplicationController), err } func (c *FakeReplicationControllers) Update(controller *api.ReplicationController) (*api.ReplicationController, 
error) { - obj, err := c.Fake.Invokes(FakeAction{Action: "update-replicationController", Value: controller}, &api.ReplicationController{}) + obj, err := c.Fake.Invokes(FakeAction{Action: UpdateControllerAction, Value: controller}, &api.ReplicationController{}) return obj.(*api.ReplicationController), err } func (c *FakeReplicationControllers) Delete(name string) error { - _, err := c.Fake.Invokes(FakeAction{Action: "delete-replicationController", Value: name}, &api.ReplicationController{}) + _, err := c.Fake.Invokes(FakeAction{Action: DeleteControllerAction, Value: name}, &api.ReplicationController{}) return err } func (c *FakeReplicationControllers) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "watch-replicationController", Value: resourceVersion}) + c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: WatchControllerAction, Value: resourceVersion}) return c.Fake.Watch, nil } diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index 5ea0c44f2a..6a675b3718 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -19,7 +19,6 @@ package controller import ( "encoding/json" "fmt" - "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" @@ -27,12 +26,14 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" "github.com/golang/glog" "sync/atomic" ) -const CreatedByAnnotation = "kubernetes.io/created-by" +const ( + CreatedByAnnotation = "kubernetes.io/created-by" + updateRetries = 1 +) // Expectations are a way for replication controllers to tell the rc manager what they expect. 
eg: // RCExpectations: { @@ -276,21 +277,28 @@ func filterActivePods(pods []api.Pod) []*api.Pod { return result } -// updateReplicaCount attempts to update the Status.Replicas of the given controller, with retries. -// Note that the controller pointer might contain a more recent version of the same controller passed into the function. -func updateReplicaCount(rcClient client.ReplicationControllerInterface, controller *api.ReplicationController, numReplicas int) error { - return wait.Poll(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) { - if controller.Status.Replicas != numReplicas { - glog.V(4).Infof("Updating replica count for rc: %v, %d->%d", controller.Name, controller.Status.Replicas, numReplicas) - controller.Status.Replicas = numReplicas - _, err := rcClient.Update(controller) - if err != nil { - glog.V(2).Infof("Controller %v failed to update replica count: %v", controller.Name, err) - // Update the controller with the latest resource version for the next poll - controller, _ = rcClient.Get(controller.Name) - return false, err - } +// updateReplicaCount attempts to update the Status.Replicas of the given controller, with a single GET/PUT retry. +func updateReplicaCount(rcClient client.ReplicationControllerInterface, controller api.ReplicationController, numReplicas int) (updateErr error) { + // This is the steady state. It happens when the rc doesn't have any expectations, since + // we do a periodic relist every 30s. 
+ if controller.Status.Replicas == numReplicas { + return nil + } + var getErr error + glog.V(4).Infof("Updating replica count for rc: %v, %d->%d", controller.Name, controller.Status.Replicas, numReplicas) + for i, rc := 0, &controller; ; i++ { + rc.Status.Replicas = numReplicas + _, updateErr = rcClient.Update(rc) + if updateErr == nil || i >= updateRetries { + return updateErr } - return true, nil - }) + // Update the controller with the latest resource version for the next poll + if rc, getErr = rcClient.Get(controller.Name); getErr != nil { + // If the GET fails we can't trust status.Replicas anymore. This error + // is bound to be more interesting than the update failure. + return getErr + } + } + // Failed 2 updates one of which was with the latest controller, return the update error + return } diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index 8b90c5de2e..d297603d1a 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -349,9 +349,12 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { rm.manageReplicas(filteredPods, &controller) } - // Always updates status as pods come up or die - if err := updateReplicaCount(rm.kubeClient.ReplicationControllers(controller.Namespace), &controller, len(filteredPods)); err != nil { - glog.V(2).Infof("Failed to update replica count for controller %v, will try on next sync", controller.Name) + // Always updates status as pods come up or die. + if err := updateReplicaCount(rm.kubeClient.ReplicationControllers(controller.Namespace), controller, len(filteredPods)); err != nil { + // Multiple things could lead to this update failing. Requeuing the controller ensures + // we retry with some fairness. 
+ glog.V(2).Infof("Failed to update replica count for controller %v, requeuing", controller.Name) + rm.enqueueController(&controller) } return nil } diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index 49b63b51a4..ddc9d0971b 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -759,3 +759,86 @@ func TestUpdatePods(t *testing.T) { } } } + +func TestControllerUpdateRequeue(t *testing.T) { + // This server should force a requeue of the controller because it fails to update status.Replicas. + fakeHandler := util.FakeHandler{ + StatusCode: 500, + ResponseBody: "", + } + testServer := httptest.NewServer(&fakeHandler) + defer testServer.Close() + + client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) + manager := NewReplicationManager(client) + + rc := newReplicationController(1) + manager.controllerStore.Store.Add(rc) + rc.Status = api.ReplicationControllerStatus{Replicas: 2} + newPodList(manager.podStore.Store, 1, api.PodRunning, rc) + + fakePodControl := FakePodControl{} + manager.podControl = &fakePodControl + + manager.syncReplicationController(getKey(rc, t)) + + ch := make(chan interface{}) + go func() { + item, _ := manager.queue.Get() + ch <- item + }() + select { + case key := <-ch: + expectedKey := getKey(rc, t) + if key != expectedKey { + t.Errorf("Expected requeue of controller with key %s got %s", expectedKey, key) + } + case <-time.After(100 * time.Millisecond): + manager.queue.ShutDown() + t.Errorf("Expected to find an rc in the queue, found none.") + } + // 1 Update and 1 GET, both of which fail + fakeHandler.ValidateRequestCount(t, 2) +} + +func TestControllerUpdateStatusWithFailure(t *testing.T) { + rc := newReplicationController(1) + fakeClient := &testclient.Fake{ + ReactFn: func(f testclient.FakeAction) (runtime.Object, error) { + if f.Action == testclient.GetControllerAction { + return rc, nil 
+ } + return &api.ReplicationController{}, fmt.Errorf("Fake error") + }, + } + fakeRCClient := &testclient.FakeReplicationControllers{fakeClient, "default"} + numReplicas := 10 + updateReplicaCount(fakeRCClient, *rc, numReplicas) + updates, gets := 0, 0 + for _, a := range fakeClient.Actions { + switch a.Action { + case testclient.GetControllerAction: + gets++ + // Make sure the get is for the right rc even though the update failed. + if s, ok := a.Value.(string); !ok || s != rc.Name { + t.Errorf("Expected get for rc %v, got %+v instead", rc.Name, s) + } + case testclient.UpdateControllerAction: + updates++ + // Confirm that the update has the right status.Replicas even though the Get + // returned an rc with replicas=1. + if c, ok := a.Value.(*api.ReplicationController); !ok { + t.Errorf("Expected an rc as the argument to update, got %T", c) + } else if c.Status.Replicas != numReplicas { + t.Errorf("Expected update for rc to contain replicas %v, got %v instead", + numReplicas, c.Status.Replicas) + } + default: + t.Errorf("Unexpected action %+v", a) + break + } + } + if gets != 1 || updates != 2 { + t.Errorf("Expected 1 get and 2 updates, got %d gets %d updates", gets, updates) + } +} diff --git a/pkg/namespace/namespace_controller_test.go b/pkg/namespace/namespace_controller_test.go index 53fb4daa02..dd453de486 100644 --- a/pkg/namespace/namespace_controller_test.go +++ b/pkg/namespace/namespace_controller_test.go @@ -88,11 +88,12 @@ func TestSyncNamespaceThatIsTerminating(t *testing.T) { if err != nil { t.Errorf("Unexpected error when synching namespace %v", err) } + // TODO: Reuse the constants for all these strings from testclient expectedActionSet := util.NewStringSet( + testclient.ListControllerAction, "list-services", "list-pods", "list-resourceQuotas", - "list-replicationControllers", "list-secrets", "list-limitRanges", "list-events",