From 75e570832b7682ab5abd628dd4d7787698cfc567 Mon Sep 17 00:00:00 2001 From: Janet Kuo Date: Wed, 2 Mar 2016 12:52:12 -0800 Subject: [PATCH] Ignore not found error when retrying rs and pods updates --- pkg/util/deployment/deployment.go | 110 +++++++++++------------------- pkg/util/pod/pod.go | 47 +++++++++++++ pkg/util/replicaset/replicaset.go | 71 +++++++++++++++++++ 3 files changed, 158 insertions(+), 70 deletions(-) create mode 100644 pkg/util/replicaset/replicaset.go diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index 4375a20e12..957029c574 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -27,13 +27,12 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" - unversionedextensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/integer" intstrutil "k8s.io/kubernetes/pkg/util/intstr" labelsutil "k8s.io/kubernetes/pkg/util/labels" podutil "k8s.io/kubernetes/pkg/util/pod" + rsutil "k8s.io/kubernetes/pkg/util/replicaset" "k8s.io/kubernetes/pkg/util/wait" ) @@ -186,21 +185,28 @@ func addHashKeyToRSAndPods(deployment *extensions.Deployment, c clientset.Interf ObjectMeta: meta, Spec: rs.Spec.Template.Spec, })) + rsUpdated := false // 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label. if len(updatedRS.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey]) == 0 { - updatedRS, err = updateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) { + updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) { updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) }) if err != nil { - return nil, fmt.Errorf("error updating rs %s pod template label with template hash: %v", updatedRS.Name, err) + return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) } - // Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods). - if updatedRS.Generation > updatedRS.Status.ObservedGeneration { - if err = waitForReplicaSetUpdated(c, updatedRS.Generation, namespace, updatedRS.Name); err != nil { - return nil, fmt.Errorf("error waiting for rs %s generation %d observed by controller: %v", updatedRS.Name, updatedRS.Generation, err) + if rsUpdated { + // Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods). + if updatedRS.Generation > updatedRS.Status.ObservedGeneration { + if err = waitForReplicaSetUpdated(c, updatedRS.Generation, namespace, updatedRS.Name); err != nil { + return nil, fmt.Errorf("error waiting for %s %s/%s generation %d observed by controller: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, updatedRS.Generation, err) + } } + glog.V(4).Infof("Observed the update of %s %s/%s's pod template with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash) + } else { + // If RS wasn't updated but didn't return error in step 1, we've hit a RS not found error. + // Return here and retry in the next sync loop. + return &rs, nil } - glog.V(4).Infof("Observed the update of rs %s's pod template with hash %s.", rs.Name, hash) } // 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted. @@ -213,20 +219,28 @@ func addHashKeyToRSAndPods(deployment *extensions.Deployment, c clientset.Interf if err != nil { return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", namespace, options, err) } - if err = labelPodsWithHash(podList, c, namespace, hash); err != nil { + allPodsLabeled := false + if allPodsLabeled, err = labelPodsWithHash(podList, updatedRS, c, namespace, hash); err != nil { return nil, fmt.Errorf("error in adding template hash label %s to pods %+v: %s", hash, podList, err) } - glog.V(4).Infof("Labeled rs %s's pods with hash %s.", rs.Name, hash) + // If not all pods are labeled but didn't return error in step 2, we've hit at least one pod not found error. + // Return here and retry in the next sync loop. + if !allPodsLabeled { + return updatedRS, nil + } // 3. Update rs label and selector to include the new hash label // Copy the old selector, so that we can scrub out any orphaned pods - if updatedRS, err = updateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) { + if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) { updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash) }); err != nil { - return nil, fmt.Errorf("error updating rs %s label and selector with template hash: %v", updatedRS.Name, err) + return nil, fmt.Errorf("error updating %s %s/%s label and selector with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) } - glog.V(4).Infof("Updated rs %s's selector and label with hash %s.", rs.Name, hash) + if rsUpdated { + glog.V(4).Infof("Updated %s %s/%s's selector and label with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash) + } + // If the RS isn't actually updated in step 3, that's okay, we'll retry in the next sync loop since its selector isn't updated yet. // TODO: look for orphaned pods and label them in the background somewhere else periodically @@ -244,70 +258,26 @@ func waitForReplicaSetUpdated(c clientset.Interface, desiredGeneration int64, na } // labelPodsWithHash labels all pods in the given podList with the new hash label. -func labelPodsWithHash(podList *api.PodList, c clientset.Interface, namespace, hash string) error { +// The returned bool value can be used to tell if all pods are actually labeled. +func labelPodsWithHash(podList *api.PodList, rs *extensions.ReplicaSet, c clientset.Interface, namespace, hash string) (bool, error) { + allPodsLabeled := true for _, pod := range podList.Items { // Only label the pod that doesn't already have the new hash if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash { - if _, err := updatePodWithRetries(c.Core().Pods(namespace), &pod, func(podToUpdate *api.Pod) { + if _, podUpdated, err := podutil.UpdatePodWithRetries(c.Core().Pods(namespace), &pod, func(podToUpdate *api.Pod) { podToUpdate.Labels = labelsutil.AddLabel(podToUpdate.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) }); err != nil { - return fmt.Errorf("error in adding template hash label %s to pod %+v: %s", hash, pod, err) + return false, fmt.Errorf("error in adding template hash label %s to pod %+v: %s", hash, pod, err) + } else if podUpdated { + glog.V(4).Infof("Labeled %s %s/%s of %s %s/%s with hash %s.", pod.Kind, pod.Namespace, pod.Name, rs.Kind, rs.Namespace, rs.Name, hash) + } else { + // If the pod wasn't updated but didn't return error when we try to update it, we've hit a pod not found error. + // Then we can't say all pods are labeled + allPodsLabeled = false } - glog.V(4).Infof("Labeled pod %s with hash %s.", pod.Name, hash) } } - return nil -} - -// TODO: use client library instead when it starts to support update retries -// see https://github.com/kubernetes/kubernetes/issues/21479 -type updateRSFunc func(rs *extensions.ReplicaSet) - -func updateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, applyUpdate updateRSFunc) (*extensions.ReplicaSet, error) { - var err error - oldRs := rs - err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) { - rs, err = rsClient.Get(oldRs.Name) - if err != nil { - return false, err - } - // Apply the update, then attempt to push it to the apiserver. - applyUpdate(rs) - if rs, err = rsClient.Update(rs); err == nil { - // Update successful. - return true, nil - } - // Update could have failed due to conflict error. Try again. - return false, nil - }) - // If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned - // controller contains the applied update. - return rs, err -} - -type updatePodFunc func(pod *api.Pod) - -func updatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, error) { - var err error - oldPod := pod - err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) { - pod, err = podClient.Get(oldPod.Name) - if err != nil { - return false, err - } - // Apply the update, then attempt to push it to the apiserver. - applyUpdate(pod) - if pod, err = podClient.Update(pod); err == nil { - // Update successful. - return true, nil - } - // Update could have failed due to conflict error. Try again. - return false, nil - }) - if err == wait.ErrWaitTimeout { - return nil, fmt.Errorf("timed out trying to update pod: %+v", oldPod) - } - return pod, err + return allPodsLabeled, nil } // Returns the desired PodTemplateSpec for the new ReplicaSet corresponding to the given ReplicaSet. diff --git a/pkg/util/pod/pod.go b/pkg/util/pod/pod.go index 576b0cdab8..8f755473bb 100644 --- a/pkg/util/pod/pod.go +++ b/pkg/util/pod/pod.go @@ -17,10 +17,17 @@ limitations under the License. package pod import ( + "fmt" "hash/adler32" + "time" + + "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" + unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" hashutil "k8s.io/kubernetes/pkg/util/hash" + "k8s.io/kubernetes/pkg/util/wait" ) func GetPodTemplateSpecHash(template api.PodTemplateSpec) uint32 { @@ -28,3 +35,43 @@ func GetPodTemplateSpecHash(template api.PodTemplateSpec) uint32 { hashutil.DeepHashObject(podTemplateSpecHasher, template) return podTemplateSpecHasher.Sum32() } + +// TODO: use client library instead when it starts to support update retries +// see https://github.com/kubernetes/kubernetes/issues/21479 +type updatePodFunc func(pod *api.Pod) + +// UpdatePodWithRetries updates a pod with given applyUpdate function. Note that pod not found error is ignored. +// The returned bool value can be used to tell if the pod is actually updated. +func UpdatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, bool, error) { + var err error + var podUpdated bool + oldPod := pod + if err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) { + pod, err = podClient.Get(oldPod.Name) + if err != nil { + return false, err + } + // Apply the update, then attempt to push it to the apiserver. + // TODO: add precondition for update + applyUpdate(pod) + if pod, err = podClient.Update(pod); err == nil { + // Update successful. + return true, nil + } + // TODO: don't retry on perm-failed errors and handle them gracefully + // Update could have failed due to conflict error. Try again. + return false, nil + }); err == nil { + // When there's no error, we've updated this pod. + podUpdated = true + } + + if err == wait.ErrWaitTimeout { + err = fmt.Errorf("timed out trying to update pod: %+v", oldPod) + } + if errors.IsNotFound(err) { + glog.V(4).Infof("%s %s/%s is not found, skip updating it.", oldPod.Kind, oldPod.Namespace, oldPod.Name) + err = nil + } + return pod, podUpdated, err +} diff --git a/pkg/util/replicaset/replicaset.go b/pkg/util/replicaset/replicaset.go new file mode 100644 index 0000000000..3859f20186 --- /dev/null +++ b/pkg/util/replicaset/replicaset.go @@ -0,0 +1,71 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package replicaset + +import ( + "fmt" + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/apis/extensions" + unversionedextensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned" + "k8s.io/kubernetes/pkg/util/wait" +) + +// TODO: use client library instead when it starts to support update retries +// see https://github.com/kubernetes/kubernetes/issues/21479 +type updateRSFunc func(rs *extensions.ReplicaSet) + +// UpdateRSWithRetries updates a RS with given applyUpdate function. Note that RS not found error is ignored. +// The returned bool value can be used to tell if the RS is actually updated. +func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, applyUpdate updateRSFunc) (*extensions.ReplicaSet, bool, error) { + var err error + var rsUpdated bool + oldRs := rs + if err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) { + rs, err = rsClient.Get(oldRs.Name) + if err != nil { + return false, err + } + // Apply the update, then attempt to push it to the apiserver. + // TODO: add precondition for update + applyUpdate(rs) + if rs, err = rsClient.Update(rs); err == nil { + // Update successful. + return true, nil + } + // TODO: don't retry on perm-failed errors and handle them gracefully + // Update could have failed due to conflict error. Try again. + return false, nil + }); err == nil { + // When there's no error, we've updated this RS. + rsUpdated = true + } + + if err == wait.ErrWaitTimeout { + err = fmt.Errorf("timed out trying to update RS: %+v", oldRs) + } + // Ignore the RS not found error, but the RS isn't updated. + if errors.IsNotFound(err) { + glog.V(4).Infof("%s %s/%s is not found, skip updating it.", oldRs.Kind, oldRs.Namespace, oldRs.Name) + err = nil + } + // If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned + // controller contains the applied update. + return rs, rsUpdated, err +}