diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 5d0d777b05..df37cada5f 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -26,6 +26,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" @@ -40,6 +41,7 @@ import ( "k8s.io/kubernetes/pkg/util/integer" labelsutil "k8s.io/kubernetes/pkg/util/labels" podutil "k8s.io/kubernetes/pkg/util/pod" + rsutil "k8s.io/kubernetes/pkg/util/replicaset" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" @@ -672,33 +674,31 @@ func lastRevision(allRSs []*extensions.ReplicaSet) int64 { // getOldReplicaSets returns two sets of old replica sets of the deployment. The first set of old replica sets doesn't include // the ones with no pods, and the second set of old replica sets include all old replica sets. +// Note that the pod-template-hash will be added to adopted RSes and pods. func (dc *DeploymentController) getOldReplicaSets(deployment *extensions.Deployment) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { - return deploymentutil.GetOldReplicaSetsFromLists(deployment, dc.client, - func(namespace string, options api.ListOptions) (*api.PodList, error) { - podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector) - return &podList, err - }, - func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) { - return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector) - }) + // List the deployment's RSes & Pods and apply pod-template-hash info to deployment's adopted RSes/Pods + rsList, podList, err := dc.rsAndPodsWithHashKeySynced(deployment) + if err != nil { + return nil, nil, fmt.Errorf("error labeling replica sets and pods with pod-template-hash: %v", err) + } + return deploymentutil.FindOldReplicaSets(deployment, rsList, podList) } -// Returns a replica set that matches the intent of the given deployment. +// Returns a replica set that matches the intent of the given deployment. Returns nil if the new replica set doesn't exist yet. // 1. Get existing new RS (the RS that the given deployment targets, whose pod template is the same as deployment's). // 2. If there's existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes. // 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas. +// Note that the pod-template-hash will be added to adopted RSes and pods. func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployment, maxOldRevision int64, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) { // Calculate revision number for this new replica set newRevision := strconv.FormatInt(maxOldRevision+1, 10) - existingNewRS, err := deploymentutil.GetNewReplicaSetFromList(deployment, dc.client, - func(namespace string, options api.ListOptions) (*api.PodList, error) { - podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector) - return &podList, err - }, - func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) { - return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector) - }) + // List the deployment's RSes and apply pod-template-hash info to deployment's adopted RSes/Pods + rsList, _, err := dc.rsAndPodsWithHashKeySynced(deployment) + if err != nil { + return nil, fmt.Errorf("error labeling replica sets and pods with pod-template-hash: %v", err) + } + existingNewRS, err := deploymentutil.FindNewReplicaSet(deployment, rsList) if err != nil { return nil, err } else if existingNewRS != nil { @@ -754,6 +754,130 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme return createdRS, dc.updateDeploymentRevision(deployment, newRevision) } +// rsAndPodsWithHashKeySynced returns the RSes and pods the given deployment targets, with pod-template-hash information synced. +func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extensions.Deployment) ([]extensions.ReplicaSet, *api.PodList, error) { + rsList, err := deploymentutil.ListReplicaSets(deployment, + func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) { + return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector) + }) + if err != nil { + return nil, nil, fmt.Errorf("error listing ReplicaSets: %v", err) + } + syncedRSList := []extensions.ReplicaSet{} + for _, rs := range rsList { + // Add pod-template-hash information if it's not in the RS. + // Otherwise, new RS produced by Deployment will overlap with pre-existing ones + // that aren't constrained by the pod-template-hash. + syncedRS, err := dc.addHashKeyToRSAndPods(rs) + if err != nil { + return nil, nil, err + } + syncedRSList = append(syncedRSList, *syncedRS) + } + syncedPodList, err := deploymentutil.ListPods(deployment, + func(namespace string, options api.ListOptions) (*api.PodList, error) { + podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector) + return &podList, err + }) + + if err != nil { + return nil, nil, err + } + return syncedRSList, syncedPodList, nil +} + +// addHashKeyToRSAndPods adds pod-template-hash information to the given rs, if it's not already there, with the following steps: +// 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created +// 2. Add hash label to all pods this rs owns, wait until replicaset controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas +// 3. Add hash label to the rs's label and selector +func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet) (updatedRS *extensions.ReplicaSet, err error) { + updatedRS = &rs + // If the rs already has the new hash label in its selector, it's done syncing + if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) { + return + } + namespace := rs.Namespace + hash := rsutil.GetPodTemplateSpecHash(rs) + rsUpdated := false + // 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label. + updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(namespace), updatedRS, + func(updated *extensions.ReplicaSet) error { + // Precondition: the RS doesn't contain the new hash in its pod template label. + if updated.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash { + return utilerrors.ErrPreconditionViolated + } + updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + return nil + }) + if err != nil { + return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) + } + if !rsUpdated { + // If RS wasn't updated but didn't return error in step 1, we've hit a RS not found error. + // Return here and retry in the next sync loop. + return &rs, nil + } + // Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods). + if updatedRS.Generation > updatedRS.Status.ObservedGeneration { + if err = deploymentutil.WaitForReplicaSetUpdated(dc.client, updatedRS.Generation, namespace, updatedRS.Name); err != nil { + return nil, fmt.Errorf("error waiting for %s %s/%s generation %d observed by controller: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, updatedRS.Generation, err) + } + } + glog.V(4).Infof("Observed the update of %s %s/%s's pod template with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash) + + // 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted. + selector, err := unversioned.LabelSelectorAsSelector(updatedRS.Spec.Selector) + if err != nil { + return nil, fmt.Errorf("error in converting selector to label selector for replica set %s: %s", updatedRS.Name, err) + } + options := api.ListOptions{LabelSelector: selector} + podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector) + if err != nil { + return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", namespace, options, err) + } + allPodsLabeled := false + if allPodsLabeled, err = deploymentutil.LabelPodsWithHash(&podList, updatedRS, dc.client, namespace, hash); err != nil { + return nil, fmt.Errorf("error in adding template hash label %s to pods %+v: %s", hash, podList, err) + } + // If not all pods are labeled but didn't return error in step 2, we've hit at least one pod not found error. + // Return here and retry in the next sync loop. + if !allPodsLabeled { + return updatedRS, nil + } + + // We need to wait for the replicaset controller to observe the pods being + // labeled with pod template hash. Because previously we've called + // WaitForReplicaSetUpdated, the replicaset controller should have dropped + // FullyLabeledReplicas to 0 already, we only need to wait it to increase + // back to the number of replicas in the spec. + if err = deploymentutil.WaitForPodsHashPopulated(dc.client, updatedRS.Generation, namespace, updatedRS.Name); err != nil { + return nil, fmt.Errorf("%s %s/%s: error waiting for replicaset controller to observe pods being labeled with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) + } + + // 3. Update rs label and selector to include the new hash label + // Copy the old selector, so that we can scrub out any orphaned pods + if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(namespace), updatedRS, + func(updated *extensions.ReplicaSet) error { + // Precondition: the RS doesn't contain the new hash in its label or selector. + if updated.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash && updated.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] == hash { + return utilerrors.ErrPreconditionViolated + } + updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash) + return nil + }); err != nil { + return nil, fmt.Errorf("error updating %s %s/%s label and selector with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) + } + if rsUpdated { + glog.V(4).Infof("Updated %s %s/%s's selector and label with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash) + } + // If the RS isn't actually updated in step 3, that's okay, we'll retry in the next sync loop since its selector isn't updated yet. + + // TODO: look for orphaned pods and label them in the background somewhere else periodically + + return updatedRS, nil +} + // setNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and // copying required deployment annotations to it; it returns true if replica set's annotation is changed. func setNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, newRevision string) bool { diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index 3b8d7004ae..6160f5d138 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -34,7 +34,6 @@ import ( intstrutil "k8s.io/kubernetes/pkg/util/intstr" labelsutil "k8s.io/kubernetes/pkg/util/labels" podutil "k8s.io/kubernetes/pkg/util/pod" - rsutil "k8s.io/kubernetes/pkg/util/replicaset" "k8s.io/kubernetes/pkg/util/wait" ) @@ -51,32 +50,87 @@ const ( // GetOldReplicaSets returns the old replica sets targeted by the given Deployment; get PodList and ReplicaSetList from client interface. // Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets. func GetOldReplicaSets(deployment *extensions.Deployment, c clientset.Interface) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { - return GetOldReplicaSetsFromLists(deployment, c, - func(namespace string, options api.ListOptions) (*api.PodList, error) { - return c.Core().Pods(namespace).List(options) - }, + rsList, err := ListReplicaSets(deployment, func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) { rsList, err := c.Extensions().ReplicaSets(namespace).List(options) return rsList.Items, err }) + if err != nil { + return nil, nil, fmt.Errorf("error listing ReplicaSets: %v", err) + } + podList, err := ListPods(deployment, + func(namespace string, options api.ListOptions) (*api.PodList, error) { + return c.Core().Pods(namespace).List(options) + }) + if err != nil { + return nil, nil, fmt.Errorf("error listing Pods: %v", err) + } + return FindOldReplicaSets(deployment, rsList, podList) +} + +// GetNewReplicaSet returns a replica set that matches the intent of the given deployment; get ReplicaSetList from client interface. +// Returns nil if the new replica set doesn't exist yet. +func GetNewReplicaSet(deployment *extensions.Deployment, c clientset.Interface) (*extensions.ReplicaSet, error) { + rsList, err := ListReplicaSets(deployment, + func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) { + rsList, err := c.Extensions().ReplicaSets(namespace).List(options) + return rsList.Items, err + }) + if err != nil { + return nil, fmt.Errorf("error listing ReplicaSets: %v", err) + } + return FindNewReplicaSet(deployment, rsList) } // TODO: switch this to full namespacers type rsListFunc func(string, api.ListOptions) ([]extensions.ReplicaSet, error) type podListFunc func(string, api.ListOptions) (*api.PodList, error) -// GetOldReplicaSetsFromLists returns two sets of old replica sets targeted by the given Deployment; get PodList and ReplicaSetList with input functions. +// ListReplicaSets returns a slice of RSes the given deployment targets. +func ListReplicaSets(deployment *extensions.Deployment, getRSList rsListFunc) ([]extensions.ReplicaSet, error) { + // TODO: Right now we list replica sets by their labels. We should list them by selector, i.e. the replica set's selector + // should be a superset of the deployment's selector, see https://github.com/kubernetes/kubernetes/issues/19830; + // or use controllerRef, see https://github.com/kubernetes/kubernetes/issues/2210 + namespace := deployment.Namespace + selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector) + if err != nil { + return nil, err + } + options := api.ListOptions{LabelSelector: selector} + return getRSList(namespace, options) +} + +// ListPods returns a list of pods the given deployment targets. +func ListPods(deployment *extensions.Deployment, getPodList podListFunc) (*api.PodList, error) { + namespace := deployment.Namespace + selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector) + if err != nil { + return nil, err + } + options := api.ListOptions{LabelSelector: selector} + return getPodList(namespace, options) +} + +// FindNewReplicaSet returns the new RS this given deployment targets (the one with the same pod template). +func FindNewReplicaSet(deployment *extensions.Deployment, rsList []extensions.ReplicaSet) (*extensions.ReplicaSet, error) { + newRSTemplate := GetNewReplicaSetTemplate(deployment) + for i := range rsList { + if api.Semantic.DeepEqual(rsList[i].Spec.Template, newRSTemplate) { + // This is the new ReplicaSet. + return &rsList[i], nil + } + } + // new ReplicaSet does not exist. + return nil, nil +} + +// FindOldReplicaSets returns the old replica sets targeted by the given Deployment, with the given PodList and slice of RSes. // Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets. -func GetOldReplicaSetsFromLists(deployment *extensions.Deployment, c clientset.Interface, getPodList podListFunc, getRSList rsListFunc) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { +func FindOldReplicaSets(deployment *extensions.Deployment, rsList []extensions.ReplicaSet, podList *api.PodList) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { // Find all pods whose labels match deployment.Spec.Selector, and corresponding replica sets for pods in podList. // All pods and replica sets are labeled with pod-template-hash to prevent overlapping - // TODO: Right now we list all replica sets and then filter. We should add an API for this. oldRSs := map[string]extensions.ReplicaSet{} allOldRSs := map[string]extensions.ReplicaSet{} - rsList, podList, err := rsAndPodsWithHashKeySynced(deployment, c, getRSList, getPodList) - if err != nil { - return nil, nil, fmt.Errorf("error labeling replica sets and pods with pod-template-hash: %v", err) - } newRSTemplate := GetNewReplicaSetTemplate(deployment) for _, pod := range podList.Items { podLabelsSelector := labels.Set(pod.ObjectMeta.Labels) @@ -108,168 +162,7 @@ func GetOldReplicaSetsFromLists(deployment *extensions.Deployment, c clientset.I return requiredRSs, allRSs, nil } -// GetNewReplicaSet returns a replica set that matches the intent of the given deployment; get ReplicaSetList from client interface. -// Returns nil if the new replica set doesn't exist yet. -func GetNewReplicaSet(deployment *extensions.Deployment, c clientset.Interface) (*extensions.ReplicaSet, error) { - return GetNewReplicaSetFromList(deployment, c, - func(namespace string, options api.ListOptions) (*api.PodList, error) { - return c.Core().Pods(namespace).List(options) - }, - func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) { - rsList, err := c.Extensions().ReplicaSets(namespace).List(options) - return rsList.Items, err - }) -} - -// GetNewReplicaSetFromList returns a replica set that matches the intent of the given deployment; get ReplicaSetList with the input function. -// Returns nil if the new replica set doesn't exist yet. -func GetNewReplicaSetFromList(deployment *extensions.Deployment, c clientset.Interface, getPodList podListFunc, getRSList rsListFunc) (*extensions.ReplicaSet, error) { - rsList, _, err := rsAndPodsWithHashKeySynced(deployment, c, getRSList, getPodList) - if err != nil { - return nil, fmt.Errorf("error listing ReplicaSets: %v", err) - } - newRSTemplate := GetNewReplicaSetTemplate(deployment) - - for i := range rsList { - if api.Semantic.DeepEqual(rsList[i].Spec.Template, newRSTemplate) { - // This is the new ReplicaSet. - return &rsList[i], nil - } - } - // new ReplicaSet does not exist. - return nil, nil -} - -// rsAndPodsWithHashKeySynced returns the RSs and pods the given deployment targets, with pod-template-hash information synced. -func rsAndPodsWithHashKeySynced(deployment *extensions.Deployment, c clientset.Interface, getRSList rsListFunc, getPodList podListFunc) ([]extensions.ReplicaSet, *api.PodList, error) { - namespace := deployment.Namespace - selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector) - if err != nil { - return nil, nil, err - } - options := api.ListOptions{LabelSelector: selector} - rsList, err := getRSList(namespace, options) - if err != nil { - return nil, nil, err - } - syncedRSList := []extensions.ReplicaSet{} - for _, rs := range rsList { - // Add pod-template-hash information if it's not in the RS. - // Otherwise, new RS produced by Deployment will overlap with pre-existing ones - // that aren't constrained by the pod-template-hash. - syncedRS, err := addHashKeyToRSAndPods(deployment, c, rs, getPodList) - if err != nil { - return nil, nil, err - } - syncedRSList = append(syncedRSList, *syncedRS) - } - syncedPodList, err := getPodList(namespace, options) - if err != nil { - return nil, nil, err - } - return syncedRSList, syncedPodList, nil -} - -// addHashKeyToRSAndPods adds pod-template-hash information to the given rs, if it's not already there, with the following steps: -// 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created -// 2. Add hash label to all pods this rs owns, wait until replicaset controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas -// 3. Add hash label to the rs's label and selector -func addHashKeyToRSAndPods(deployment *extensions.Deployment, c clientset.Interface, rs extensions.ReplicaSet, getPodList podListFunc) (updatedRS *extensions.ReplicaSet, err error) { - updatedRS = &rs - // If the rs already has the new hash label in its selector, it's done syncing - if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) { - return - } - namespace := deployment.Namespace - meta := rs.Spec.Template.ObjectMeta - meta.Labels = labelsutil.CloneAndRemoveLabel(meta.Labels, extensions.DefaultDeploymentUniqueLabelKey) - hash := fmt.Sprintf("%d", podutil.GetPodTemplateSpecHash(api.PodTemplateSpec{ - ObjectMeta: meta, - Spec: rs.Spec.Template.Spec, - })) - rsUpdated := false - // 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label. - updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, - func(updated *extensions.ReplicaSet) error { - // Precondition: the RS doesn't contain the new hash in its pod template label. - if updated.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash { - return errors.ErrPreconditionViolated - } - updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) - return nil - }) - if err != nil { - return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) - } - if rsUpdated { - // Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods). - if updatedRS.Generation > updatedRS.Status.ObservedGeneration { - if err = waitForReplicaSetUpdated(c, updatedRS.Generation, namespace, updatedRS.Name); err != nil { - return nil, fmt.Errorf("error waiting for %s %s/%s generation %d observed by controller: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, updatedRS.Generation, err) - } - } - glog.V(4).Infof("Observed the update of %s %s/%s's pod template with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash) - } else { - // If RS wasn't updated but didn't return error in step 1, we've hit a RS not found error. - // Return here and retry in the next sync loop. - return &rs, nil - } - glog.V(4).Infof("Observed the update of rs %s's pod template with hash %s.", rs.Name, hash) - - // 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted. - selector, err := unversioned.LabelSelectorAsSelector(updatedRS.Spec.Selector) - if err != nil { - return nil, fmt.Errorf("error in converting selector to label selector for replica set %s: %s", updatedRS.Name, err) - } - options := api.ListOptions{LabelSelector: selector} - podList, err := getPodList(namespace, options) - if err != nil { - return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", namespace, options, err) - } - allPodsLabeled := false - if allPodsLabeled, err = labelPodsWithHash(podList, updatedRS, c, namespace, hash); err != nil { - return nil, fmt.Errorf("error in adding template hash label %s to pods %+v: %s", hash, podList, err) - } - // If not all pods are labeled but didn't return error in step 2, we've hit at least one pod not found error. - // Return here and retry in the next sync loop. - if !allPodsLabeled { - return updatedRS, nil - } - - // We need to wait for the replicaset controller to observe the pods being - // labeled with pod template hash. Because previously we've called - // waitForReplicaSetUpdated, the replicaset controller should have dropped - // FullyLabeledReplicas to 0 already, we only need to wait it to increase - // back to the number of replicas in the spec. - if err = waitForPodsHashPopulated(c, updatedRS.Generation, namespace, updatedRS.Name); err != nil { - return nil, fmt.Errorf("%s %s/%s: error waiting for replicaset controller to observe pods being labeled with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) - } - - // 3. Update rs label and selector to include the new hash label - // Copy the old selector, so that we can scrub out any orphaned pods - if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, - func(updated *extensions.ReplicaSet) error { - // Precondition: the RS doesn't contain the new hash in its label or selector. - if updated.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash && updated.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] == hash { - return errors.ErrPreconditionViolated - } - updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) - updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash) - return nil - }); err != nil { - return nil, fmt.Errorf("error updating %s %s/%s label and selector with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) - } - if rsUpdated { - glog.V(4).Infof("Updated %s %s/%s's selector and label with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash) - } - // If the RS isn't actually updated in step 3, that's okay, we'll retry in the next sync loop since its selector isn't updated yet. - - // TODO: look for orphaned pods and label them in the background somewhere else periodically - - return updatedRS, nil -} - -func waitForReplicaSetUpdated(c clientset.Interface, desiredGeneration int64, namespace, name string) error { +func WaitForReplicaSetUpdated(c clientset.Interface, desiredGeneration int64, namespace, name string) error { return wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) { rs, err := c.Extensions().ReplicaSets(namespace).Get(name) if err != nil { @@ -279,7 +172,7 @@ func waitForReplicaSetUpdated(c clientset.Interface, desiredGeneration int64, na }) } -func waitForPodsHashPopulated(c clientset.Interface, desiredGeneration int64, namespace, name string) error { +func WaitForPodsHashPopulated(c clientset.Interface, desiredGeneration int64, namespace, name string) error { return wait.Poll(1*time.Second, 1*time.Minute, func() (bool, error) { rs, err := c.Extensions().ReplicaSets(namespace).Get(name) if err != nil { @@ -290,9 +183,9 @@ func waitForPodsHashPopulated(c clientset.Interface, desiredGeneration int64, na }) } -// labelPodsWithHash labels all pods in the given podList with the new hash label. +// LabelPodsWithHash labels all pods in the given podList with the new hash label. // The returned bool value can be used to tell if all pods are actually labeled. -func labelPodsWithHash(podList *api.PodList, rs *extensions.ReplicaSet, c clientset.Interface, namespace, hash string) (bool, error) { +func LabelPodsWithHash(podList *api.PodList, rs *extensions.ReplicaSet, c clientset.Interface, namespace, hash string) (bool, error) { allPodsLabeled := true for _, pod := range podList.Items { // Only label the pod that doesn't already have the new hash diff --git a/pkg/util/replicaset/replicaset.go b/pkg/util/replicaset/replicaset.go index 218298a515..4f7642fde3 100644 --- a/pkg/util/replicaset/replicaset.go +++ b/pkg/util/replicaset/replicaset.go @@ -21,10 +21,13 @@ import ( "time" "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/apis/extensions" unversionedextensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned" errorsutil "k8s.io/kubernetes/pkg/util/errors" + labelsutil "k8s.io/kubernetes/pkg/util/labels" + podutil "k8s.io/kubernetes/pkg/util/pod" "k8s.io/kubernetes/pkg/util/wait" ) @@ -78,3 +81,13 @@ func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs // if the error is nil and rsUpdated is true, the returned RS contains the applied update. return rs, rsUpdated, err } + +// GetPodTemplateSpecHash returns the pod template hash of a ReplicaSet's pod template space +func GetPodTemplateSpecHash(rs extensions.ReplicaSet) string { + meta := rs.Spec.Template.ObjectMeta + meta.Labels = labelsutil.CloneAndRemoveLabel(meta.Labels, extensions.DefaultDeploymentUniqueLabelKey) + return fmt.Sprintf("%d", podutil.GetPodTemplateSpecHash(api.PodTemplateSpec{ + ObjectMeta: meta, + Spec: rs.Spec.Template.Spec, + })) +}