mirror of https://github.com/k3s-io/k3s
Retry once and requeue on failure to update status.Replicas
parent
ee473edd45
commit
864e12d995
|
@ -30,32 +30,41 @@ type FakeReplicationControllers struct {
|
|||
Namespace string
|
||||
}
|
||||
|
||||
const (
|
||||
GetControllerAction = "get-replicationController"
|
||||
UpdateControllerAction = "update-replicationController"
|
||||
WatchControllerAction = "watch-replicationController"
|
||||
DeleteControllerAction = "delete-replicationController"
|
||||
ListControllerAction = "list-replicationControllers"
|
||||
CreateControllerAction = "create-replicationController"
|
||||
)
|
||||
|
||||
func (c *FakeReplicationControllers) List(selector labels.Selector) (*api.ReplicationControllerList, error) {
|
||||
obj, err := c.Fake.Invokes(FakeAction{Action: "list-replicationControllers"}, &api.ReplicationControllerList{})
|
||||
obj, err := c.Fake.Invokes(FakeAction{Action: ListControllerAction}, &api.ReplicationControllerList{})
|
||||
return obj.(*api.ReplicationControllerList), err
|
||||
}
|
||||
|
||||
func (c *FakeReplicationControllers) Get(name string) (*api.ReplicationController, error) {
|
||||
obj, err := c.Fake.Invokes(FakeAction{Action: "get-replicationController", Value: name}, &api.ReplicationController{})
|
||||
obj, err := c.Fake.Invokes(FakeAction{Action: GetControllerAction, Value: name}, &api.ReplicationController{})
|
||||
return obj.(*api.ReplicationController), err
|
||||
}
|
||||
|
||||
func (c *FakeReplicationControllers) Create(controller *api.ReplicationController) (*api.ReplicationController, error) {
|
||||
obj, err := c.Fake.Invokes(FakeAction{Action: "create-replicationController", Value: controller}, &api.ReplicationController{})
|
||||
obj, err := c.Fake.Invokes(FakeAction{Action: CreateControllerAction, Value: controller}, &api.ReplicationController{})
|
||||
return obj.(*api.ReplicationController), err
|
||||
}
|
||||
|
||||
func (c *FakeReplicationControllers) Update(controller *api.ReplicationController) (*api.ReplicationController, error) {
|
||||
obj, err := c.Fake.Invokes(FakeAction{Action: "update-replicationController", Value: controller}, &api.ReplicationController{})
|
||||
obj, err := c.Fake.Invokes(FakeAction{Action: UpdateControllerAction, Value: controller}, &api.ReplicationController{})
|
||||
return obj.(*api.ReplicationController), err
|
||||
}
|
||||
|
||||
func (c *FakeReplicationControllers) Delete(name string) error {
|
||||
_, err := c.Fake.Invokes(FakeAction{Action: "delete-replicationController", Value: name}, &api.ReplicationController{})
|
||||
_, err := c.Fake.Invokes(FakeAction{Action: DeleteControllerAction, Value: name}, &api.ReplicationController{})
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *FakeReplicationControllers) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
|
||||
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "watch-replicationController", Value: resourceVersion})
|
||||
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: WatchControllerAction, Value: resourceVersion})
|
||||
return c.Fake.Watch, nil
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@ package controller
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
|
||||
|
@ -27,12 +26,14 @@ import (
|
|||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
|
||||
"github.com/golang/glog"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
const CreatedByAnnotation = "kubernetes.io/created-by"
|
||||
const (
|
||||
CreatedByAnnotation = "kubernetes.io/created-by"
|
||||
updateRetries = 1
|
||||
)
|
||||
|
||||
// Expectations are a way for replication controllers to tell the rc manager what they expect. eg:
|
||||
// RCExpectations: {
|
||||
|
@ -276,21 +277,28 @@ func filterActivePods(pods []api.Pod) []*api.Pod {
|
|||
return result
|
||||
}
|
||||
|
||||
// updateReplicaCount attempts to update the Status.Replicas of the given controller, with retries.
|
||||
// Note that the controller pointer might contain a more recent version of the same controller passed into the function.
|
||||
func updateReplicaCount(rcClient client.ReplicationControllerInterface, controller *api.ReplicationController, numReplicas int) error {
|
||||
return wait.Poll(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) {
|
||||
if controller.Status.Replicas != numReplicas {
|
||||
glog.V(4).Infof("Updating replica count for rc: %v, %d->%d", controller.Name, controller.Status.Replicas, numReplicas)
|
||||
controller.Status.Replicas = numReplicas
|
||||
_, err := rcClient.Update(controller)
|
||||
if err != nil {
|
||||
glog.V(2).Infof("Controller %v failed to update replica count: %v", controller.Name, err)
|
||||
// Update the controller with the latest resource version for the next poll
|
||||
controller, _ = rcClient.Get(controller.Name)
|
||||
return false, err
|
||||
}
|
||||
// updateReplicaCount attempts to update the Status.Replicas of the given controller, with a single GET/PUT retry.
|
||||
func updateReplicaCount(rcClient client.ReplicationControllerInterface, controller api.ReplicationController, numReplicas int) (updateErr error) {
|
||||
// This is the steady state. It happens when the rc doesn't have any expectations, since
|
||||
// we do a periodic relist every 30s.
|
||||
if controller.Status.Replicas == numReplicas {
|
||||
return nil
|
||||
}
|
||||
var getErr error
|
||||
glog.V(4).Infof("Updating replica count for rc: %v, %d->%d", controller.Name, controller.Status.Replicas, numReplicas)
|
||||
for i, rc := 0, &controller; ; i++ {
|
||||
rc.Status.Replicas = numReplicas
|
||||
_, updateErr = rcClient.Update(rc)
|
||||
if updateErr == nil || i >= updateRetries {
|
||||
return updateErr
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
// Update the controller with the latest resource version for the retry
|
||||
if rc, getErr = rcClient.Get(controller.Name); getErr != nil {
|
||||
// If the GET fails we can't trust status.Replicas anymore. This error
|
||||
// is bound to be more interesting than the update failure.
|
||||
return getErr
|
||||
}
|
||||
}
|
||||
// Failed 2 updates, one of which was with the latest controller; return the update error.
|
||||
return
|
||||
}
|
||||
|
|
|
@ -349,9 +349,12 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
|
|||
rm.manageReplicas(filteredPods, &controller)
|
||||
}
|
||||
|
||||
// Always updates status as pods come up or die
|
||||
if err := updateReplicaCount(rm.kubeClient.ReplicationControllers(controller.Namespace), &controller, len(filteredPods)); err != nil {
|
||||
glog.V(2).Infof("Failed to update replica count for controller %v, will try on next sync", controller.Name)
|
||||
// Always updates status as pods come up or die.
|
||||
if err := updateReplicaCount(rm.kubeClient.ReplicationControllers(controller.Namespace), controller, len(filteredPods)); err != nil {
|
||||
// Multiple things could lead to this update failing. Requeuing the controller ensures
|
||||
// we retry with some fairness.
|
||||
glog.V(2).Infof("Failed to update replica count for controller %v, requeuing", controller.Name)
|
||||
rm.enqueueController(&controller)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -759,3 +759,86 @@ func TestUpdatePods(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestControllerUpdateRequeue(t *testing.T) {
|
||||
// This server should force a requeue of the controller because it fails to update status.Replicas.
|
||||
fakeHandler := util.FakeHandler{
|
||||
StatusCode: 500,
|
||||
ResponseBody: "",
|
||||
}
|
||||
testServer := httptest.NewServer(&fakeHandler)
|
||||
defer testServer.Close()
|
||||
|
||||
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
|
||||
manager := NewReplicationManager(client)
|
||||
|
||||
rc := newReplicationController(1)
|
||||
manager.controllerStore.Store.Add(rc)
|
||||
rc.Status = api.ReplicationControllerStatus{Replicas: 2}
|
||||
newPodList(manager.podStore.Store, 1, api.PodRunning, rc)
|
||||
|
||||
fakePodControl := FakePodControl{}
|
||||
manager.podControl = &fakePodControl
|
||||
|
||||
manager.syncReplicationController(getKey(rc, t))
|
||||
|
||||
ch := make(chan interface{})
|
||||
go func() {
|
||||
item, _ := manager.queue.Get()
|
||||
ch <- item
|
||||
}()
|
||||
select {
|
||||
case key := <-ch:
|
||||
expectedKey := getKey(rc, t)
|
||||
if key != expectedKey {
|
||||
t.Errorf("Expected requeue of controller with key %s got %s", expectedKey, key)
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
manager.queue.ShutDown()
|
||||
t.Errorf("Expected to find an rc in the queue, found none.")
|
||||
}
|
||||
// 1 Update and 1 GET, both of which fail
|
||||
fakeHandler.ValidateRequestCount(t, 2)
|
||||
}
|
||||
|
||||
func TestControllerUpdateStatusWithFailure(t *testing.T) {
|
||||
rc := newReplicationController(1)
|
||||
fakeClient := &testclient.Fake{
|
||||
ReactFn: func(f testclient.FakeAction) (runtime.Object, error) {
|
||||
if f.Action == testclient.GetControllerAction {
|
||||
return rc, nil
|
||||
}
|
||||
return &api.ReplicationController{}, fmt.Errorf("Fake error")
|
||||
},
|
||||
}
|
||||
fakeRCClient := &testclient.FakeReplicationControllers{fakeClient, "default"}
|
||||
numReplicas := 10
|
||||
updateReplicaCount(fakeRCClient, *rc, numReplicas)
|
||||
updates, gets := 0, 0
|
||||
for _, a := range fakeClient.Actions {
|
||||
switch a.Action {
|
||||
case testclient.GetControllerAction:
|
||||
gets++
|
||||
// Make sure the get is for the right rc even though the update failed.
|
||||
if s, ok := a.Value.(string); !ok || s != rc.Name {
|
||||
t.Errorf("Expected get for rc %v, got %+v instead", rc.Name, s)
|
||||
}
|
||||
case testclient.UpdateControllerAction:
|
||||
updates++
|
||||
// Confirm that the update has the right status.Replicas even though the Get
|
||||
// returned an rc with replicas=1.
|
||||
if c, ok := a.Value.(*api.ReplicationController); !ok {
|
||||
t.Errorf("Expected an rc as the argument to update, got %T", c)
|
||||
} else if c.Status.Replicas != numReplicas {
|
||||
t.Errorf("Expected update for rc to contain replicas %v, got %v instead",
|
||||
numReplicas, c.Status.Replicas)
|
||||
}
|
||||
default:
|
||||
t.Errorf("Unexpected action %+v", a)
|
||||
break
|
||||
}
|
||||
}
|
||||
if gets != 1 || updates != 2 {
|
||||
t.Errorf("Expected 1 get and 2 updates, got %d gets %d updates", gets, updates)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -88,11 +88,12 @@ func TestSyncNamespaceThatIsTerminating(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Errorf("Unexpected error when synching namespace %v", err)
|
||||
}
|
||||
// TODO: Reuse the constants for all these strings from testclient
|
||||
expectedActionSet := util.NewStringSet(
|
||||
testclient.ListControllerAction,
|
||||
"list-services",
|
||||
"list-pods",
|
||||
"list-resourceQuotas",
|
||||
"list-replicationControllers",
|
||||
"list-secrets",
|
||||
"list-limitRanges",
|
||||
"list-events",
|
||||
|
|
Loading…
Reference in New Issue