mirror of https://github.com/k3s-io/k3s
Retry Pod/RC updates in kubectl rolling-update
parent
62ce66988c
commit
ee81e5ebfa
|
@ -18,7 +18,6 @@ package unversioned
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
|
@ -29,61 +28,6 @@ import (
|
|||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// DefaultRetry is the recommended retry for a conflict where multiple clients
|
||||
// are making changes to the same resource.
|
||||
var DefaultRetry = wait.Backoff{
|
||||
Steps: 5,
|
||||
Duration: 10 * time.Millisecond,
|
||||
Factor: 1.0,
|
||||
Jitter: 0.1,
|
||||
}
|
||||
|
||||
// DefaultBackoff is the recommended backoff for a conflict where a client
|
||||
// may be attempting to make an unrelated modification to a resource under
|
||||
// active management by one or more controllers.
|
||||
var DefaultBackoff = wait.Backoff{
|
||||
Steps: 4,
|
||||
Duration: 10 * time.Millisecond,
|
||||
Factor: 5.0,
|
||||
Jitter: 0.1,
|
||||
}
|
||||
|
||||
// RetryOnConflict executes the provided function repeatedly, retrying if the server returns a conflicting
|
||||
// write. Callers should preserve previous executions if they wish to retry changes. It performs an
|
||||
// exponential backoff.
|
||||
//
|
||||
// var pod *api.Pod
|
||||
// err := RetryOnConflict(DefaultBackoff, func() (err error) {
|
||||
// pod, err = c.Pods("mynamespace").UpdateStatus(podStatus)
|
||||
// return
|
||||
// })
|
||||
// if err != nil {
|
||||
// // may be conflict if max retries were hit
|
||||
// return err
|
||||
// }
|
||||
// ...
|
||||
//
|
||||
// TODO: Make Backoff an interface?
|
||||
func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
|
||||
var lastConflictErr error
|
||||
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
|
||||
err := fn()
|
||||
switch {
|
||||
case err == nil:
|
||||
return true, nil
|
||||
case errors.IsConflict(err):
|
||||
lastConflictErr = err
|
||||
return false, nil
|
||||
default:
|
||||
return false, err
|
||||
}
|
||||
})
|
||||
if err == wait.ErrWaitTimeout {
|
||||
err = lastConflictErr
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// ControllerHasDesiredReplicas returns a condition that will be true if and only if
|
||||
// the desired replica count for a controller's ReplicaSelector equals the Replicas count.
|
||||
func ControllerHasDesiredReplicas(c Interface, controller *api.ReplicationController) wait.ConditionFunc {
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package unversioned
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
// DefaultRetry is the recommended retry for a conflict where multiple clients
|
||||
// are making changes to the same resource.
|
||||
var DefaultRetry = wait.Backoff{
|
||||
Steps: 5,
|
||||
Duration: 10 * time.Millisecond,
|
||||
Factor: 1.0,
|
||||
Jitter: 0.1,
|
||||
}
|
||||
|
||||
// DefaultBackoff is the recommended backoff for a conflict where a client
|
||||
// may be attempting to make an unrelated modification to a resource under
|
||||
// active management by one or more controllers.
|
||||
var DefaultBackoff = wait.Backoff{
|
||||
Steps: 4,
|
||||
Duration: 10 * time.Millisecond,
|
||||
Factor: 5.0,
|
||||
Jitter: 0.1,
|
||||
}
|
||||
|
||||
// RetryOnConflict executes the provided function repeatedly, retrying if the server returns a conflicting
|
||||
// write. Callers should preserve previous executions if they wish to retry changes. It performs an
|
||||
// exponential backoff.
|
||||
//
|
||||
// var pod *api.Pod
|
||||
// err := RetryOnConflict(DefaultBackoff, func() (err error) {
|
||||
// pod, err = c.Pods("mynamespace").UpdateStatus(podStatus)
|
||||
// return
|
||||
// })
|
||||
// if err != nil {
|
||||
// // may be conflict if max retries were hit
|
||||
// return err
|
||||
// }
|
||||
// ...
|
||||
//
|
||||
// TODO: Make Backoff an interface?
|
||||
func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
|
||||
var lastConflictErr error
|
||||
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
|
||||
err := fn()
|
||||
switch {
|
||||
case err == nil:
|
||||
return true, nil
|
||||
case errors.IsConflict(err):
|
||||
lastConflictErr = err
|
||||
return false, nil
|
||||
default:
|
||||
return false, err
|
||||
}
|
||||
})
|
||||
if err == wait.ErrWaitTimeout {
|
||||
err = lastConflictErr
|
||||
}
|
||||
return err
|
||||
}
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
Copyright 2014 The Kubernetes Authors All rights reserved.
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
|
@ -187,15 +187,16 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if existing.Annotations == nil {
|
||||
existing.Annotations = map[string]string{}
|
||||
originReplicas := strconv.Itoa(int(existing.Spec.Replicas))
|
||||
applyUpdate := func(rc *api.ReplicationController) {
|
||||
if rc.Annotations == nil {
|
||||
rc.Annotations = map[string]string{}
|
||||
}
|
||||
rc.Annotations[originalReplicasAnnotation] = originReplicas
|
||||
}
|
||||
existing.Annotations[originalReplicasAnnotation] = strconv.Itoa(int(existing.Spec.Replicas))
|
||||
updated, err := r.c.ReplicationControllers(existing.Namespace).Update(existing)
|
||||
if err != nil {
|
||||
if oldRc, err = updateRcWithRetries(r.c, existing.Namespace, existing, applyUpdate); err != nil {
|
||||
return err
|
||||
}
|
||||
oldRc = updated
|
||||
}
|
||||
// maxSurge is the maximum scaling increment and maxUnavailable are the maximum pods
|
||||
// that can be unavailable during a rollout.
|
||||
|
@ -482,13 +483,14 @@ func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *api.ReplicationControl
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
delete(newRc.Annotations, sourceIdAnnotation)
|
||||
delete(newRc.Annotations, desiredReplicasAnnotation)
|
||||
|
||||
newRc, err = r.c.ReplicationControllers(r.ns).Update(newRc)
|
||||
if err != nil {
|
||||
applyUpdate := func(rc *api.ReplicationController) {
|
||||
delete(rc.Annotations, sourceIdAnnotation)
|
||||
delete(rc.Annotations, desiredReplicasAnnotation)
|
||||
}
|
||||
if newRc, err = updateRcWithRetries(r.c, r.ns, newRc, applyUpdate); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = wait.Poll(config.Interval, config.Timeout, client.ControllerHasDesiredReplicas(r.c, newRc)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -643,27 +645,29 @@ func SetNextControllerAnnotation(rc *api.ReplicationController, name string) {
|
|||
}
|
||||
|
||||
func UpdateExistingReplicationController(c client.Interface, oldRc *api.ReplicationController, namespace, newName, deploymentKey, deploymentValue string, out io.Writer) (*api.ReplicationController, error) {
|
||||
SetNextControllerAnnotation(oldRc, newName)
|
||||
if _, found := oldRc.Spec.Selector[deploymentKey]; !found {
|
||||
SetNextControllerAnnotation(oldRc, newName)
|
||||
return AddDeploymentKeyToReplicationController(oldRc, c, deploymentKey, deploymentValue, namespace, out)
|
||||
} else {
|
||||
// If we didn't need to update the controller for the deployment key, we still need to write
|
||||
// the "next" controller.
|
||||
return c.ReplicationControllers(namespace).Update(oldRc)
|
||||
applyUpdate := func(rc *api.ReplicationController) {
|
||||
SetNextControllerAnnotation(rc, newName)
|
||||
}
|
||||
return updateRcWithRetries(c, namespace, oldRc, applyUpdate)
|
||||
}
|
||||
}
|
||||
|
||||
const MaxRetries = 3
|
||||
|
||||
func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client client.Interface, deploymentKey, deploymentValue, namespace string, out io.Writer) (*api.ReplicationController, error) {
|
||||
var err error
|
||||
// First, update the template label. This ensures that any newly created pods will have the new label
|
||||
if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
|
||||
applyUpdate := func(rc *api.ReplicationController) {
|
||||
if rc.Spec.Template.Labels == nil {
|
||||
rc.Spec.Template.Labels = map[string]string{}
|
||||
}
|
||||
rc.Spec.Template.Labels[deploymentKey] = deploymentValue
|
||||
}); err != nil {
|
||||
}
|
||||
if oldRc, err = updateRcWithRetries(client, namespace, oldRc, applyUpdate); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -677,26 +681,16 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c
|
|||
}
|
||||
for ix := range podList.Items {
|
||||
pod := &podList.Items[ix]
|
||||
if pod.Labels == nil {
|
||||
pod.Labels = map[string]string{
|
||||
deploymentKey: deploymentValue,
|
||||
}
|
||||
} else {
|
||||
pod.Labels[deploymentKey] = deploymentValue
|
||||
}
|
||||
err = nil
|
||||
delay := 3
|
||||
for i := 0; i < MaxRetries; i++ {
|
||||
_, err = client.Pods(namespace).Update(pod)
|
||||
if err != nil {
|
||||
fmt.Fprintf(out, "Error updating pod (%v), retrying after %d seconds", err, delay)
|
||||
time.Sleep(time.Second * time.Duration(delay))
|
||||
delay *= delay
|
||||
applyUpdate := func(p *api.Pod) {
|
||||
if p.Labels == nil {
|
||||
p.Labels = map[string]string{
|
||||
deploymentKey: deploymentValue,
|
||||
}
|
||||
} else {
|
||||
break
|
||||
p.Labels[deploymentKey] = deploymentValue
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if pod, err = updatePodWithRetries(client, namespace, pod, applyUpdate); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
@ -709,12 +703,11 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c
|
|||
for k, v := range oldRc.Spec.Selector {
|
||||
selectorCopy[k] = v
|
||||
}
|
||||
oldRc.Spec.Selector[deploymentKey] = deploymentValue
|
||||
|
||||
// Update the selector of the rc so it manages all the pods we updated above
|
||||
if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
|
||||
applyUpdate = func(rc *api.ReplicationController) {
|
||||
rc.Spec.Selector[deploymentKey] = deploymentValue
|
||||
}); err != nil {
|
||||
}
|
||||
// Update the selector of the rc so it manages all the pods we updated above
|
||||
if oldRc, err = updateRcWithRetries(client, namespace, oldRc, applyUpdate); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -736,33 +729,72 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c
|
|||
return oldRc, nil
|
||||
}
|
||||
|
||||
type updateFunc func(controller *api.ReplicationController)
|
||||
type updateRcFunc func(controller *api.ReplicationController)
|
||||
|
||||
// updateWithRetries updates applies the given rc as an update.
|
||||
func updateWithRetries(rcClient client.ReplicationControllerInterface, rc *api.ReplicationController, applyUpdate updateFunc) (*api.ReplicationController, error) {
|
||||
var err error
|
||||
oldRc := rc
|
||||
err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
|
||||
// updateRcWithRetries retries updating the given rc on conflict with the following steps:
|
||||
// 1. Get latest resource
|
||||
// 2. applyUpdate
|
||||
// 3. Update the resource
|
||||
func updateRcWithRetries(c client.Interface, namespace string, rc *api.ReplicationController, applyUpdate updateRcFunc) (*api.ReplicationController, error) {
|
||||
// Deep copy the rc in case we failed on Get during retry loop
|
||||
obj, err := api.Scheme.Copy(rc)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to deep copy rc before updating it: %v", err)
|
||||
}
|
||||
oldRc := obj.(*api.ReplicationController)
|
||||
err = client.RetryOnConflict(client.DefaultBackoff, func() (e error) {
|
||||
// Apply the update, then attempt to push it to the apiserver.
|
||||
applyUpdate(rc)
|
||||
if rc, err = rcClient.Update(rc); err == nil {
|
||||
if rc, e = c.ReplicationControllers(namespace).Update(rc); e == nil {
|
||||
// rc contains the latest controller post update
|
||||
return true, nil
|
||||
return
|
||||
}
|
||||
updateErr := e
|
||||
// Update the controller with the latest resource version, if the update failed we
|
||||
// can't trust rc so use oldRc.Name.
|
||||
if rc, err = rcClient.Get(oldRc.Name); err != nil {
|
||||
if rc, e = c.ReplicationControllers(namespace).Get(oldRc.Name); e != nil {
|
||||
// The Get failed: Value in rc cannot be trusted.
|
||||
rc = oldRc
|
||||
}
|
||||
// The Get passed: rc contains the latest controller, expect a poll for the update.
|
||||
return false, nil
|
||||
// Only return the error from update
|
||||
return updateErr
|
||||
})
|
||||
// If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned
|
||||
// controller contains the applied update.
|
||||
return rc, err
|
||||
}
|
||||
|
||||
type updatePodFunc func(controller *api.Pod)
|
||||
|
||||
// updatePodWithRetries retries updating the given pod on conflict with the following steps:
|
||||
// 1. Get latest resource
|
||||
// 2. applyUpdate
|
||||
// 3. Update the resource
|
||||
func updatePodWithRetries(c client.Interface, namespace string, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, error) {
|
||||
// Deep copy the pod in case we failed on Get during retry loop
|
||||
obj, err := api.Scheme.Copy(pod)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to deep copy pod before updating it: %v", err)
|
||||
}
|
||||
oldPod := obj.(*api.Pod)
|
||||
err = client.RetryOnConflict(client.DefaultBackoff, func() (e error) {
|
||||
// Apply the update, then attempt to push it to the apiserver.
|
||||
applyUpdate(pod)
|
||||
if pod, e = c.Pods(namespace).Update(pod); e == nil {
|
||||
return
|
||||
}
|
||||
updateErr := e
|
||||
if pod, e = c.Pods(namespace).Get(oldPod.Name); e != nil {
|
||||
pod = oldPod
|
||||
}
|
||||
// Only return the error from update
|
||||
return updateErr
|
||||
})
|
||||
// If the error is non-nil the returned pod cannot be trusted, if it is nil, the returned
|
||||
// pod contains the applied update.
|
||||
return pod, err
|
||||
}
|
||||
|
||||
func FindSourceController(r client.ReplicationControllersNamespacer, namespace, name string) (*api.ReplicationController, error) {
|
||||
list, err := r.ReplicationControllers(namespace).List(api.ListOptions{})
|
||||
if err != nil {
|
||||
|
|
|
@ -1370,7 +1370,7 @@ func TestUpdateExistingReplicationController(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestUpdateWithRetries(t *testing.T) {
|
||||
func TestUpdateRcWithRetries(t *testing.T) {
|
||||
codec := testapi.Default.Codec()
|
||||
rc := &api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{Name: "rc",
|
||||
|
@ -1403,8 +1403,8 @@ func TestUpdateWithRetries(t *testing.T) {
|
|||
header := http.Header{}
|
||||
header.Set("Content-Type", runtime.ContentTypeJSON)
|
||||
updates := []*http.Response{
|
||||
{StatusCode: 500, Header: header, Body: objBody(codec, &api.ReplicationController{})},
|
||||
{StatusCode: 500, Header: header, Body: objBody(codec, &api.ReplicationController{})},
|
||||
{StatusCode: 409, Header: header, Body: objBody(codec, &api.ReplicationController{})}, // conflict
|
||||
{StatusCode: 409, Header: header, Body: objBody(codec, &api.ReplicationController{})}, // conflict
|
||||
{StatusCode: 200, Header: header, Body: objBody(codec, &newRc)},
|
||||
}
|
||||
gets := []*http.Response{
|
||||
|
@ -1442,8 +1442,8 @@ func TestUpdateWithRetries(t *testing.T) {
|
|||
client := client.NewOrDie(clientConfig)
|
||||
client.Client = fakeClient.Client
|
||||
|
||||
if rc, err := updateWithRetries(
|
||||
client.ReplicationControllers("default"), rc, func(c *api.ReplicationController) {
|
||||
if rc, err := updateRcWithRetries(
|
||||
client, "default", rc, func(c *api.ReplicationController) {
|
||||
c.Spec.Selector["baz"] = "foobar"
|
||||
}); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
|
@ -1451,7 +1451,7 @@ func TestUpdateWithRetries(t *testing.T) {
|
|||
t.Errorf("Expected updated rc, got %+v", rc)
|
||||
}
|
||||
if len(updates) != 0 || len(gets) != 0 {
|
||||
t.Errorf("Remaining updates %+v gets %+v", updates, gets)
|
||||
t.Errorf("Remaining updates %#v gets %#v", updates, gets)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue