extensions: add observedGeneration for deployments

pull/6/head
kargakis 2016-02-23 15:23:14 +01:00
parent 7f1b699880
commit 418d79cb78
7 changed files with 58 additions and 5 deletions

View File

@ -332,6 +332,9 @@ type RollingUpdateDeployment struct {
}
type DeploymentStatus struct {
// The generation observed by the deployment controller.
ObservedGeneration int `json:"observedGeneration,omitempty"`
// Total number of non-terminated pods targeted by this deployment (their labels match the selector).
Replicas int `json:"replicas,omitempty"`

View File

@ -328,6 +328,9 @@ type RollingUpdateDeployment struct {
// DeploymentStatus is the most recently observed status of the Deployment.
type DeploymentStatus struct {
// The generation observed by the deployment controller.
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
// Total number of non-terminated pods targeted by this deployment (their labels match the selector).
Replicas int32 `json:"replicas,omitempty"`

View File

@ -329,12 +329,29 @@ func ValidateDeploymentSpec(spec *extensions.DeploymentSpec, fldPath *field.Path
return allErrs
}
// Validates given deployment status.
func ValidateDeploymentStatus(status *extensions.DeploymentStatus, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.ObservedGeneration), fldPath.Child("observedGeneration"))...)
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.Replicas), fldPath.Child("replicas"))...)
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.UpdatedReplicas), fldPath.Child("updatedReplicas"))...)
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.AvailableReplicas), fldPath.Child("availableReplicas"))...)
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.UnavailableReplicas), fldPath.Child("unavailableReplicas"))...)
return allErrs
}
func ValidateDeploymentUpdate(update, old *extensions.Deployment) field.ErrorList {
allErrs := apivalidation.ValidateObjectMetaUpdate(&update.ObjectMeta, &old.ObjectMeta, field.NewPath("metadata"))
allErrs = append(allErrs, ValidateDeploymentSpec(&update.Spec, field.NewPath("spec"))...)
return allErrs
}
func ValidateDeploymentStatusUpdate(update, old *extensions.Deployment) field.ErrorList {
allErrs := apivalidation.ValidateObjectMetaUpdate(&update.ObjectMeta, &old.ObjectMeta, field.NewPath("metadata"))
allErrs = append(allErrs, ValidateDeploymentStatus(&update.Status, field.NewPath("status"))...)
return allErrs
}
func ValidateDeployment(obj *extensions.Deployment) field.ErrorList {
allErrs := apivalidation.ValidateObjectMeta(&obj.ObjectMeta, true, ValidateDeploymentName, field.NewPath("metadata"))
allErrs = append(allErrs, ValidateDeploymentSpec(&obj.Spec, field.NewPath("spec"))...)

View File

@ -577,13 +577,13 @@ func (dc *DeploymentController) syncRollingUpdateDeployment(deployment extension
}
// syncDeploymentStatus checks if the status is up-to-date and sync it if necessary
func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment extensions.Deployment) error {
totalReplicas, updatedReplicas, availableReplicas, _, err := dc.calculateStatus(allRSs, newRS, deployment)
func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, d extensions.Deployment) error {
totalReplicas, updatedReplicas, availableReplicas, _, err := dc.calculateStatus(allRSs, newRS, d)
if err != nil {
return err
}
if deployment.Status.Replicas != totalReplicas || deployment.Status.UpdatedReplicas != updatedReplicas || deployment.Status.AvailableReplicas != availableReplicas {
return dc.updateDeploymentStatus(allRSs, newRS, deployment)
if d.Status.Replicas != totalReplicas || d.Status.UpdatedReplicas != updatedReplicas || d.Status.AvailableReplicas != availableReplicas || int(d.Generation) > d.Status.ObservedGeneration {
return dc.updateDeploymentStatus(allRSs, newRS, d)
}
return nil
}
@ -1036,6 +1036,8 @@ func (dc *DeploymentController) updateDeploymentStatus(allRSs []*extensions.Repl
newDeployment := deployment
// TODO: Reconcile this with API definition. API definition talks about ready pods, while this just computes created pods.
newDeployment.Status = extensions.DeploymentStatus{
// TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value.
ObservedGeneration: int(deployment.Generation),
Replicas: totalReplicas,
UpdatedReplicas: updatedReplicas,
AvailableReplicas: availableReplicas,

View File

@ -18,6 +18,7 @@ package deployment
import (
"fmt"
"reflect"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
@ -48,6 +49,7 @@ func (deploymentStrategy) NamespaceScoped() bool {
func (deploymentStrategy) PrepareForCreate(obj runtime.Object) {
deployment := obj.(*extensions.Deployment)
deployment.Status = extensions.DeploymentStatus{}
deployment.Generation = 1
}
// Validate validates a new deployment.
@ -70,6 +72,14 @@ func (deploymentStrategy) PrepareForUpdate(obj, old runtime.Object) {
newDeployment := obj.(*extensions.Deployment)
oldDeployment := old.(*extensions.Deployment)
newDeployment.Status = oldDeployment.Status
// Spec updates bump the generation so that we can distinguish between
// scaling events and template changes, annotation updates bump the generation
// because annotations are copied from deployments to their replica sets.
if !reflect.DeepEqual(newDeployment.Spec, oldDeployment.Spec) ||
!reflect.DeepEqual(newDeployment.Annotations, oldDeployment.Annotations) {
newDeployment.Generation = oldDeployment.Generation + 1
}
}
// ValidateUpdate is the default update validation for an end user.
@ -97,7 +107,7 @@ func (deploymentStatusStrategy) PrepareForUpdate(obj, old runtime.Object) {
// ValidateUpdate is the default update validation for an end user updating status
func (deploymentStatusStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) field.ErrorList {
return validation.ValidateDeploymentUpdate(obj.(*extensions.Deployment), old.(*extensions.Deployment))
return validation.ValidateDeploymentStatusUpdate(obj.(*extensions.Deployment), old.(*extensions.Deployment))
}
// DeploymentToSelectableFields returns a field set that represents the object.

View File

@ -541,6 +541,10 @@ func testPausedDeployment(f *Framework) {
})
Expect(err).NotTo(HaveOccurred())
// Use observedGeneration to determine if the controller noticed the resume.
err = waitForObservedDeployment(c, ns, deploymentName)
Expect(err).NotTo(HaveOccurred())
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
if err != nil {
Expect(err).NotTo(HaveOccurred())
@ -564,6 +568,10 @@ func testPausedDeployment(f *Framework) {
})
Expect(err).NotTo(HaveOccurred())
// Use observedGeneration to determine if the controller noticed the pause.
err = waitForObservedDeployment(c, ns, deploymentName)
Expect(err).NotTo(HaveOccurred())
newRS, err := deploymentutil.GetNewReplicaSet(*deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(c.Extensions().ReplicaSets(ns).Delete(newRS.Name, nil)).NotTo(HaveOccurred())

View File

@ -2186,6 +2186,16 @@ func waitForDeploymentOldRSsNum(c *clientset.Clientset, ns, deploymentName strin
})
}
func waitForObservedDeployment(c *clientset.Clientset, ns, deploymentName string) error {
return wait.Poll(poll, 1*time.Minute, func() (bool, error) {
deployment, err := c.Extensions().Deployments(ns).Get(deploymentName)
if err != nil {
return false, err
}
return int(deployment.Generation) == deployment.Status.ObservedGeneration, nil
})
}
func logReplicaSetsOfDeployment(deployment *extensions.Deployment, oldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet) {
Logf("Deployment = %+v", deployment)
for i := range oldRSs {