Address comments; fix test failures; add e2e tests; update RS's label too

pull/6/head
Janet Kuo 2016-02-11 10:57:42 -08:00
parent da58172283
commit 11fdbff97f
4 changed files with 171 additions and 63 deletions

View File

@ -63,25 +63,14 @@ type podListFunc func(string, api.ListOptions) (*api.PodList, error)
// GetOldReplicaSetsFromLists returns two sets of old replica sets targeted by the given Deployment; get PodList and ReplicaSetList with input functions.
// Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets.
func GetOldReplicaSetsFromLists(deployment extensions.Deployment, c clientset.Interface, getPodList podListFunc, getRSList rsListFunc) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
namespace := deployment.ObjectMeta.Namespace
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
if err != nil {
return nil, nil, fmt.Errorf("invalid label selector: %v", err)
}
// 1. Find all pods whose labels match deployment.Spec.Selector
options := api.ListOptions{LabelSelector: selector}
podList, err := getPodList(namespace, options)
if err != nil {
return nil, nil, fmt.Errorf("error listing pods: %v", err)
}
// 2. Find the corresponding replica sets for pods in podList.
// Find all pods whose labels match deployment.Spec.Selector, and corresponding replica sets for pods in podList.
// All pods and replica sets are labeled with pod-template-hash to prevent overlapping
// TODO: Right now we list all replica sets and then filter. We should add an API for this.
oldRSs := map[string]extensions.ReplicaSet{}
allOldRSs := map[string]extensions.ReplicaSet{}
rsList, err := rsListWithHashKeySynced(deployment, c, getRSList, getPodList)
rsList, podList, err := rsAndPodsWithHashKeySynced(deployment, c, getRSList, getPodList)
if err != nil {
return nil, nil, fmt.Errorf("error listing replica sets: %v", err)
return nil, nil, fmt.Errorf("error labeling replica sets and pods with pod-template-hash: %v", err)
}
newRSTemplate := GetNewReplicaSetTemplate(deployment)
for _, pod := range podList.Items {
@ -130,7 +119,7 @@ func GetNewReplicaSet(deployment extensions.Deployment, c clientset.Interface) (
// GetNewReplicaSetFromList returns a replica set that matches the intent of the given deployment; get ReplicaSetList with the input function.
// Returns nil if the new replica set doesn't exist yet.
func GetNewReplicaSetFromList(deployment extensions.Deployment, c clientset.Interface, getPodList podListFunc, getRSList rsListFunc) (*extensions.ReplicaSet, error) {
rsList, err := rsListWithHashKeySynced(deployment, c, getRSList, getPodList)
rsList, _, err := rsAndPodsWithHashKeySynced(deployment, c, getRSList, getPodList)
if err != nil {
return nil, fmt.Errorf("error listing ReplicaSets: %v", err)
}
@ -146,54 +135,45 @@ func GetNewReplicaSetFromList(deployment extensions.Deployment, c clientset.Inte
return nil, nil
}
// rsListWithHashKeySynced returns a list of rs the deployment targets, with pod-template-hash information synced.
func rsListWithHashKeySynced(deployment extensions.Deployment, c clientset.Interface, getRSList rsListFunc, getPodList podListFunc) ([]extensions.ReplicaSet, error) {
// rsAndPodsWithHashKeySynced returns a list of rs the deployment targets, with pod-template-hash information synced.
func rsAndPodsWithHashKeySynced(deployment extensions.Deployment, c clientset.Interface, getRSList rsListFunc, getPodList podListFunc) ([]extensions.ReplicaSet, *api.PodList, error) {
namespace := deployment.Namespace
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
if err != nil {
return nil, err
return nil, nil, err
}
options := api.ListOptions{LabelSelector: selector}
rsList, err := getRSList(namespace, options)
if err != nil {
return nil, err
return nil, nil, err
}
syncedRSList := []extensions.ReplicaSet{}
for _, rs := range rsList {
// Add pod-template-hash information if it's not in the rs
if !labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
updatedRS, err := addHashKeyToReplicaSet(deployment, c, rs, getPodList)
if err != nil {
return nil, err
}
syncedRSList = append(syncedRSList, *updatedRS)
// Add pod-template-hash information if it's not in the RS.
// Otherwise, new RS produced by Deployment will overlap we pre-existing ones
// that aren't constrained by the pod-template-hash.
syncedRS, err := addHashKeyToRSAndPods(deployment, c, rs, getPodList)
if err != nil {
return nil, nil, err
}
syncedRSList = append(syncedRSList, rs)
syncedRSList = append(syncedRSList, *syncedRS)
}
return syncedRSList, nil
syncedPodList, err := getPodList(namespace, options)
return syncedRSList, syncedPodList, nil
}
// addHashKeyToReplicaSet adds pod-template-hash information to the given rs with the following steps:
// 1. Add hash label to the rs's pod template
// 2. Add hash label to all pods this rs owns
// 3. Add hash label to the rs's selector
// 4. Clean up all pods this rs owns but without the hash label (orphaned pods)
func addHashKeyToReplicaSet(deployment extensions.Deployment, c clientset.Interface, rs extensions.ReplicaSet, getPodList podListFunc) (*extensions.ReplicaSet, error) {
// addHashKeyToRSAndPods adds pod-template-hash information to the given rs, if it's not already there, with the following steps:
// 1. Add hash label to all pods this rs owns
// 2. Add hash label to the rs's pod template, the rs's label, and the rs's selector
// 3. Clean up all pods this rs owns but without the hash label (orphaned pods)
func addHashKeyToRSAndPods(deployment extensions.Deployment, c clientset.Interface, rs extensions.ReplicaSet, getPodList podListFunc) (*extensions.ReplicaSet, error) {
if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
return &rs, nil
}
namespace := deployment.Namespace
hash := podutil.GetPodTemplateSpecHash(*rs.Spec.Template)
// 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label.
updatedRS, err := updateRSWithRetries(c.Extensions().ReplicaSets(namespace), &rs, func(updated *extensions.ReplicaSet) {
updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
})
if err != nil {
return nil, err
}
// 2. Update all pods managed by the rs to have the new hash label, so they are correctly adopted.
selector, err := unversioned.LabelSelectorAsSelector(updatedRS.Spec.Selector)
hash := fmt.Sprintf("%d", podutil.GetPodTemplateSpecHash(*rs.Spec.Template))
// 1. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted.
selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
return nil, err
}
@ -207,29 +187,31 @@ func addHashKeyToReplicaSet(deployment extensions.Deployment, c clientset.Interf
delay, maxRetries := 3, 3
for i := 0; i < maxRetries; i++ {
_, err = c.Core().Pods(namespace).Update(&pod)
if err != nil {
time.Sleep(time.Second * time.Duration(delay))
delay *= delay
} else {
if err == nil {
break
}
time.Sleep(time.Second * time.Duration(delay))
delay *= delay
}
if err != nil {
return nil, err
}
}
// 3. Update rs selector
// 2. Update rs label, rs template label, and rs selector to include the new hash label
// Copy the old selector, so that we can scrub out any orphaned pods
oldSelector := updatedRS.Spec.Selector
oldSelector := rs.Spec.Selector
// Update the selector of the rs so it manages all the pods we updated above
if updatedRS, err = updateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) {
updatedRS, err := updateRSWithRetries(c.Extensions().ReplicaSets(namespace), &rs, func(updated *extensions.ReplicaSet) {
updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash)
}); err != nil {
})
if err != nil {
return nil, err
}
// 4. Clean up any orphaned pods that don't have the new label, this can happen if the rs manager
// 3. Clean up any orphaned pods that don't have the new label, this can happen if the rs manager
// doesn't see the update to its pod template and creates a new pod with the old labels after
// we've finished re-adopting existing pods to the rs.
selector, err = unversioned.LabelSelectorAsSelector(oldSelector)
@ -238,9 +220,8 @@ func addHashKeyToReplicaSet(deployment extensions.Deployment, c clientset.Interf
}
options = api.ListOptions{LabelSelector: selector}
podList, err = getPodList(namespace, options)
hashString := fmt.Sprintf("%d", hash)
for _, pod := range podList.Items {
if value, found := pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]; !found || value != hashString {
if value, found := pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]; !found || value != hash {
if err := c.Core().Pods(namespace).Delete(pod.Name, nil); err != nil {
return nil, err
}

View File

@ -26,10 +26,43 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/client/testing/fake"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/client/unversioned/testclient/simple"
"k8s.io/kubernetes/pkg/runtime"
)
func addListRSReactor(fakeClient *fake.Clientset, obj runtime.Object) *fake.Clientset {
fakeClient.AddReactor("list", "replicasets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, obj, nil
})
return fakeClient
}
func addListPodsReactor(fakeClient *fake.Clientset, obj runtime.Object) *fake.Clientset {
fakeClient.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, obj, nil
})
return fakeClient
}
func addUpdateRSReactor(fakeClient *fake.Clientset) *fake.Clientset {
fakeClient.AddReactor("update", "replicasets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
obj := action.(testclient.UpdateAction).GetObject().(*extensions.ReplicaSet)
return true, obj, nil
})
return fakeClient
}
func addUpdatePodsReactor(fakeClient *fake.Clientset) *fake.Clientset {
fakeClient.AddReactor("update", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
obj := action.(testclient.UpdateAction).GetObject().(*api.Pod)
return true, obj, nil
})
return fakeClient
}
func newPod(now time.Time, ready bool, beforeSec int) api.Pod {
conditionStatus := api.ConditionFalse
if ready {
@ -313,12 +346,24 @@ func TestGetOldRCs(t *testing.T) {
}
for _, test := range tests {
rss, _, err := GetOldReplicaSets(newDeployment, fake.NewSimpleClientset(test.objs...))
fakeClient := &fake.Clientset{}
fakeClient = addListPodsReactor(fakeClient, test.objs[0])
fakeClient = addListRSReactor(fakeClient, test.objs[1])
fakeClient = addUpdatePodsReactor(fakeClient)
fakeClient = addUpdateRSReactor(fakeClient)
rss, _, err := GetOldReplicaSets(newDeployment, fakeClient)
if err != nil {
t.Errorf("In test case %s, got unexpected error %v", test.test, err)
}
if !equal(rss, test.expected) {
t.Errorf("In test case %q, expected %v, got %v", test.test, test.expected, rss)
t.Errorf("In test case %q, expected:", test.test)
for _, rs := range test.expected {
t.Errorf("rs = %+v", rs)
}
t.Errorf("In test case %q, got:", test.test)
for _, rs := range rss {
t.Errorf("rs = %+v", rs)
}
}
}
}

View File

@ -55,7 +55,7 @@ func CloneAndRemoveLabel(labels map[string]string, labelKey string) map[string]s
}
// AddLabel returns a map with the given key and value added to the given map.
func AddLabel(labels map[string]string, labelKey string, labelValue uint32) map[string]string {
func AddLabel(labels map[string]string, labelKey string, labelValue string) map[string]string {
if labelKey == "" {
// Dont need to add a label.
return labels
@ -63,7 +63,7 @@ func AddLabel(labels map[string]string, labelKey string, labelValue uint32) map[
if labels == nil {
labels = make(map[string]string)
}
labels[labelKey] = fmt.Sprintf("%d", labelValue)
labels[labelKey] = labelValue
return labels
}
@ -108,7 +108,7 @@ func CloneSelectorAndAddLabel(selector *unversioned.LabelSelector, labelKey stri
}
// AddLabelToSelector returns a selector with the given key and value added to the given selector's MatchLabels.
func AddLabelToSelector(selector *unversioned.LabelSelector, labelKey string, labelValue uint32) *unversioned.LabelSelector {
func AddLabelToSelector(selector *unversioned.LabelSelector, labelKey string, labelValue string) *unversioned.LabelSelector {
if labelKey == "" {
// Dont need to add a label.
return selector
@ -116,7 +116,7 @@ func AddLabelToSelector(selector *unversioned.LabelSelector, labelKey string, la
if selector.MatchLabels == nil {
selector.MatchLabels = make(map[string]string)
}
selector.MatchLabels[labelKey] = fmt.Sprintf("%d", labelValue)
selector.MatchLabels[labelKey] = labelValue
return selector
}

View File

@ -66,6 +66,9 @@ var _ = Describe("Deployment", func() {
It("[Flaky] deployment should support rollback when there's replica set with no revision", func() {
testRollbackDeploymentRSNoRevision(f)
})
It("deployment should label adopted RSs and pods", func() {
testDeploymentLabelAdopted(f)
})
})
func newRS(rsName string, replicas int, rsPodLabels map[string]string, imageName string, image string) *extensions.ReplicaSet {
@ -272,6 +275,17 @@ func testRollingUpdateDeployment(f *Framework) {
// Check if it's updated to revision 1 correctly
checkDeploymentRevision(c, ns, deploymentName, "1", "redis", "redis")
// There should be 1 old RS (nginx-controller, which is adopted)
deployment, err := c.Extensions().Deployments(ns).Get(deploymentName)
Expect(err).NotTo(HaveOccurred())
_, allOldRSs, err := deploymentutil.GetOldReplicaSets(*deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(len(allOldRSs)).Should(Equal(1))
// The old RS should contain pod-template-hash in its selector, label, and template label
Expect(len(allOldRSs[0].Labels[extensions.DefaultDeploymentUniqueLabelKey])).Should(BeNumerically(">", 0))
Expect(len(allOldRSs[0].Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey])).Should(BeNumerically(">", 0))
Expect(len(allOldRSs[0].Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey])).Should(BeNumerically(">", 0))
}
func testRollingUpdateDeploymentEvents(f *Framework) {
@ -797,3 +811,71 @@ func testRollbackDeploymentRSNoRevision(f *Framework) {
// Check if it's still revision 3
checkDeploymentRevision(c, ns, deploymentName, "3", deploymentImageName, deploymentImage)
}
func testDeploymentLabelAdopted(f *Framework) {
ns := f.Namespace.Name
// TODO: remove unversionedClient when the refactoring is done. Currently some
// functions like verifyPod still expects a unversioned#Client.
unversionedClient := f.Client
c := clientset.FromUnversionedClient(unversionedClient)
// Create nginx pods.
podName := "nginx"
podLabels := map[string]string{"name": podName}
rsName := "nginx-controller"
replicas := 3
_, err := c.Extensions().ReplicaSets(ns).Create(newRS(rsName, replicas, podLabels, podName, podName))
Expect(err).NotTo(HaveOccurred())
// Verify that the required pods have come up.
err = verifyPods(unversionedClient, ns, podName, false, 3)
if err != nil {
Logf("error in waiting for pods to come up: %s", err)
Expect(err).NotTo(HaveOccurred())
}
// Create a nginx deployment to adopt the old rs.
deploymentName := "nginx-deployment"
Logf("Creating deployment %s", deploymentName)
_, err = c.Extensions().Deployments(ns).Create(newDeployment(deploymentName, replicas, podLabels, podName, podName, extensions.RollingUpdateDeploymentStrategyType, nil))
Expect(err).NotTo(HaveOccurred())
defer func() {
deployment, err := c.Extensions().Deployments(ns).Get(deploymentName)
Expect(err).NotTo(HaveOccurred())
Logf("deleting deployment %s", deploymentName)
Expect(c.Extensions().Deployments(ns).Delete(deploymentName, nil)).NotTo(HaveOccurred())
// TODO: remove this once we can delete replica sets with deployment
newRS, err := deploymentutil.GetNewReplicaSet(*deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(c.Extensions().ReplicaSets(ns).Delete(newRS.Name, nil)).NotTo(HaveOccurred())
}()
err = waitForDeploymentStatus(c, ns, deploymentName, replicas, replicas-1, replicas+1, 0)
Expect(err).NotTo(HaveOccurred())
// Check if it's updated to revision 1 correctly
checkDeploymentRevision(c, ns, deploymentName, "1", "nginx", "nginx")
// There should be no old RSs (overlapping RS)
deployment, err := c.Extensions().Deployments(ns).Get(deploymentName)
Expect(err).NotTo(HaveOccurred())
oldRSs, allOldRSs, err := deploymentutil.GetOldReplicaSets(*deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(len(oldRSs)).Should(Equal(0))
Expect(len(allOldRSs)).Should(Equal(0))
// New RS should contain pod-template-hash in its selector, label, and template label
newRS, err := deploymentutil.GetNewReplicaSet(*deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(len(newRS.Labels[extensions.DefaultDeploymentUniqueLabelKey])).Should(BeNumerically(">", 0))
Expect(len(newRS.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey])).Should(BeNumerically(">", 0))
Expect(len(newRS.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey])).Should(BeNumerically(">", 0))
// All pods targeted by the deployment should contain pod-template-hash in their labels, and there should be only 3 pods
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
Expect(err).NotTo(HaveOccurred())
options := api.ListOptions{LabelSelector: selector}
pods, err := c.Core().Pods(ns).List(options)
Expect(err).NotTo(HaveOccurred())
for _, pod := range pods.Items {
Expect(len(pod.Labels[extensions.DefaultDeploymentUniqueLabelKey])).Should(BeNumerically(">", 0))
}
Expect(len(pods.Items)).Should(Equal(replicas))
}