diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index 5728e1d87f..dde5113400 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/apis/extensions" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/integer" intstrutil "k8s.io/kubernetes/pkg/util/intstr" labelsutil "k8s.io/kubernetes/pkg/util/labels" @@ -188,11 +189,13 @@ func addHashKeyToRSAndPods(deployment *extensions.Deployment, c clientset.Interf rsUpdated := false // 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label. updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, - func(updated *extensions.ReplicaSet) bool { - return updated.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash - }, - func(updated *extensions.ReplicaSet) { + func(updated *extensions.ReplicaSet) error { + // Precondition: the RS doesn't contain the new hash in its pod template label. + if updated.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash { + return errors.ErrPreconditionViolated + } updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + return nil }) if err != nil { return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) @@ -235,12 +238,14 @@ func addHashKeyToRSAndPods(deployment *extensions.Deployment, c clientset.Interf // 3. Update rs label and selector to include the new hash label // Copy the old selector, so that we can scrub out any orphaned pods if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, - func(updated *extensions.ReplicaSet) bool { - return updated.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash || updated.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] != hash - }, - func(updated *extensions.ReplicaSet) { + func(updated *extensions.ReplicaSet) error { + // Precondition: the RS doesn't contain the new hash in its label or selector. + if updated.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash && updated.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] == hash { + return errors.ErrPreconditionViolated + } updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash) + return nil }); err != nil { return nil, fmt.Errorf("error updating %s %s/%s label and selector with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) } @@ -272,17 +277,19 @@ func labelPodsWithHash(podList *api.PodList, rs *extensions.ReplicaSet, c client // Only label the pod that doesn't already have the new hash if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash { if _, podUpdated, err := podutil.UpdatePodWithRetries(c.Core().Pods(namespace), &pod, - func(podToUpdate *api.Pod) bool { - return podToUpdate.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash - }, - func(podToUpdate *api.Pod) { + func(podToUpdate *api.Pod) error { + // Precondition: the pod doesn't contain the new hash in its label. + if podToUpdate.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash { + return errors.ErrPreconditionViolated + } podToUpdate.Labels = labelsutil.AddLabel(podToUpdate.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + return nil }); err != nil { return false, fmt.Errorf("error in adding template hash label %s to pod %+v: %s", hash, pod, err) } else if podUpdated { glog.V(4).Infof("Labeled %s %s/%s of %s %s/%s with hash %s.", pod.Kind, pod.Namespace, pod.Name, rs.Kind, rs.Namespace, rs.Name, hash) } else { - // If the pod wasn't updated but didn't return error when we try to update it, we've hit a pod not found error. + // If the pod wasn't updated but didn't return error when we try to update it, we've hit "pod not found" or "precondition violated" error. // Then we can't say all pods are labeled allPodsLabeled = false } diff --git a/pkg/util/errors/errors.go b/pkg/util/errors/errors.go index a1a8e7aa24..df3adaf3e8 100644 --- a/pkg/util/errors/errors.go +++ b/pkg/util/errors/errors.go @@ -16,7 +16,10 @@ limitations under the License. package errors -import "fmt" +import ( + "errors" + "fmt" +) // Aggregate represents an object that contains multiple errors, but does not // necessarily have singular semantic meaning. @@ -148,3 +151,6 @@ func AggregateGoroutines(funcs ...func() error) Aggregate { } return NewAggregate(errs) } + +// ErrPreconditionViolated is returned when the precondition is violated +var ErrPreconditionViolated = errors.New("precondition is violated") diff --git a/pkg/util/pod/pod.go b/pkg/util/pod/pod.go index f8f6367513..8fb5cadd0b 100644 --- a/pkg/util/pod/pod.go +++ b/pkg/util/pod/pod.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" + errorsutil "k8s.io/kubernetes/pkg/util/errors" hashutil "k8s.io/kubernetes/pkg/util/hash" "k8s.io/kubernetes/pkg/util/wait" ) @@ -38,12 +39,11 @@ func GetPodTemplateSpecHash(template api.PodTemplateSpec) uint32 { // TODO: use client library instead when it starts to support update retries // see https://github.com/kubernetes/kubernetes/issues/21479 -type updatePodFunc func(pod *api.Pod) -type preconditionFunc func(pod *api.Pod) bool +type updatePodFunc func(pod *api.Pod) error -// UpdatePodWithRetries updates a pod with given applyUpdate function, when the given precondition holds. Note that pod not found error is ignored. +// UpdatePodWithRetries updates a pod with given applyUpdate function. Note that pod not found error is ignored. // The returned bool value can be used to tell if the pod is actually updated. -func UpdatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, preconditionHold preconditionFunc, applyUpdate updatePodFunc) (*api.Pod, bool, error) { +func UpdatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, bool, error) { var err error var podUpdated bool oldPod := pod @@ -52,12 +52,10 @@ func UpdatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, if err != nil { return false, err } - if !preconditionHold(pod) { - glog.V(4).Infof("pod %s precondition doesn't hold, skip updating it.", pod.Name) - return true, nil - } // Apply the update, then attempt to push it to the apiserver. - applyUpdate(pod) + if err = applyUpdate(pod); err != nil { + return false, err + } if pod, err = podClient.Update(pod); err == nil { // Update successful. return true, nil @@ -70,12 +68,22 @@ func UpdatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, podUpdated = true } + // Handle returned error from wait poll if err == wait.ErrWaitTimeout { err = fmt.Errorf("timed out trying to update pod: %+v", oldPod) } + // Ignore the pod not found error, but the pod isn't updated. if errors.IsNotFound(err) { glog.V(4).Infof("%s %s/%s is not found, skip updating it.", oldPod.Kind, oldPod.Namespace, oldPod.Name) err = nil } + // Ignore the precondition violated error, but the pod isn't updated. + if err == errorsutil.ErrPreconditionViolated { + glog.V(4).Infof("%s %s/%s precondition doesn't hold, skip updating it.", oldPod.Kind, oldPod.Namespace, oldPod.Name) + err = nil + } + + // If the error is non-nil the returned pod cannot be trusted; if podUpdated is false, the pod isn't updated; + // if the error is nil and podUpdated is true, the returned pod contains the applied update. return pod, podUpdated, err } diff --git a/pkg/util/replicaset/replicaset.go b/pkg/util/replicaset/replicaset.go index 9f18219805..218298a515 100644 --- a/pkg/util/replicaset/replicaset.go +++ b/pkg/util/replicaset/replicaset.go @@ -24,17 +24,17 @@ import ( "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/apis/extensions" unversionedextensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned" + errorsutil "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/wait" ) // TODO: use client library instead when it starts to support update retries // see https://github.com/kubernetes/kubernetes/issues/21479 -type updateRSFunc func(rs *extensions.ReplicaSet) -type preconditionFunc func(rs *extensions.ReplicaSet) bool +type updateRSFunc func(rs *extensions.ReplicaSet) error -// UpdateRSWithRetries updates a RS with given applyUpdate function, when the given precondition holds. Note that RS not found error is ignored. +// UpdateRSWithRetries updates a RS with given applyUpdate function. Note that RS not found error is ignored. // The returned bool value can be used to tell if the RS is actually updated. -func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, preconditionHold preconditionFunc, applyUpdate updateRSFunc) (*extensions.ReplicaSet, bool, error) { +func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, applyUpdate updateRSFunc) (*extensions.ReplicaSet, bool, error) { var err error var rsUpdated bool oldRs := rs @@ -43,12 +43,10 @@ func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs if err != nil { return false, err } - if !preconditionHold(rs) { - glog.V(4).Infof("rs %s precondition doesn't hold, skip updating it.", rs.Name) - return true, nil - } // Apply the update, then attempt to push it to the apiserver. - applyUpdate(rs) + if err = applyUpdate(rs); err != nil { + return false, err + } if rs, err = rsClient.Update(rs); err == nil { // Update successful. return true, nil @@ -61,6 +59,7 @@ func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs rsUpdated = true } + // Handle returned error from wait poll if err == wait.ErrWaitTimeout { err = fmt.Errorf("timed out trying to update RS: %+v", oldRs) } @@ -69,7 +68,13 @@ func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs glog.V(4).Infof("%s %s/%s is not found, skip updating it.", oldRs.Kind, oldRs.Namespace, oldRs.Name) err = nil } - // If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned - // controller contains the applied update. + // Ignore the precondition violated error, but the RS isn't updated. + if err == errorsutil.ErrPreconditionViolated { + glog.V(4).Infof("%s %s/%s precondition doesn't hold, skip updating it.", oldRs.Kind, oldRs.Namespace, oldRs.Name) + err = nil + } + + // If the error is non-nil the returned RS cannot be trusted; if rsUpdated is false, the contoller isn't updated; + // if the error is nil and rsUpdated is true, the returned RS contains the applied update. return rs, rsUpdated, err }