diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index b1eaa67897..060dfc8a44 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -665,6 +665,10 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deploymen newRevision := strconv.FormatInt(maxOldRevision+1, 10) existingNewRS, err := deploymentutil.GetNewReplicaSetFromList(deployment, dc.client, + func(namespace string, options api.ListOptions) (*api.PodList, error) { + podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector) + return &podList, err + }, func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) { return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector) }) diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index 5f5baa782e..e3359a4b7b 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -25,11 +25,13 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + extensions_unversioned "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/integer" intstrutil "k8s.io/kubernetes/pkg/util/intstr" labelsutil "k8s.io/kubernetes/pkg/util/labels" podutil "k8s.io/kubernetes/pkg/util/pod" + "k8s.io/kubernetes/pkg/util/wait" ) const ( @@ -55,9 +57,12 @@ func GetOldReplicaSets(deployment extensions.Deployment, c clientset.Interface) }) } +type rsListFunc func(string, api.ListOptions) ([]extensions.ReplicaSet, error) +type podListFunc func(string, api.ListOptions) (*api.PodList, error) + // GetOldReplicaSetsFromLists returns two sets of old replica sets targeted by the given Deployment; get PodList and ReplicaSetList with input functions. // Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets. -func GetOldReplicaSetsFromLists(deployment extensions.Deployment, c clientset.Interface, getPodList func(string, api.ListOptions) (*api.PodList, error), getRSList func(string, api.ListOptions) ([]extensions.ReplicaSet, error)) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { +func GetOldReplicaSetsFromLists(deployment extensions.Deployment, c clientset.Interface, getPodList podListFunc, getRSList rsListFunc) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { namespace := deployment.ObjectMeta.Namespace selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector) if err != nil { @@ -74,7 +79,7 @@ func GetOldReplicaSetsFromLists(deployment extensions.Deployment, c clientset.In // TODO: Right now we list all replica sets and then filter. We should add an API for this. oldRSs := map[string]extensions.ReplicaSet{} allOldRSs := map[string]extensions.ReplicaSet{} - rsList, err := getRSList(namespace, options) + rsList, err := rsListWithHashKeySynced(deployment, c, getRSList, getPodList) if err != nil { return nil, nil, fmt.Errorf("error listing replica sets: %v", err) } @@ -113,6 +118,9 @@ func GetOldReplicaSetsFromLists(deployment extensions.Deployment, c clientset.In // Returns nil if the new replica set doesn't exist yet. func GetNewReplicaSet(deployment extensions.Deployment, c clientset.Interface) (*extensions.ReplicaSet, error) { return GetNewReplicaSetFromList(deployment, c, + func(namespace string, options api.ListOptions) (*api.PodList, error) { + return c.Core().Pods(namespace).List(options) + }, func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) { rsList, err := c.Extensions().ReplicaSets(namespace).List(options) return rsList.Items, err @@ -121,14 +129,8 @@ func GetNewReplicaSet(deployment extensions.Deployment, c clientset.Interface) ( // GetNewReplicaSetFromList returns a replica set that matches the intent of the given deployment; get ReplicaSetList with the input function. // Returns nil if the new replica set doesn't exist yet. -func GetNewReplicaSetFromList(deployment extensions.Deployment, c clientset.Interface, getRSList func(string, api.ListOptions) ([]extensions.ReplicaSet, error)) (*extensions.ReplicaSet, error) { - namespace := deployment.ObjectMeta.Namespace - selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector) - if err != nil { - return nil, fmt.Errorf("invalid label selector: %v", err) - } - - rsList, err := getRSList(namespace, api.ListOptions{LabelSelector: selector}) +func GetNewReplicaSetFromList(deployment extensions.Deployment, c clientset.Interface, getPodList podListFunc, getRSList rsListFunc) (*extensions.ReplicaSet, error) { + rsList, err := rsListWithHashKeySynced(deployment, c, getRSList, getPodList) if err != nil { return nil, fmt.Errorf("error listing ReplicaSets: %v", err) } @@ -144,6 +146,136 @@ func GetNewReplicaSetFromList(deployment extensions.Deployment, c clientset.Inte return nil, nil } +// rsListWithHashKeySynced returns a list of rs the deployment targets, with pod-template-hash information synced. +func rsListWithHashKeySynced(deployment extensions.Deployment, c clientset.Interface, getRSList rsListFunc, getPodList podListFunc) ([]extensions.ReplicaSet, error) { + namespace := deployment.Namespace + selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector) + if err != nil { + return nil, err + } + options := api.ListOptions{LabelSelector: selector} + rsList, err := getRSList(namespace, options) + if err != nil { + return nil, err + } + syncedRSList := []extensions.ReplicaSet{} + for _, rs := range rsList { + // Add pod-template-hash information if it's not in the rs + if !labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) { + updatedRS, err := addHashKeyToReplicaSet(deployment, c, rs, getPodList) + if err != nil { + return nil, err + } + syncedRSList = append(syncedRSList, *updatedRS) + } + syncedRSList = append(syncedRSList, rs) + } + return syncedRSList, nil +} + +// addHashKeyToReplicaSet adds pod-template-hash information to the given rs with the following steps: +// 1. Add hash label to the rs's pod template +// 2. Add hash label to all pods this rs owns +// 3. Add hash label to the rs's selector +// 4. Clean up all pods this rs owns but without the hash label (orphaned pods) +func addHashKeyToReplicaSet(deployment extensions.Deployment, c clientset.Interface, rs extensions.ReplicaSet, getPodList podListFunc) (*extensions.ReplicaSet, error) { + if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) { + return &rs, nil + } + namespace := deployment.Namespace + hash := podutil.GetPodTemplateSpecHash(*rs.Spec.Template) + // 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label. + updatedRS, err := updateRSWithRetries(c.Extensions().ReplicaSets(namespace), &rs, func(updated *extensions.ReplicaSet) { + updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + }) + if err != nil { + return nil, err + } + + // 2. Update all pods managed by the rs to have the new hash label, so they are correctly adopted. + selector, err := unversioned.LabelSelectorAsSelector(updatedRS.Spec.Selector) + if err != nil { + return nil, err + } + options := api.ListOptions{LabelSelector: selector} + podList, err := getPodList(namespace, options) + if err != nil { + return nil, err + } + for _, pod := range podList.Items { + pod.Labels = labelsutil.AddLabel(pod.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + delay, maxRetries := 3, 3 + for i := 0; i < maxRetries; i++ { + _, err = c.Core().Pods(namespace).Update(&pod) + if err != nil { + time.Sleep(time.Second * time.Duration(delay)) + delay *= delay + } else { + break + } + } + if err != nil { + return nil, err + } + } + + // 3. Update rs selector + // Copy the old selector, so that we can scrub out any orphaned pods + oldSelector := updatedRS.Spec.Selector + // Update the selector of the rs so it manages all the pods we updated above + if updatedRS, err = updateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) { + updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash) + }); err != nil { + return nil, err + } + + // 4. Clean up any orphaned pods that don't have the new label, this can happen if the rs manager + // doesn't see the update to its pod template and creates a new pod with the old labels after + // we've finished re-adopting existing pods to the rs. + selector, err = unversioned.LabelSelectorAsSelector(oldSelector) + if err != nil { + return nil, err + } + options = api.ListOptions{LabelSelector: selector} + podList, err = getPodList(namespace, options) + hashString := fmt.Sprintf("%d", hash) + for _, pod := range podList.Items { + if value, found := pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]; !found || value != hashString { + if err := c.Core().Pods(namespace).Delete(pod.Name, nil); err != nil { + return nil, err + } + } + } + + return updatedRS, nil +} + +type updateFunc func(rs *extensions.ReplicaSet) + +func updateRSWithRetries(rsClient extensions_unversioned.ReplicaSetInterface, rs *extensions.ReplicaSet, applyUpdate updateFunc) (*extensions.ReplicaSet, error) { + var err error + oldRs := rs + err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) { + // Apply the update, then attempt to push it to the apiserver. + applyUpdate(rs) + if rs, err = rsClient.Update(rs); err == nil { + // rs contains the latest controller post update + return true, nil + } + // Update the controller with the latest resource version, if the update failed we + // can't trust rs so use oldRs.Name. + if rs, err = rsClient.Get(oldRs.Name); err != nil { + // The Get failed: Value in rs cannot be trusted. + rs = oldRs + } + // The Get passed: rs contains the latest controller, expect a poll for the update. + return false, nil + }) + // If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned + // controller contains the applied update. + return rs, err +} + // Returns the desired PodTemplateSpec for the new ReplicaSet corresponding to the given ReplicaSet. func GetNewReplicaSetTemplate(deployment extensions.Deployment) api.PodTemplateSpec { // newRS will have the same template as in deployment spec, plus a unique label in some cases. diff --git a/pkg/util/labels/labels.go b/pkg/util/labels/labels.go index c32b862cd4..068b5fa89b 100644 --- a/pkg/util/labels/labels.go +++ b/pkg/util/labels/labels.go @@ -54,6 +54,19 @@ func CloneAndRemoveLabel(labels map[string]string, labelKey string) map[string]s return newLabels } +// AddLabel returns a map with the given key and value added to the given map. +func AddLabel(labels map[string]string, labelKey string, labelValue uint32) map[string]string { + if labelKey == "" { + // Dont need to add a label. + return labels + } + if labels == nil { + labels = make(map[string]string) + } + labels[labelKey] = fmt.Sprintf("%d", labelValue) + return labels +} + // Clones the given selector and returns a new selector with the given key and value added. // Returns the given selector, if labelKey is empty. func CloneSelectorAndAddLabel(selector *unversioned.LabelSelector, labelKey string, labelValue uint32) *unversioned.LabelSelector { @@ -93,3 +106,30 @@ func CloneSelectorAndAddLabel(selector *unversioned.LabelSelector, labelKey stri return newSelector } + +// AddLabelToSelector returns a selector with the given key and value added to the given selector's MatchLabels. +func AddLabelToSelector(selector *unversioned.LabelSelector, labelKey string, labelValue uint32) *unversioned.LabelSelector { + if labelKey == "" { + // Dont need to add a label. + return selector + } + if selector.MatchLabels == nil { + selector.MatchLabels = make(map[string]string) + } + selector.MatchLabels[labelKey] = fmt.Sprintf("%d", labelValue) + return selector +} + +// SelectorHasLabel checks if the given selector contains the given label key in its MatchLabels or MatchExpressions +func SelectorHasLabel(selector *unversioned.LabelSelector, labelKey string) bool { + _, found := selector.MatchLabels[labelKey] + if found { + return true + } + for _, exp := range selector.MatchExpressions { + if exp.Key == labelKey && exp.Operator != unversioned.LabelSelectorOpDoesNotExist { + return true + } + } + return false +}