Backoff correctly when adopting replica sets/pods

pull/6/head
Michail Kargakis 2016-11-07 14:23:15 +01:00
parent fc361206e7
commit b3765c4df9
11 changed files with 208 additions and 366 deletions

View File

@ -40,8 +40,6 @@ go_library(
"//pkg/util/integer:go_default_library", "//pkg/util/integer:go_default_library",
"//pkg/util/labels:go_default_library", "//pkg/util/labels:go_default_library",
"//pkg/util/metrics:go_default_library", "//pkg/util/metrics:go_default_library",
"//pkg/util/pod:go_default_library",
"//pkg/util/replicaset:go_default_library",
"//pkg/util/runtime:go_default_library", "//pkg/util/runtime:go_default_library",
"//pkg/util/wait:go_default_library", "//pkg/util/wait:go_default_library",
"//pkg/util/workqueue:go_default_library", "//pkg/util/workqueue:go_default_library",

View File

@ -33,8 +33,6 @@ import (
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
utilerrors "k8s.io/kubernetes/pkg/util/errors" utilerrors "k8s.io/kubernetes/pkg/util/errors"
labelsutil "k8s.io/kubernetes/pkg/util/labels" labelsutil "k8s.io/kubernetes/pkg/util/labels"
podutil "k8s.io/kubernetes/pkg/util/pod"
rsutil "k8s.io/kubernetes/pkg/util/replicaset"
) )
// syncStatusOnly only updates Deployments Status and doesn't take any mutating actions. // syncStatusOnly only updates Deployments Status and doesn't take any mutating actions.
@ -162,22 +160,14 @@ func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extension
// 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created // 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created
// 2. Add hash label to all pods this rs owns, wait until replicaset controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas // 2. Add hash label to all pods this rs owns, wait until replicaset controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas
// 3. Add hash label to the rs's label and selector // 3. Add hash label to the rs's label and selector
func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet) (updatedRS *extensions.ReplicaSet, err error) { func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet) (*extensions.ReplicaSet, error) {
objCopy, err := api.Scheme.Copy(rs)
if err != nil {
return nil, err
}
updatedRS = objCopy.(*extensions.ReplicaSet)
// If the rs already has the new hash label in its selector, it's done syncing // If the rs already has the new hash label in its selector, it's done syncing
if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) { if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
return return rs, nil
} }
namespace := rs.Namespace hash := deploymentutil.GetReplicaSetHash(rs)
hash := rsutil.GetPodTemplateSpecHash(rs)
rsUpdated := false
// 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label. // 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label.
updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(namespace), updatedRS, updatedRS, err := deploymentutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(rs.Namespace), dc.rsLister, rs.Namespace, rs.Name,
func(updated *extensions.ReplicaSet) error { func(updated *extensions.ReplicaSet) error {
// Precondition: the RS doesn't contain the new hash in its pod template label. // Precondition: the RS doesn't contain the new hash in its pod template label.
if updated.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash { if updated.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash {
@ -187,20 +177,15 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet)
return nil return nil
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) return nil, fmt.Errorf("error updating replica set %s/%s pod template label with template hash: %v", rs.Namespace, rs.Name, err)
}
if !rsUpdated {
// If RS wasn't updated but didn't return error in step 1, we've hit a RS not found error.
// Return here and retry in the next sync loop.
return rs, nil
} }
// Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods). // Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods).
if updatedRS.Generation > updatedRS.Status.ObservedGeneration { if updatedRS.Generation > updatedRS.Status.ObservedGeneration {
if err = deploymentutil.WaitForReplicaSetUpdated(dc.client, updatedRS.Generation, namespace, updatedRS.Name); err != nil { if err = deploymentutil.WaitForReplicaSetUpdated(dc.client, updatedRS.Generation, updatedRS.Namespace, updatedRS.Name); err != nil {
return nil, fmt.Errorf("error waiting for %s %s/%s generation %d observed by controller: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, updatedRS.Generation, err) return nil, fmt.Errorf("error waiting for replica set %s/%s to be observed by controller: %v", updatedRS.Namespace, updatedRS.Name, err)
} }
glog.V(4).Infof("Observed the update of replica set %s/%s's pod template with hash %s.", rs.Namespace, rs.Name, hash)
} }
glog.V(4).Infof("Observed the update of %s %s/%s's pod template with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash)
// 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted. // 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted.
selector, err := metav1.LabelSelectorAsSelector(updatedRS.Spec.Selector) selector, err := metav1.LabelSelectorAsSelector(updatedRS.Spec.Selector)
@ -212,54 +197,45 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet)
if err != nil { if err != nil {
return nil, err return nil, err
} }
pods, err := dc.podLister.Pods(namespace).List(parsed) pods, err := dc.podLister.Pods(updatedRS.Namespace).List(parsed)
if err != nil { if err != nil {
return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", namespace, options, err) return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", rs.Namespace, options, err)
} }
podList := v1.PodList{Items: make([]v1.Pod, 0, len(pods))} podList := v1.PodList{Items: make([]v1.Pod, 0, len(pods))}
for i := range pods { for i := range pods {
podList.Items = append(podList.Items, *pods[i]) podList.Items = append(podList.Items, *pods[i])
} }
allPodsLabeled := false if err := deploymentutil.LabelPodsWithHash(&podList, dc.client, dc.podLister, rs.Namespace, rs.Name, hash); err != nil {
if allPodsLabeled, err = deploymentutil.LabelPodsWithHash(&podList, updatedRS, dc.client, namespace, hash); err != nil {
return nil, fmt.Errorf("error in adding template hash label %s to pods %+v: %s", hash, podList, err) return nil, fmt.Errorf("error in adding template hash label %s to pods %+v: %s", hash, podList, err)
} }
// If not all pods are labeled but didn't return error in step 2, we've hit at least one pod not found error.
// Return here and retry in the next sync loop.
if !allPodsLabeled {
return updatedRS, nil
}
// We need to wait for the replicaset controller to observe the pods being // We need to wait for the replicaset controller to observe the pods being
// labeled with pod template hash. Because previously we've called // labeled with pod template hash. Because previously we've called
// WaitForReplicaSetUpdated, the replicaset controller should have dropped // WaitForReplicaSetUpdated, the replicaset controller should have dropped
// FullyLabeledReplicas to 0 already, we only need to wait it to increase // FullyLabeledReplicas to 0 already, we only need to wait it to increase
// back to the number of replicas in the spec. // back to the number of replicas in the spec.
if err = deploymentutil.WaitForPodsHashPopulated(dc.client, updatedRS.Generation, namespace, updatedRS.Name); err != nil { if err := deploymentutil.WaitForPodsHashPopulated(dc.client, updatedRS.Generation, updatedRS.Namespace, updatedRS.Name); err != nil {
return nil, fmt.Errorf("%s %s/%s: error waiting for replicaset controller to observe pods being labeled with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) return nil, fmt.Errorf("Replica set %s/%s: error waiting for replicaset controller to observe pods being labeled with template hash: %v", updatedRS.Namespace, updatedRS.Name, err)
} }
// 3. Update rs label and selector to include the new hash label // 3. Update rs label and selector to include the new hash label
// Copy the old selector, so that we can scrub out any orphaned pods // Copy the old selector, so that we can scrub out any orphaned pods
if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(namespace), updatedRS, updatedRS, err = deploymentutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(rs.Namespace), dc.rsLister, rs.Namespace, rs.Name, func(updated *extensions.ReplicaSet) error {
func(updated *extensions.ReplicaSet) error { // Precondition: the RS doesn't contain the new hash in its label and selector.
// Precondition: the RS doesn't contain the new hash in its label or selector. if updated.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash && updated.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] == hash {
if updated.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash && updated.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] == hash { return utilerrors.ErrPreconditionViolated
return utilerrors.ErrPreconditionViolated }
} updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash)
updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash) return nil
return nil })
}); err != nil { // If the RS isn't actually updated, that's okay, we'll retry in the
return nil, fmt.Errorf("error updating %s %s/%s label and selector with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) // next sync loop since its selector isn't updated yet.
if err != nil {
return nil, fmt.Errorf("error updating ReplicaSet %s/%s label and selector with template hash: %v", updatedRS.Namespace, updatedRS.Name, err)
} }
if rsUpdated {
glog.V(4).Infof("Updated %s %s/%s's selector and label with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash)
}
// If the RS isn't actually updated in step 3, that's okay, we'll retry in the next sync loop since its selector isn't updated yet.
// TODO: look for orphaned pods and label them in the background somewhere else periodically // TODO: look for orphaned pods and label them in the background somewhere else periodically
return updatedRS, nil return updatedRS, nil
} }
@ -340,7 +316,7 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
// new ReplicaSet does not exist, create one. // new ReplicaSet does not exist, create one.
namespace := deployment.Namespace namespace := deployment.Namespace
podTemplateSpecHash := podutil.GetPodTemplateSpecHash(deployment.Spec.Template) podTemplateSpecHash := deploymentutil.GetPodTemplateSpecHash(deployment.Spec.Template)
newRSTemplate := deploymentutil.GetNewReplicaSetTemplate(deployment) newRSTemplate := deploymentutil.GetNewReplicaSetTemplate(deployment)
// Add podTemplateHash label to selector. // Add podTemplateHash label to selector.
newRSSelector := labelsutil.CloneSelectorAndAddLabel(deployment.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash) newRSSelector := labelsutil.CloneSelectorAndAddLabel(deployment.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)

View File

@ -12,7 +12,11 @@ load(
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = ["deployment_util.go"], srcs = [
"deployment_util.go",
"pod_util.go",
"replicaset_util.go",
],
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//pkg/api:go_default_library", "//pkg/api:go_default_library",
@ -22,15 +26,19 @@ go_library(
"//pkg/apis/extensions:go_default_library", "//pkg/apis/extensions:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library", "//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/apis/meta/v1:go_default_library", "//pkg/apis/meta/v1:go_default_library",
"//pkg/client/cache:go_default_library",
"//pkg/client/clientset_generated/release_1_5:go_default_library", "//pkg/client/clientset_generated/release_1_5:go_default_library",
"//pkg/client/clientset_generated/release_1_5/typed/core/v1:go_default_library",
"//pkg/client/clientset_generated/release_1_5/typed/extensions/v1beta1:go_default_library",
"//pkg/client/retry:go_default_library",
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//pkg/labels:go_default_library", "//pkg/labels:go_default_library",
"//pkg/runtime:go_default_library", "//pkg/runtime:go_default_library",
"//pkg/util/errors:go_default_library", "//pkg/util/errors:go_default_library",
"//pkg/util/hash:go_default_library",
"//pkg/util/integer:go_default_library", "//pkg/util/integer:go_default_library",
"//pkg/util/intstr:go_default_library", "//pkg/util/intstr:go_default_library",
"//pkg/util/labels:go_default_library", "//pkg/util/labels:go_default_library",
"//pkg/util/pod:go_default_library",
"//pkg/util/wait:go_default_library", "//pkg/util/wait:go_default_library",
"//vendor:github.com/golang/glog", "//vendor:github.com/golang/glog",
], ],

View File

@ -32,6 +32,7 @@ import (
internalextensions "k8s.io/kubernetes/pkg/apis/extensions" internalextensions "k8s.io/kubernetes/pkg/apis/extensions"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
@ -40,7 +41,6 @@ import (
"k8s.io/kubernetes/pkg/util/integer" "k8s.io/kubernetes/pkg/util/integer"
intstrutil "k8s.io/kubernetes/pkg/util/intstr" intstrutil "k8s.io/kubernetes/pkg/util/intstr"
labelsutil "k8s.io/kubernetes/pkg/util/labels" labelsutil "k8s.io/kubernetes/pkg/util/labels"
podutil "k8s.io/kubernetes/pkg/util/pod"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
) )
@ -699,12 +699,11 @@ func WaitForPodsHashPopulated(c clientset.Interface, desiredGeneration int64, na
// LabelPodsWithHash labels all pods in the given podList with the new hash label. // LabelPodsWithHash labels all pods in the given podList with the new hash label.
// The returned bool value can be used to tell if all pods are actually labeled. // The returned bool value can be used to tell if all pods are actually labeled.
func LabelPodsWithHash(podList *v1.PodList, rs *extensions.ReplicaSet, c clientset.Interface, namespace, hash string) (bool, error) { func LabelPodsWithHash(podList *v1.PodList, c clientset.Interface, podLister *cache.StoreToPodLister, namespace, name, hash string) error {
allPodsLabeled := true
for _, pod := range podList.Items { for _, pod := range podList.Items {
// Only label the pod that doesn't already have the new hash // Only label the pod that doesn't already have the new hash
if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash { if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash {
if _, podUpdated, err := podutil.UpdatePodWithRetries(c.Core().Pods(namespace), &pod, _, err := UpdatePodWithRetries(c.Core().Pods(namespace), podLister, pod.Namespace, pod.Name,
func(podToUpdate *v1.Pod) error { func(podToUpdate *v1.Pod) error {
// Precondition: the pod doesn't contain the new hash in its label. // Precondition: the pod doesn't contain the new hash in its label.
if podToUpdate.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash { if podToUpdate.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash {
@ -712,18 +711,14 @@ func LabelPodsWithHash(podList *v1.PodList, rs *extensions.ReplicaSet, c clients
} }
podToUpdate.Labels = labelsutil.AddLabel(podToUpdate.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) podToUpdate.Labels = labelsutil.AddLabel(podToUpdate.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
return nil return nil
}); err != nil { })
return false, fmt.Errorf("error in adding template hash label %s to pod %+v: %s", hash, pod, err) if err != nil {
} else if podUpdated { return fmt.Errorf("error in adding template hash label %s to pod %q: %v", hash, pod.Name, err)
glog.V(4).Infof("Labeled %s %s/%s of %s %s/%s with hash %s.", pod.Kind, pod.Namespace, pod.Name, rs.Kind, rs.Namespace, rs.Name, hash)
} else {
// If the pod wasn't updated but didn't return error when we try to update it, we've hit "pod not found" or "precondition violated" error.
// Then we can't say all pods are labeled
allPodsLabeled = false
} }
glog.V(4).Infof("Labeled pod %s/%s of ReplicaSet %s/%s with hash %s.", pod.Namespace, pod.Name, namespace, name, hash)
} }
} }
return allPodsLabeled, nil return nil
} }
// GetNewReplicaSetTemplate returns the desired PodTemplateSpec for the new ReplicaSet corresponding to the given ReplicaSet. // GetNewReplicaSetTemplate returns the desired PodTemplateSpec for the new ReplicaSet corresponding to the given ReplicaSet.
@ -736,7 +731,7 @@ func GetNewReplicaSetTemplate(deployment *extensions.Deployment) v1.PodTemplateS
newRSTemplate.ObjectMeta.Labels = labelsutil.CloneAndAddLabel( newRSTemplate.ObjectMeta.Labels = labelsutil.CloneAndAddLabel(
deployment.Spec.Template.ObjectMeta.Labels, deployment.Spec.Template.ObjectMeta.Labels,
extensions.DefaultDeploymentUniqueLabelKey, extensions.DefaultDeploymentUniqueLabelKey,
podutil.GetPodTemplateSpecHash(newRSTemplate)) GetPodTemplateSpecHash(newRSTemplate))
return newRSTemplate return newRSTemplate
} }
@ -751,7 +746,7 @@ func GetNewReplicaSetTemplateInternal(deployment *internalextensions.Deployment)
newRSTemplate.ObjectMeta.Labels = labelsutil.CloneAndAddLabel( newRSTemplate.ObjectMeta.Labels = labelsutil.CloneAndAddLabel(
deployment.Spec.Template.ObjectMeta.Labels, deployment.Spec.Template.ObjectMeta.Labels,
internalextensions.DefaultDeploymentUniqueLabelKey, internalextensions.DefaultDeploymentUniqueLabelKey,
podutil.GetInternalPodTemplateSpecHash(newRSTemplate)) GetInternalPodTemplateSpecHash(newRSTemplate))
return newRSTemplate return newRSTemplate
} }

View File

@ -0,0 +1,82 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"hash/adler32"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/cache"
v1core "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1"
"k8s.io/kubernetes/pkg/client/retry"
errorsutil "k8s.io/kubernetes/pkg/util/errors"
hashutil "k8s.io/kubernetes/pkg/util/hash"
)
func GetPodTemplateSpecHash(template v1.PodTemplateSpec) uint32 {
podTemplateSpecHasher := adler32.New()
hashutil.DeepHashObject(podTemplateSpecHasher, template)
return podTemplateSpecHasher.Sum32()
}
// TODO: remove the duplicate
func GetInternalPodTemplateSpecHash(template api.PodTemplateSpec) uint32 {
podTemplateSpecHasher := adler32.New()
hashutil.DeepHashObject(podTemplateSpecHasher, template)
return podTemplateSpecHasher.Sum32()
}
// TODO: use client library instead when it starts to support update retries
// see https://github.com/kubernetes/kubernetes/issues/21479
type updatePodFunc func(pod *v1.Pod) error
// UpdatePodWithRetries updates a pod with given applyUpdate function. Note that pod not found error is ignored.
// The returned bool value can be used to tell if the pod is actually updated.
func UpdatePodWithRetries(podClient v1core.PodInterface, podLister *cache.StoreToPodLister, namespace, name string, applyUpdate updatePodFunc) (*v1.Pod, error) {
var pod *v1.Pod
retryErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
var err error
pod, err = podLister.Pods(namespace).Get(name)
if err != nil {
return err
}
obj, deepCopyErr := api.Scheme.DeepCopy(pod)
if deepCopyErr != nil {
return deepCopyErr
}
pod = obj.(*v1.Pod)
// Apply the update, then attempt to push it to the apiserver.
if applyErr := applyUpdate(pod); applyErr != nil {
return applyErr
}
pod, err = podClient.Update(pod)
return err
})
// Ignore the precondition violated error, this pod is already updated
// with the desired label.
if retryErr == errorsutil.ErrPreconditionViolated {
glog.V(4).Infof("Pod %s/%s precondition doesn't hold, skip updating it.", namespace, name)
retryErr = nil
}
return pod, retryErr
}

View File

@ -0,0 +1,79 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/cache"
unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/retry"
errorsutil "k8s.io/kubernetes/pkg/util/errors"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
)
// TODO: use client library instead when it starts to support update retries
// see https://github.com/kubernetes/kubernetes/issues/21479
type updateRSFunc func(rs *extensions.ReplicaSet) error
// UpdateRSWithRetries updates a RS with given applyUpdate function. Note that RS not found error is ignored.
// The returned bool value can be used to tell if the RS is actually updated.
func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rsLister *cache.StoreToReplicaSetLister, namespace, name string, applyUpdate updateRSFunc) (*extensions.ReplicaSet, error) {
var rs *extensions.ReplicaSet
retryErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
var err error
rs, err = rsLister.ReplicaSets(namespace).Get(name)
if err != nil {
return err
}
obj, deepCopyErr := api.Scheme.DeepCopy(rs)
if deepCopyErr != nil {
return deepCopyErr
}
rs = obj.(*extensions.ReplicaSet)
// Apply the update, then attempt to push it to the apiserver.
if applyErr := applyUpdate(rs); applyErr != nil {
return applyErr
}
rs, err = rsClient.Update(rs)
return err
})
// Ignore the precondition violated error, but the RS isn't updated.
if retryErr == errorsutil.ErrPreconditionViolated {
glog.V(4).Infof("Replica set %s/%s precondition doesn't hold, skip updating it.", namespace, name)
retryErr = nil
}
return rs, retryErr
}
// GetReplicaSetHash returns the pod template hash of a ReplicaSet's pod template space
func GetReplicaSetHash(rs *extensions.ReplicaSet) string {
meta := rs.Spec.Template.ObjectMeta
meta.Labels = labelsutil.CloneAndRemoveLabel(meta.Labels, extensions.DefaultDeploymentUniqueLabelKey)
return fmt.Sprintf("%d", GetPodTemplateSpecHash(v1.PodTemplateSpec{
ObjectMeta: meta,
Spec: rs.Spec.Template.Spec,
}))
}

View File

@ -1,30 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
"cgo_library",
)
go_library(
name = "go_default_library",
srcs = [
"doc.go",
"pod.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/errors:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/release_1_5/typed/core/v1:go_default_library",
"//pkg/util/errors:go_default_library",
"//pkg/util/hash:go_default_library",
"//pkg/util/wait:go_default_library",
"//vendor:github.com/golang/glog",
],
)

View File

@ -1,18 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package pod provides utilities to work with Kubernetes pod and pod templates.
package pod // import "k8s.io/kubernetes/pkg/util/pod"

View File

@ -1,108 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pod
import (
"fmt"
"hash/adler32"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/v1"
v1core "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1"
errorsutil "k8s.io/kubernetes/pkg/util/errors"
hashutil "k8s.io/kubernetes/pkg/util/hash"
"k8s.io/kubernetes/pkg/util/wait"
)
func GetPodTemplateSpecHash(template v1.PodTemplateSpec) uint32 {
podTemplateSpecHasher := adler32.New()
hashutil.DeepHashObject(podTemplateSpecHasher, template)
return podTemplateSpecHasher.Sum32()
}
// TODO: remove the duplicate
func GetInternalPodTemplateSpecHash(template api.PodTemplateSpec) uint32 {
podTemplateSpecHasher := adler32.New()
hashutil.DeepHashObject(podTemplateSpecHasher, template)
return podTemplateSpecHasher.Sum32()
}
// TODO: use client library instead when it starts to support update retries
// see https://github.com/kubernetes/kubernetes/issues/21479
type updatePodFunc func(pod *v1.Pod) error
// UpdatePodWithRetries updates a pod with given applyUpdate function. Note that pod not found error is ignored.
// The returned bool value can be used to tell if the pod is actually updated.
func UpdatePodWithRetries(podClient v1core.PodInterface, pod *v1.Pod, applyUpdate updatePodFunc) (*v1.Pod, bool, error) {
var err error
var podUpdated bool
oldPod := pod
if err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
pod, err = podClient.Get(oldPod.Name)
if err != nil {
return false, err
}
// Apply the update, then attempt to push it to the apiserver.
if err = applyUpdate(pod); err != nil {
return false, err
}
if pod, err = podClient.Update(pod); err == nil {
// Update successful.
return true, nil
}
// TODO: don't retry on perm-failed errors and handle them gracefully
// Update could have failed due to conflict error. Try again.
return false, nil
}); err == nil {
// When there's no error, we've updated this pod.
podUpdated = true
}
// Handle returned error from wait poll
if err == wait.ErrWaitTimeout {
err = fmt.Errorf("timed out trying to update pod: %#v", oldPod)
}
// Ignore the pod not found error, but the pod isn't updated.
if errors.IsNotFound(err) {
glog.V(4).Infof("%s %s/%s is not found, skip updating it.", oldPod.Kind, oldPod.Namespace, oldPod.Name)
err = nil
}
// Ignore the precondition violated error, but the pod isn't updated.
if err == errorsutil.ErrPreconditionViolated {
glog.V(4).Infof("%s %s/%s precondition doesn't hold, skip updating it.", oldPod.Kind, oldPod.Namespace, oldPod.Name)
err = nil
}
// If the error is non-nil the returned pod cannot be trusted; if podUpdated is false, the pod isn't updated;
// if the error is nil and podUpdated is true, the returned pod contains the applied update.
return pod, podUpdated, err
}
// Filter uses the input function f to filter the given pod list, and return the filtered pods
func Filter(podList *v1.PodList, f func(v1.Pod) bool) []v1.Pod {
pods := make([]v1.Pod, 0)
for _, p := range podList.Items {
if f(p) {
pods = append(pods, p)
}
}
return pods
}

View File

@ -1,30 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
"cgo_library",
)
go_library(
name = "go_default_library",
srcs = ["replicaset.go"],
tags = ["automanaged"],
deps = [
"//pkg/api/errors:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/apis/meta/v1:go_default_library",
"//pkg/client/clientset_generated/release_1_5/typed/extensions/v1beta1:go_default_library",
"//pkg/labels:go_default_library",
"//pkg/util/errors:go_default_library",
"//pkg/util/labels:go_default_library",
"//pkg/util/pod:go_default_library",
"//pkg/util/wait:go_default_library",
"//vendor:github.com/golang/glog",
],
)

View File

@ -1,110 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package replicaset
import (
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/extensions/v1beta1"
"k8s.io/kubernetes/pkg/labels"
errorsutil "k8s.io/kubernetes/pkg/util/errors"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
podutil "k8s.io/kubernetes/pkg/util/pod"
"k8s.io/kubernetes/pkg/util/wait"
)
// TODO: use client library instead when it starts to support update retries
// see https://github.com/kubernetes/kubernetes/issues/21479
type updateRSFunc func(rs *extensions.ReplicaSet) error
// UpdateRSWithRetries updates a RS with given applyUpdate function. Note that RS not found error is ignored.
// The returned bool value can be used to tell if the RS is actually updated.
func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, applyUpdate updateRSFunc) (*extensions.ReplicaSet, bool, error) {
var err error
var rsUpdated bool
oldRs := rs
if err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
rs, err = rsClient.Get(oldRs.Name)
if err != nil {
return false, err
}
// Apply the update, then attempt to push it to the apiserver.
if err = applyUpdate(rs); err != nil {
return false, err
}
if rs, err = rsClient.Update(rs); err == nil {
// Update successful.
return true, nil
}
// TODO: don't retry on perm-failed errors and handle them gracefully
// Update could have failed due to conflict error. Try again.
return false, nil
}); err == nil {
// When there's no error, we've updated this RS.
rsUpdated = true
}
// Handle returned error from wait poll
if err == wait.ErrWaitTimeout {
err = fmt.Errorf("timed out trying to update RS: %#v", oldRs)
}
// Ignore the RS not found error, but the RS isn't updated.
if errors.IsNotFound(err) {
glog.V(4).Infof("%s %s/%s is not found, skip updating it.", oldRs.Kind, oldRs.Namespace, oldRs.Name)
err = nil
}
// Ignore the precondition violated error, but the RS isn't updated.
if err == errorsutil.ErrPreconditionViolated {
glog.V(4).Infof("%s %s/%s precondition doesn't hold, skip updating it.", oldRs.Kind, oldRs.Namespace, oldRs.Name)
err = nil
}
// If the error is non-nil the returned RS cannot be trusted; if rsUpdated is false, the contoller isn't updated;
// if the error is nil and rsUpdated is true, the returned RS contains the applied update.
return rs, rsUpdated, err
}
// GetPodTemplateSpecHash returns the pod template hash of a ReplicaSet's pod template space
func GetPodTemplateSpecHash(rs *extensions.ReplicaSet) string {
meta := rs.Spec.Template.ObjectMeta
meta.Labels = labelsutil.CloneAndRemoveLabel(meta.Labels, extensions.DefaultDeploymentUniqueLabelKey)
return fmt.Sprintf("%d", podutil.GetPodTemplateSpecHash(v1.PodTemplateSpec{
ObjectMeta: meta,
Spec: rs.Spec.Template.Spec,
}))
}
// MatchingPodsFunc returns a filter function for pods with matching labels
func MatchingPodsFunc(rs *extensions.ReplicaSet) (func(v1.Pod) bool, error) {
if rs == nil {
return nil, nil
}
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("invalid label selector: %v", err)
}
return func(pod v1.Pod) bool {
podLabelsSelector := labels.Set(pod.ObjectMeta.Labels)
return selector.Matches(podLabelsSelector)
}, nil
}