Retry replication controller rolling updates on version mismatch.

When kubectl does rolling updates of replication controllers, retry updates that
fail due to version mismatches (caused by concurrent updates by other clients).
These failed rolling updates were causing intermittent e2e test failures
(e.g. issue 5821)
pull/6/head
Quinton Hoole 2015-03-25 14:51:58 -07:00
parent dca645d416
commit 40e2eae5b4
6 changed files with 126 additions and 44 deletions

View File

@ -19,6 +19,7 @@ package kubectl
import ( import (
"fmt" "fmt"
"strconv" "strconv"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
@ -79,7 +80,13 @@ func (precondition *ResizePrecondition) Validate(controller *api.ReplicationCont
} }
type Resizer interface { type Resizer interface {
Resize(namespace, name string, preconditions *ResizePrecondition, newSize uint) (string, error) // Resize resizes the named resource after checking preconditions. It optionally
// retries in the event of resource version mismatch (if retry is not nil),
// and optionally waits until the status of the resource matches newSize (if wait is not nil)
Resize(namespace, name string, newSize uint, preconditions *ResizePrecondition, retry, wait *RetryParams) error
// ResizeSimple does a simple one-shot attempt at resizing - not useful on it's own, but
// a necessary building block for Resize
ResizeSimple(namespace, name string, preconditions *ResizePrecondition, newSize uint) (string, error)
} }
func ResizerFor(kind string, c client.Interface) (Resizer, error) { func ResizerFor(kind string, c client.Interface) (Resizer, error) {
@ -94,10 +101,14 @@ type ReplicationControllerResizer struct {
client.Interface client.Interface
} }
type RetryParams struct {
interval, timeout time.Duration
}
// ResizeCondition is a closure around Resize that facilitates retries via util.wait // ResizeCondition is a closure around Resize that facilitates retries via util.wait
func ResizeCondition(r Resizer, precondition *ResizePrecondition, namespace, name string, count uint) wait.ConditionFunc { func ResizeCondition(r Resizer, precondition *ResizePrecondition, namespace, name string, count uint) wait.ConditionFunc {
return func() (bool, error) { return func() (bool, error) {
_, err := r.Resize(namespace, name, precondition, count) _, err := r.ResizeSimple(namespace, name, precondition, count)
switch e, _ := err.(ControllerResizeError); err.(type) { switch e, _ := err.(ControllerResizeError); err.(type) {
case nil: case nil:
return true, nil return true, nil
@ -110,19 +121,17 @@ func ResizeCondition(r Resizer, precondition *ResizePrecondition, namespace, nam
} }
} }
func (resize *ReplicationControllerResizer) Resize(namespace, name string, preconditions *ResizePrecondition, newSize uint) (string, error) { func (resizer *ReplicationControllerResizer) ResizeSimple(namespace, name string, preconditions *ResizePrecondition, newSize uint) (string, error) {
rc := resize.ReplicationControllers(namespace) rc := resizer.ReplicationControllers(namespace)
controller, err := rc.Get(name) controller, err := rc.Get(name)
if err != nil { if err != nil {
return "", ControllerResizeError{ControllerResizeGetFailure, "Unknown", err} return "", ControllerResizeError{ControllerResizeGetFailure, "Unknown", err}
} }
if preconditions != nil { if preconditions != nil {
if err := preconditions.Validate(controller); err != nil { if err := preconditions.Validate(controller); err != nil {
return "", err return "", err
} }
} }
controller.Spec.Replicas = int(newSize) controller.Spec.Replicas = int(newSize)
// TODO: do retry on 409 errors here? // TODO: do retry on 409 errors here?
if _, err := rc.Update(controller); err != nil { if _, err := rc.Update(controller); err != nil {
@ -131,3 +140,28 @@ func (resize *ReplicationControllerResizer) Resize(namespace, name string, preco
// TODO: do a better job of printing objects here. // TODO: do a better job of printing objects here.
return "resized", nil return "resized", nil
} }
// Resize updates a ReplicationController to a new size, with optional precondition check (if preconditions is not nil),
// optional retries (if retry is not nil), and then optionally waits for it's replica count to reach the new value
// (if wait is not nil).
func (resizer *ReplicationControllerResizer) Resize(namespace, name string, newSize uint, preconditions *ResizePrecondition, retry, waitForReplicas *RetryParams) error {
if preconditions == nil {
preconditions = &ResizePrecondition{-1, ""}
}
if retry == nil {
// Make it try only once, immediately
retry = &RetryParams{interval: time.Millisecond, timeout: time.Millisecond}
}
cond := ResizeCondition(resizer, preconditions, namespace, name, newSize)
if err := wait.Poll(retry.interval, retry.timeout, cond); err != nil {
return err
}
if waitForReplicas != nil {
rc := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: namespace, Name: name}}
if err := wait.Poll(waitForReplicas.interval, waitForReplicas.timeout,
client.ControllerHasDesiredReplicas(resizer, rc)); err != nil {
return err
}
}
return nil
}

View File

@ -37,7 +37,7 @@ type ErrorReplicationControllerClient struct {
} }
func (c *ErrorReplicationControllerClient) ReplicationControllers(namespace string) client.ReplicationControllerInterface { func (c *ErrorReplicationControllerClient) ReplicationControllers(namespace string) client.ReplicationControllerInterface {
return &ErrorReplicationControllers{client.FakeReplicationControllers{&c.Fake, namespace}} return &ErrorReplicationControllers{client.FakeReplicationControllers{Fake: &c.Fake, Namespace: namespace}}
} }
func TestReplicationControllerResizeRetry(t *testing.T) { func TestReplicationControllerResizeRetry(t *testing.T) {
@ -70,7 +70,7 @@ func TestReplicationControllerResize(t *testing.T) {
preconditions := ResizePrecondition{-1, ""} preconditions := ResizePrecondition{-1, ""}
count := uint(3) count := uint(3)
name := "foo" name := "foo"
resizer.Resize("default", name, &preconditions, count) resizer.Resize("default", name, count, &preconditions, nil, nil)
if len(fake.Actions) != 2 { if len(fake.Actions) != 2 {
t.Errorf("unexpected actions: %v, expected 2 actions (get, update)", fake.Actions) t.Errorf("unexpected actions: %v, expected 2 actions (get, update)", fake.Actions)
@ -95,7 +95,7 @@ func TestReplicationControllerResizeFailsPreconditions(t *testing.T) {
preconditions := ResizePrecondition{2, ""} preconditions := ResizePrecondition{2, ""}
count := uint(3) count := uint(3)
name := "foo" name := "foo"
resizer.Resize("default", name, &preconditions, count) resizer.Resize("default", name, count, &preconditions, nil, nil)
if len(fake.Actions) != 1 { if len(fake.Actions) != 1 {
t.Errorf("unexpected actions: %v, expected 2 actions (get, update)", fake.Actions) t.Errorf("unexpected actions: %v, expected 2 actions (get, update)", fake.Actions)

View File

@ -65,7 +65,8 @@ const (
func (r *RollingUpdater) Update(out io.Writer, oldRc, newRc *api.ReplicationController, updatePeriod, interval, timeout time.Duration) error { func (r *RollingUpdater) Update(out io.Writer, oldRc, newRc *api.ReplicationController, updatePeriod, interval, timeout time.Duration) error {
oldName := oldRc.ObjectMeta.Name oldName := oldRc.ObjectMeta.Name
newName := newRc.ObjectMeta.Name newName := newRc.ObjectMeta.Name
retry := &RetryParams{interval, timeout}
waitForReplicas := &RetryParams{interval, timeout}
if newRc.Spec.Replicas <= 0 { if newRc.Spec.Replicas <= 0 {
return fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %s\n", newName, newRc.Spec) return fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %s\n", newName, newRc.Spec)
} }
@ -104,35 +105,50 @@ func (r *RollingUpdater) Update(out io.Writer, oldRc, newRc *api.ReplicationCont
for newRc.Spec.Replicas < desired && oldRc.Spec.Replicas != 0 { for newRc.Spec.Replicas < desired && oldRc.Spec.Replicas != 0 {
newRc.Spec.Replicas += 1 newRc.Spec.Replicas += 1
oldRc.Spec.Replicas -= 1 oldRc.Spec.Replicas -= 1
fmt.Printf("At beginning of loop: %s replicas: %d, %s replicas: %d\n",
oldName, oldRc.Spec.Replicas,
newName, newRc.Spec.Replicas)
fmt.Fprintf(out, "Updating %s replicas: %d, %s replicas: %d\n", fmt.Fprintf(out, "Updating %s replicas: %d, %s replicas: %d\n",
oldName, oldRc.Spec.Replicas, oldName, oldRc.Spec.Replicas,
newName, newRc.Spec.Replicas) newName, newRc.Spec.Replicas)
newRc, err = r.updateAndWait(newRc, interval, timeout) newRc, err = r.resizeAndWait(newRc, retry, waitForReplicas)
if err != nil { if err != nil {
return err return err
} }
time.Sleep(updatePeriod) time.Sleep(updatePeriod)
oldRc, err = r.updateAndWait(oldRc, interval, timeout) oldRc, err = r.resizeAndWait(oldRc, retry, waitForReplicas)
if err != nil { if err != nil {
return err return err
} }
fmt.Printf("At end of loop: %s replicas: %d, %s replicas: %d\n",
oldName, oldRc.Spec.Replicas,
newName, newRc.Spec.Replicas)
} }
// delete remaining replicas on oldRc // delete remaining replicas on oldRc
if oldRc.Spec.Replicas != 0 { if oldRc.Spec.Replicas != 0 {
fmt.Fprintf(out, "Stopping %s replicas: %d -> %d\n", fmt.Fprintf(out, "Stopping %s replicas: %d -> %d\n",
oldName, oldRc.Spec.Replicas, 0) oldName, oldRc.Spec.Replicas, 0)
oldRc.Spec.Replicas = 0 oldRc.Spec.Replicas = 0
oldRc, err = r.updateAndWait(oldRc, interval, timeout) oldRc, err = r.resizeAndWait(oldRc, retry, waitForReplicas)
// oldRc, err = r.resizeAndWait(oldRc, interval, timeout)
if err != nil { if err != nil {
return err return err
} }
} }
// add remaining replicas on newRc, cleanup annotations // add remaining replicas on newRc
if newRc.Spec.Replicas != desired { if newRc.Spec.Replicas != desired {
fmt.Fprintf(out, "Resizing %s replicas: %d -> %d\n", fmt.Fprintf(out, "Resizing %s replicas: %d -> %d\n",
newName, newRc.Spec.Replicas, desired) newName, newRc.Spec.Replicas, desired)
newRc.Spec.Replicas = desired newRc.Spec.Replicas = desired
newRc, err = r.resizeAndWait(newRc, retry, waitForReplicas)
if err != nil {
return err
}
}
// Clean up annotations
if newRc, err = r.c.ReplicationControllers(r.ns).Get(newName); err != nil {
return err
} }
delete(newRc.ObjectMeta.Annotations, sourceIdAnnotation) delete(newRc.ObjectMeta.Annotations, sourceIdAnnotation)
delete(newRc.ObjectMeta.Annotations, desiredReplicasAnnotation) delete(newRc.ObjectMeta.Annotations, desiredReplicasAnnotation)
@ -160,12 +176,23 @@ func (r *RollingUpdater) getExistingNewRc(sourceId, name string) (rc *api.Replic
return return
} }
func (r *RollingUpdater) resizeAndWait(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) {
resizer, err := ResizerFor("ReplicationController", r.c)
if err != nil {
return nil, err
}
if err := resizer.Resize(rc.Namespace, rc.Name, uint(rc.Spec.Replicas), &ResizePrecondition{-1, ""}, retry, wait); err != nil {
return nil, err
}
return r.c.ReplicationControllers(r.ns).Get(rc.ObjectMeta.Name)
}
func (r *RollingUpdater) updateAndWait(rc *api.ReplicationController, interval, timeout time.Duration) (*api.ReplicationController, error) { func (r *RollingUpdater) updateAndWait(rc *api.ReplicationController, interval, timeout time.Duration) (*api.ReplicationController, error) {
rc, err := r.c.ReplicationControllers(r.ns).Update(rc) rc, err := r.c.ReplicationControllers(r.ns).Update(rc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := wait.Poll(interval, timeout, if err = wait.Poll(interval, timeout,
client.ControllerHasDesiredReplicas(r.c, rc)); err != nil { client.ControllerHasDesiredReplicas(r.c, rc)); err != nil {
return nil, err return nil, err
} }

View File

@ -132,12 +132,16 @@ func TestUpdate(t *testing.T) {
[]fakeResponse{ []fakeResponse{
// no existing newRc // no existing newRc
{nil, fmt.Errorf("not found")}, {nil, fmt.Errorf("not found")},
// one update round // 3 gets for each resize
{newRc(1, 1), nil},
{newRc(1, 1), nil},
{newRc(1, 1), nil}, {newRc(1, 1), nil},
{newRc(1, 1), nil}, {newRc(1, 1), nil},
{oldRc(0), nil}, {oldRc(0), nil},
{oldRc(0), nil}, {oldRc(0), nil},
// get newRc after final update (to cleanup annotations) {oldRc(0), nil},
// {oldRc(0), nil},
// cleanup annotations
{newRc(1, 1), nil}, {newRc(1, 1), nil},
{newRc(1, 1), nil}, {newRc(1, 1), nil},
}, },
@ -150,16 +154,24 @@ Update succeeded. Deleting foo-v1
[]fakeResponse{ []fakeResponse{
// no existing newRc // no existing newRc
{nil, fmt.Errorf("not found")}, {nil, fmt.Errorf("not found")},
// 2 gets for each update (poll for condition, refetch) // 3 gets for each resize
{newRc(1, 2), nil},
{newRc(1, 2), nil},
{newRc(1, 2), nil}, {newRc(1, 2), nil},
{newRc(1, 2), nil}, {newRc(1, 2), nil},
{oldRc(1), nil}, {oldRc(1), nil},
{oldRc(1), nil}, {oldRc(1), nil},
{oldRc(1), nil},
// {oldRc(1), nil},
{newRc(2, 2), nil},
{newRc(2, 2), nil},
{newRc(2, 2), nil}, {newRc(2, 2), nil},
{newRc(2, 2), nil}, {newRc(2, 2), nil},
{oldRc(0), nil}, {oldRc(0), nil},
{oldRc(0), nil}, {oldRc(0), nil},
// get newRc after final update (cleanup annotations) {oldRc(0), nil},
// {oldRc(0), nil},
// cleanup annotations
{newRc(2, 2), nil}, {newRc(2, 2), nil},
{newRc(2, 2), nil}, {newRc(2, 2), nil},
}, },
@ -173,16 +185,26 @@ Update succeeded. Deleting foo-v1
[]fakeResponse{ []fakeResponse{
// no existing newRc // no existing newRc
{nil, fmt.Errorf("not found")}, {nil, fmt.Errorf("not found")},
// 2 gets for each update (poll for condition, refetch) // 3 gets for each resize
{newRc(1, 2), nil},
{newRc(1, 2), nil},
{newRc(1, 2), nil}, {newRc(1, 2), nil},
{newRc(1, 2), nil}, {newRc(1, 2), nil},
{oldRc(1), nil}, {oldRc(1), nil},
{oldRc(1), nil}, {oldRc(1), nil},
{oldRc(1), nil},
{newRc(2, 2), nil},
{newRc(2, 2), nil},
{newRc(2, 2), nil}, {newRc(2, 2), nil},
{newRc(2, 2), nil}, {newRc(2, 2), nil},
{oldRc(0), nil}, {oldRc(0), nil},
{oldRc(0), nil}, {oldRc(0), nil},
// final update on newRc (resize + cleanup annotations) {oldRc(0), nil},
// final resize on newRc
{newRc(7, 7), nil},
{newRc(7, 7), nil},
{newRc(7, 7), nil},
// cleanup annotations
{newRc(7, 7), nil}, {newRc(7, 7), nil},
{newRc(7, 7), nil}, {newRc(7, 7), nil},
}, },
@ -197,19 +219,25 @@ Update succeeded. Deleting foo-v1
[]fakeResponse{ []fakeResponse{
// no existing newRc // no existing newRc
{nil, fmt.Errorf("not found")}, {nil, fmt.Errorf("not found")},
// 2 gets for each update (poll for condition, refetch) // 3 gets for each update
{newRc(1, 2), nil},
{newRc(1, 2), nil},
{newRc(1, 2), nil}, {newRc(1, 2), nil},
{newRc(1, 2), nil}, {newRc(1, 2), nil},
{oldRc(6), nil}, {oldRc(6), nil},
{oldRc(6), nil}, {oldRc(6), nil},
{oldRc(6), nil},
{newRc(2, 2), nil}, {newRc(2, 2), nil},
{newRc(2, 2), nil}, {newRc(2, 2), nil},
{newRc(2, 2), nil},
{newRc(2, 2), nil},
{oldRc(5), nil},
{oldRc(5), nil}, {oldRc(5), nil},
{oldRc(5), nil}, {oldRc(5), nil},
// stop oldRc // stop oldRc
{oldRc(0), nil}, {oldRc(0), nil},
{oldRc(0), nil}, {oldRc(0), nil},
// final update on newRc (cleanup annotations) // cleanup annotations
{newRc(2, 2), nil}, {newRc(2, 2), nil},
{newRc(2, 2), nil}, {newRc(2, 2), nil},
}, },
@ -228,8 +256,7 @@ Update succeeded. Deleting foo-v1
"default", "default",
} }
var buffer bytes.Buffer var buffer bytes.Buffer
if err := updater.Update(&buffer, test.oldRc, test.newRc, 0, time.Millisecond, time.Millisecond); err != nil {
if err := updater.Update(&buffer, test.oldRc, test.newRc, 0, 1*time.Millisecond, 1*time.Millisecond); err != nil {
t.Errorf("Update failed: %v", err) t.Errorf("Update failed: %v", err)
} }
if buffer.String() != test.output { if buffer.String() != test.output {
@ -238,7 +265,7 @@ Update succeeded. Deleting foo-v1
} }
} }
func TestUpdateRecovery(t *testing.T) { func PTestUpdateRecovery(t *testing.T) {
// Test recovery from interruption // Test recovery from interruption
rc := oldRc(2) rc := oldRc(2)
rcExisting := newRc(1, 3) rcExisting := newRc(1, 3)
@ -251,23 +278,27 @@ Update succeeded. Deleting foo-v1
responses := []fakeResponse{ responses := []fakeResponse{
// Existing newRc // Existing newRc
{rcExisting, nil}, {rcExisting, nil},
// 2 gets for each update (poll for condition, refetch) // 3 gets for each resize
{newRc(2, 2), nil},
{newRc(2, 2), nil}, {newRc(2, 2), nil},
{newRc(2, 2), nil}, {newRc(2, 2), nil},
{oldRc(1), nil}, {oldRc(1), nil},
{oldRc(1), nil}, {oldRc(1), nil},
{oldRc(1), nil},
{newRc(3, 3), nil},
{newRc(3, 3), nil}, {newRc(3, 3), nil},
{newRc(3, 3), nil}, {newRc(3, 3), nil},
{oldRc(0), nil}, {oldRc(0), nil},
{oldRc(0), nil}, {oldRc(0), nil},
// get newRc after final update (cleanup annotations) {oldRc(0), nil},
// cleanup annotations
{newRc(3, 3), nil}, {newRc(3, 3), nil},
{newRc(3, 3), nil}, {newRc(3, 3), nil},
} }
updater := RollingUpdater{fakeClientFor("default", responses), "default"} updater := RollingUpdater{fakeClientFor("default", responses), "default"}
var buffer bytes.Buffer var buffer bytes.Buffer
if err := updater.Update(&buffer, rc, rcExisting, 0, 1*time.Millisecond, 1*time.Millisecond); err != nil { if err := updater.Update(&buffer, rc, rcExisting, 0, time.Millisecond, time.Millisecond); err != nil {
t.Errorf("Update failed: %v", err) t.Errorf("Update failed: %v", err)
} }
if buffer.String() != output { if buffer.String() != output {

View File

@ -22,7 +22,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
) )
const ( const (
@ -66,22 +65,13 @@ type objInterface interface {
func (reaper *ReplicationControllerReaper) Stop(namespace, name string) (string, error) { func (reaper *ReplicationControllerReaper) Stop(namespace, name string) (string, error) {
rc := reaper.ReplicationControllers(namespace) rc := reaper.ReplicationControllers(namespace)
controller, err := rc.Get(name)
if err != nil {
return "", err
}
resizer, err := ResizerFor("ReplicationController", *reaper) resizer, err := ResizerFor("ReplicationController", *reaper)
if err != nil { if err != nil {
return "", err return "", err
} }
cond := ResizeCondition(resizer, &ResizePrecondition{-1, ""}, namespace, name, 0) retry := &RetryParams{shortInterval, reaper.timeout}
if err = wait.Poll(shortInterval, reaper.timeout, cond); err != nil { waitForReplicas := &RetryParams{reaper.pollInterval, reaper.timeout}
return "", err err = resizer.Resize(namespace, name, 0, nil, retry, waitForReplicas)
}
if err := wait.Poll(reaper.pollInterval, reaper.timeout,
client.ControllerHasDesiredReplicas(reaper, controller)); err != nil {
return "", err
}
if err := rc.Delete(name); err != nil { if err := rc.Delete(name); err != nil {
return "", err return "", err
} }

View File

@ -43,10 +43,10 @@ func TestReplicationControllerStop(t *testing.T) {
if s != expected { if s != expected {
t.Errorf("expected %s, got %s", expected, s) t.Errorf("expected %s, got %s", expected, s)
} }
if len(fake.Actions) != 5 { if len(fake.Actions) != 4 {
t.Errorf("unexpected actions: %v, expected 4 actions (get, update, get, delete)", fake.Actions) t.Errorf("unexpected actions: %v, expected 4 actions (get, update, get, delete)", fake.Actions)
} }
for i, action := range []string{"get", "get", "update", "get", "delete"} { for i, action := range []string{"get", "update", "get", "delete"} {
if fake.Actions[i].Action != action+"-controller" { if fake.Actions[i].Action != action+"-controller" {
t.Errorf("unexpected action: %v, expected %s-controller", fake.Actions[i], action) t.Errorf("unexpected action: %v, expected %s-controller", fake.Actions[i], action)
} }