Merge pull request #19581 from janetkuo/deployment-version

Auto commit by PR queue bot
pull/6/head
k8s-merge-robot 2016-01-30 13:34:59 -08:00
commit 4ed43968e0
8 changed files with 172 additions and 41 deletions

View File

@ -170,11 +170,13 @@ type storeReplicationControllersNamespacer struct {
namespace string
}
func (s storeReplicationControllersNamespacer) List() (controllers []api.ReplicationController, err error) {
func (s storeReplicationControllersNamespacer) List(selector labels.Selector) (controllers []api.ReplicationController, err error) {
for _, c := range s.store.List() {
rc := *(c.(*api.ReplicationController))
if s.namespace == api.NamespaceAll || s.namespace == rc.Namespace {
controllers = append(controllers, rc)
if selector.Matches(labels.Set(rc.Labels)) {
controllers = append(controllers, rc)
}
}
}
return

View File

@ -20,6 +20,7 @@ import (
"fmt"
"math"
"sort"
"strconv"
"time"
"github.com/golang/glog"
@ -31,7 +32,6 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
deploymentutil "k8s.io/kubernetes/pkg/util/deployment"
@ -427,16 +427,10 @@ func (dc *DeploymentController) syncDeployment(key string) error {
}
func (dc *DeploymentController) syncRecreateDeployment(deployment extensions.Deployment) error {
newRC, err := dc.getNewRC(deployment)
newRC, oldRCs, err := dc.getNewRCAndOldRCs(deployment)
if err != nil {
return err
}
oldRCs, err := dc.getOldRCs(deployment)
if err != nil {
return err
}
allRCs := append(oldRCs, newRC)
// scale down old rcs
@ -471,16 +465,10 @@ func (dc *DeploymentController) syncRecreateDeployment(deployment extensions.Dep
}
func (dc *DeploymentController) syncRollingUpdateDeployment(deployment extensions.Deployment) error {
newRC, err := dc.getNewRC(deployment)
newRC, oldRCs, err := dc.getNewRCAndOldRCs(deployment)
if err != nil {
return err
}
oldRCs, err := dc.getOldRCs(deployment)
if err != nil {
return err
}
allRCs := append(oldRCs, newRC)
// Scale up, if we can.
@ -526,26 +514,85 @@ func (dc *DeploymentController) syncDeploymentStatus(allRCs []*api.ReplicationCo
return nil
}
func (dc *DeploymentController) getOldRCs(deployment extensions.Deployment) ([]*api.ReplicationController, error) {
func (dc *DeploymentController) getNewRCAndOldRCs(deployment extensions.Deployment) (*api.ReplicationController, []*api.ReplicationController, error) {
oldRCs, allOldRCs, err := dc.getOldRCs(deployment)
if err != nil {
return nil, nil, err
}
maxOldV := maxRevision(allOldRCs)
// Get new RC with the updated revision number
newRC, err := dc.getNewRC(deployment, maxOldV)
if err != nil {
return nil, nil, err
}
// Sync deployment's revision number with new RC
if newRC.Annotations != nil && len(newRC.Annotations[deploymentutil.RevisionAnnotation]) > 0 &&
(deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != newRC.Annotations[deploymentutil.RevisionAnnotation]) {
if err = dc.updateDeploymentRevision(deployment, newRC.Annotations[deploymentutil.RevisionAnnotation]); err != nil {
glog.V(4).Infof("Error: %v. Unable to update deployment revision, will retry later.", err)
}
}
return newRC, oldRCs, nil
}
func revision(rc *api.ReplicationController) (int, error) {
v, ok := rc.Annotations[deploymentutil.RevisionAnnotation]
if !ok {
return 0, nil
}
return strconv.Atoi(v)
}
func maxRevision(allRCs []*api.ReplicationController) int {
max := 0
for _, rc := range allRCs {
if v, err := revision(rc); err != nil {
// Skip the RCs when it failed to parse their revision information
glog.V(4).Infof("Error: %v. Couldn't parse revision for rc %#v, deployment controller will skip it when reconciling revisions.", err, rc)
} else if v > max {
max = v
}
}
return max
}
func (dc *DeploymentController) getOldRCs(deployment extensions.Deployment) ([]*api.ReplicationController, []*api.ReplicationController, error) {
return deploymentutil.GetOldRCsFromLists(deployment, dc.client,
func(namespace string, options api.ListOptions) (*api.PodList, error) {
podList, err := dc.podStore.Pods(namespace).List(labels.SelectorFromSet(deployment.Spec.Selector))
podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
return &podList, err
},
func(namespace string, options api.ListOptions) ([]api.ReplicationController, error) {
return dc.rcStore.ReplicationControllers(namespace).List()
return dc.rcStore.ReplicationControllers(namespace).List(options.LabelSelector)
})
}
// Returns an RC that matches the intent of the given deployment.
// It creates a new RC if required.
func (dc *DeploymentController) getNewRC(deployment extensions.Deployment) (*api.ReplicationController, error) {
// The revision of the new RC will be updated to maxOldRevision + 1
func (dc *DeploymentController) getNewRC(deployment extensions.Deployment, maxOldRevision int) (*api.ReplicationController, error) {
// Calculate revision number for this new RC
newRevision := strconv.Itoa(maxOldRevision + 1)
existingNewRC, err := deploymentutil.GetNewRCFromList(deployment, dc.client,
func(namespace string, options api.ListOptions) ([]api.ReplicationController, error) {
return dc.rcStore.ReplicationControllers(namespace).List()
return dc.rcStore.ReplicationControllers(namespace).List(options.LabelSelector)
})
if err != nil || existingNewRC != nil {
return existingNewRC, err
if err != nil {
return nil, err
} else if existingNewRC != nil {
if existingNewRC.Annotations == nil {
existingNewRC.Annotations = make(map[string]string)
}
if existingNewRC.Annotations[deploymentutil.RevisionAnnotation] != newRevision {
existingNewRC.Annotations[deploymentutil.RevisionAnnotation] = newRevision
glog.V(4).Infof("update existingNewRC's revision to %s - %+v\n", newRevision, existingNewRC)
return dc.client.ReplicationControllers(deployment.ObjectMeta.Namespace).Update(existingNewRC)
}
return existingNewRC, nil
}
// Check the rc expectations of deployment before creating a new rc
dKey, err := controller.KeyFunc(&deployment)
@ -569,11 +616,13 @@ func (dc *DeploymentController) getNewRC(deployment extensions.Deployment) (*api
return nil, fmt.Errorf("couldn't get key for deployment controller %#v: %v", deployment, err)
}
dc.rcExpectations.ExpectCreations(dKey, 1)
// Create new RC
newRC := api.ReplicationController{
ObjectMeta: api.ObjectMeta{
GenerateName: deployment.Name + "-",
Namespace: namespace,
Annotations: map[string]string{deploymentutil.RevisionAnnotation: newRevision},
},
Spec: api.ReplicationControllerSpec{
Replicas: 0,
@ -586,9 +635,23 @@ func (dc *DeploymentController) getNewRC(deployment extensions.Deployment) (*api
dc.rcExpectations.DeleteExpectations(dKey)
return nil, fmt.Errorf("error creating replication controller: %v", err)
}
if err = dc.updateDeploymentRevision(deployment, newRevision); err != nil {
return createdRC, err
}
return createdRC, nil
}
func (dc *DeploymentController) updateDeploymentRevision(deployment extensions.Deployment, revision string) error {
if deployment.Annotations == nil {
deployment.Annotations = make(map[string]string)
}
deployment.Annotations[deploymentutil.RevisionAnnotation] = revision
_, err := dc.updateDeployment(&deployment)
return err
}
func (dc *DeploymentController) reconcileNewRC(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) (bool, error) {
if newRC.Spec.Replicas == deployment.Spec.Replicas {
// Scaling not required.

View File

@ -487,6 +487,7 @@ func TestSyncDeploymentCreatesRC(t *testing.T) {
opt := newListOptions()
f.expectCreateRCAction(rc)
f.expectUpdateDeploymentAction(d)
f.expectUpdateRCAction(updatedRC)
f.expectListPodAction(rc.Namespace, opt)
f.expectUpdateDeploymentAction(d)

View File

@ -1593,7 +1593,7 @@ func (dd *DeploymentDescriber) Describe(namespace, name string) (string, error)
ru := d.Spec.Strategy.RollingUpdate
fmt.Fprintf(out, "RollingUpdateStrategy:\t%s max unavailable, %s max surge, %d min ready seconds\n", ru.MaxUnavailable.String(), ru.MaxSurge.String(), ru.MinReadySeconds)
}
oldRCs, err := deploymentutil.GetOldRCs(*d, dd)
oldRCs, _, err := deploymentutil.GetOldRCs(*d, dd)
if err == nil {
fmt.Fprintf(out, "OldReplicationControllers:\t%s\n", printReplicationControllersByLabels(oldRCs))
}

View File

@ -28,8 +28,13 @@ import (
podutil "k8s.io/kubernetes/pkg/util/pod"
)
const (
// The revision annotation of a deployment's replication controllers which records its rollout sequence
RevisionAnnotation = "deployment.kubernetes.io/revision"
)
// GetOldRCs returns the old RCs targeted by the given Deployment; get PodList and RCList from client interface.
func GetOldRCs(deployment extensions.Deployment, c client.Interface) ([]*api.ReplicationController, error) {
func GetOldRCs(deployment extensions.Deployment, c client.Interface) ([]*api.ReplicationController, []*api.ReplicationController, error) {
return GetOldRCsFromLists(deployment, c,
func(namespace string, options api.ListOptions) (*api.PodList, error) {
return c.Pods(namespace).List(options)
@ -41,32 +46,34 @@ func GetOldRCs(deployment extensions.Deployment, c client.Interface) ([]*api.Rep
}
// GetOldRCsFromLists returns the old RCs targeted by the given Deployment; get PodList and RCList with input functions.
func GetOldRCsFromLists(deployment extensions.Deployment, c client.Interface, getPodList func(string, api.ListOptions) (*api.PodList, error), getRcList func(string, api.ListOptions) ([]api.ReplicationController, error)) ([]*api.ReplicationController, error) {
func GetOldRCsFromLists(deployment extensions.Deployment, c client.Interface, getPodList func(string, api.ListOptions) (*api.PodList, error), getRcList func(string, api.ListOptions) ([]api.ReplicationController, error)) ([]*api.ReplicationController, []*api.ReplicationController, error) {
namespace := deployment.ObjectMeta.Namespace
// 1. Find all pods whose labels match deployment.Spec.Selector
selector := labels.SelectorFromSet(deployment.Spec.Selector)
options := api.ListOptions{LabelSelector: selector}
podList, err := getPodList(namespace, options)
if err != nil {
return nil, fmt.Errorf("error listing pods: %v", err)
return nil, nil, fmt.Errorf("error listing pods: %v", err)
}
// 2. Find the corresponding RCs for pods in podList.
// TODO: Right now we list all RCs and then filter. We should add an API for this.
oldRCs := map[string]api.ReplicationController{}
rcList, err := getRcList(namespace, api.ListOptions{})
allOldRCs := map[string]api.ReplicationController{}
rcList, err := getRcList(namespace, options)
if err != nil {
return nil, fmt.Errorf("error listing replication controllers: %v", err)
return nil, nil, fmt.Errorf("error listing replication controllers: %v", err)
}
newRCTemplate := GetNewRCTemplate(deployment)
for _, pod := range podList.Items {
podLabelsSelector := labels.Set(pod.ObjectMeta.Labels)
for _, rc := range rcList {
rcLabelsSelector := labels.SelectorFromSet(rc.Spec.Selector)
// Filter out RC that has the same pod template spec as the deployment - that is the new RC.
if api.Semantic.DeepEqual(rc.Spec.Template, &newRCTemplate) {
continue
}
allOldRCs[rc.ObjectMeta.Name] = rc
if rcLabelsSelector.Matches(podLabelsSelector) {
// Filter out RC that has the same pod template spec as the deployment - that is the new RC.
if api.Semantic.DeepEqual(rc.Spec.Template, &newRCTemplate) {
continue
}
oldRCs[rc.ObjectMeta.Name] = rc
}
}
@ -76,7 +83,12 @@ func GetOldRCsFromLists(deployment extensions.Deployment, c client.Interface, ge
value := oldRCs[key]
requiredRCs = append(requiredRCs, &value)
}
return requiredRCs, nil
allRCs := []*api.ReplicationController{}
for key := range allOldRCs {
value := allOldRCs[key]
allRCs = append(allRCs, &value)
}
return requiredRCs, allRCs, nil
}
// GetNewRC returns an RC that matches the intent of the given deployment; get RCList from client interface.
@ -93,7 +105,7 @@ func GetNewRC(deployment extensions.Deployment, c client.Interface) (*api.Replic
// Returns nil if the new RC doesnt exist yet.
func GetNewRCFromList(deployment extensions.Deployment, c client.Interface, getRcList func(string, api.ListOptions) ([]api.ReplicationController, error)) (*api.ReplicationController, error) {
namespace := deployment.ObjectMeta.Namespace
rcList, err := getRcList(namespace, api.ListOptions{})
rcList, err := getRcList(namespace, api.ListOptions{LabelSelector: labels.SelectorFromSet(deployment.Spec.Selector)})
if err != nil {
return nil, fmt.Errorf("error listing replication controllers: %v", err)
}

View File

@ -314,7 +314,7 @@ func TestGetOldRCs(t *testing.T) {
}
for _, test := range tests {
rcs, err := GetOldRCs(newDeployment, testclient.NewSimpleFake(test.objs...))
rcs, _, err := GetOldRCs(newDeployment, testclient.NewSimpleFake(test.objs...))
if err != nil {
t.Errorf("In test case %s, got unexpected error %v", test.test, err)
}

View File

@ -145,6 +145,14 @@ func testNewDeployment(f *Framework) {
Expect(err).NotTo(HaveOccurred())
Expect(deployment.Status.Replicas).Should(Equal(replicas))
Expect(deployment.Status.UpdatedReplicas).Should(Equal(replicas))
// The new RC of this deployment should be revision 1
newRC, err := deploymentutil.GetNewRC(*deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(newRC.Annotations).NotTo(Equal(nil))
Expect(newRC.Annotations[deploymentutil.RevisionAnnotation]).Should(Equal("1"))
// This deployment should be revision 1
Expect(deployment.Annotations).NotTo(Equal(nil))
Expect(deployment.Annotations[deploymentutil.RevisionAnnotation]).Should(Equal("1"))
}
func testRollingUpdateDeployment(f *Framework) {
@ -175,7 +183,7 @@ func testRollingUpdateDeployment(f *Framework) {
// Create a deployment to delete nginx pods and instead bring up redis pods.
deploymentName := "redis-deployment"
Logf("Creating deployment %s", deploymentName)
_, err = c.Deployments(ns).Create(newDeployment(deploymentName, replicas, deploymentPodLabels, "redis", "redis", extensions.RollingUpdateDeploymentStrategyType, nil))
deployment, err := c.Deployments(ns).Create(newDeployment(deploymentName, replicas, deploymentPodLabels, "redis", "redis", extensions.RollingUpdateDeploymentStrategyType, nil))
Expect(err).NotTo(HaveOccurred())
defer func() {
deployment, err := c.Deployments(ns).Get(deploymentName)
@ -190,6 +198,17 @@ func testRollingUpdateDeployment(f *Framework) {
err = waitForDeploymentStatus(c, ns, deploymentName, replicas, replicas-1, replicas+1, 0)
Expect(err).NotTo(HaveOccurred())
// The new RC of this deployment should be revision 1
newRC, err := deploymentutil.GetNewRC(*deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(newRC.Annotations).NotTo(Equal(nil))
Expect(newRC.Annotations[deploymentutil.RevisionAnnotation]).Should(Equal("1"))
// This deployment should be revision 1
deployment, err = c.Deployments(ns).Get(deploymentName)
Expect(err).NotTo(HaveOccurred())
Expect(deployment.Annotations).NotTo(Equal(nil))
Expect(deployment.Annotations[deploymentutil.RevisionAnnotation]).Should(Equal("1"))
}
func testRollingUpdateDeploymentEvents(f *Framework) {
@ -203,7 +222,14 @@ func testRollingUpdateDeploymentEvents(f *Framework) {
}
rcName := "nginx-controller"
replicas := 1
_, err := c.ReplicationControllers(ns).Create(newRC(rcName, replicas, rcPodLabels, "nginx", "nginx"))
rcRevision := "3"
annotations := make(map[string]string)
annotations[deploymentutil.RevisionAnnotation] = rcRevision
rc := newRC(rcName, replicas, rcPodLabels, "nginx", "nginx")
rc.Annotations = annotations
_, err := c.ReplicationControllers(ns).Create(rc)
Expect(err).NotTo(HaveOccurred())
defer func() {
Logf("deleting replication controller %s", rcName)
@ -250,6 +276,12 @@ func testRollingUpdateDeploymentEvents(f *Framework) {
Expect(newRC).NotTo(Equal(nil))
Expect(events.Items[0].Message).Should(Equal(fmt.Sprintf("Scaled up rc %s to 1", newRC.Name)))
Expect(events.Items[1].Message).Should(Equal(fmt.Sprintf("Scaled down rc %s to 0", rcName)))
// The new RC of this deployment should be revision 4
Expect(newRC.Annotations).NotTo(Equal(nil))
Expect(newRC.Annotations[deploymentutil.RevisionAnnotation]).Should(Equal("4"))
// This deployment should be revision 4
Expect(deployment.Annotations).NotTo(Equal(nil))
Expect(deployment.Annotations[deploymentutil.RevisionAnnotation]).Should(Equal("4"))
}
func testRecreateDeployment(f *Framework) {
@ -316,6 +348,12 @@ func testRecreateDeployment(f *Framework) {
Expect(newRC).NotTo(Equal(nil))
Expect(events.Items[0].Message).Should(Equal(fmt.Sprintf("Scaled down rc %s to 0", rcName)))
Expect(events.Items[1].Message).Should(Equal(fmt.Sprintf("Scaled up rc %s to 3", newRC.Name)))
// The new RC of this deployment should be revision 1
Expect(newRC.Annotations).NotTo(Equal(nil))
Expect(newRC.Annotations[deploymentutil.RevisionAnnotation]).Should(Equal("1"))
// This deployment should be revision 1
Expect(deployment.Annotations).NotTo(Equal(nil))
Expect(deployment.Annotations[deploymentutil.RevisionAnnotation]).Should(Equal("1"))
}
// testDeploymentCleanUpPolicy tests that deployment supports cleanup policy
@ -416,11 +454,20 @@ func testRolloverDeployment(f *Framework) {
}()
// Verify that the pods were scaled up and down as expected. We use events to verify that.
deployment, err := c.Deployments(ns).Get(deploymentName)
Expect(err).NotTo(HaveOccurred())
// Make sure the deployment starts to scale up and down RCs
waitForPartialEvents(c, ns, deployment, 2)
newRC, err := deploymentutil.GetNewRC(*deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(newRC).NotTo(Equal(nil))
// The new RC of this deployment should be revision 1
Expect(newRC.Annotations).NotTo(Equal(nil))
Expect(newRC.Annotations[deploymentutil.RevisionAnnotation]).Should(Equal("1"))
// This deployment should be revision 1
deployment, err = c.Deployments(ns).Get(deploymentName)
Expect(err).NotTo(HaveOccurred())
Expect(deployment.Annotations).NotTo(Equal(nil))
Expect(deployment.Annotations[deploymentutil.RevisionAnnotation]).Should(Equal("1"))
// Before the deployment finishes, update the deployment to rollover the above 2 rcs and bring up redis pods.
// If the deployment already finished here, the test would fail. When this happens, increase its minReadySeconds or replicas to prevent it.
@ -442,6 +489,12 @@ func testRolloverDeployment(f *Framework) {
// Make sure new RC contains "redis" image
newRC, err = deploymentutil.GetNewRC(*deployment, c)
Expect(newRC.Spec.Template.Spec.Containers[0].Image).Should(Equal(updatedDeploymentImage))
// The new RC of this deployment should be revision 2
Expect(newRC.Annotations).NotTo(Equal(nil))
Expect(newRC.Annotations[deploymentutil.RevisionAnnotation]).Should(Equal("2"))
// This deployment should be revision 2
Expect(deployment.Annotations).NotTo(Equal(nil))
Expect(deployment.Annotations[deploymentutil.RevisionAnnotation]).Should(Equal("2"))
}
func testPausedDeployment(f *Framework) {

View File

@ -1972,7 +1972,7 @@ func waitForDeploymentStatus(c *client.Client, ns, deploymentName string, desire
if err != nil {
return false, err
}
oldRCs, err := deploymentutil.GetOldRCs(*deployment, c)
oldRCs, _, err := deploymentutil.GetOldRCs(*deployment, c)
if err != nil {
return false, err
}
@ -2024,7 +2024,7 @@ func waitForDeploymentOldRCsNum(c *client.Client, ns, deploymentName string, des
if err != nil {
return false, err
}
oldRCs, err := deploymentutil.GetOldRCs(*deployment, c)
oldRCs, _, err := deploymentutil.GetOldRCs(*deployment, c)
if err != nil {
return false, err
}