mirror of https://github.com/k3s-io/k3s
Merge pull request #21857 from nikhiljindal/stopDeployment
fix deployment e2e flake: Update DeploymentReaper.Stop to use ObservedGenerationpull/6/head
commit
0b5edab208
|
@ -420,9 +420,12 @@ func (dc *DeploymentController) syncDeployment(key string) error {
|
||||||
d := *obj.(*extensions.Deployment)
|
d := *obj.(*extensions.Deployment)
|
||||||
|
|
||||||
if d.Spec.Paused {
|
if d.Spec.Paused {
|
||||||
|
// TODO: Implement scaling for paused deployments.
|
||||||
|
// Dont take any action for paused deployment.
|
||||||
|
// But keep the status up-to-date.
|
||||||
// Ignore paused deployments
|
// Ignore paused deployments
|
||||||
glog.V(4).Infof("Ignoring paused deployment %s/%s", d.Namespace, d.Name)
|
glog.V(4).Infof("Updating status only for paused deployment %s/%s", d.Namespace, d.Name)
|
||||||
return nil
|
return dc.syncPausedDeploymentStatus(&d)
|
||||||
}
|
}
|
||||||
if d.Spec.RollbackTo != nil {
|
if d.Spec.RollbackTo != nil {
|
||||||
revision := d.Spec.RollbackTo.Revision
|
revision := d.Spec.RollbackTo.Revision
|
||||||
|
@ -440,6 +443,18 @@ func (dc *DeploymentController) syncDeployment(key string) error {
|
||||||
return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
|
return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Updates the status of a paused deployment
|
||||||
|
func (dc *DeploymentController) syncPausedDeploymentStatus(deployment *extensions.Deployment) error {
|
||||||
|
newRS, oldRSs, err := dc.getAllReplicaSets(*deployment, false)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
allRSs := append(controller.FilterActiveReplicaSets(oldRSs), newRS)
|
||||||
|
|
||||||
|
// Sync deployment status
|
||||||
|
return dc.syncDeploymentStatus(allRSs, newRS, *deployment)
|
||||||
|
}
|
||||||
|
|
||||||
// Rolling back to a revision; no-op if the toRevision is deployment's current revision
|
// Rolling back to a revision; no-op if the toRevision is deployment's current revision
|
||||||
func (dc *DeploymentController) rollback(deployment *extensions.Deployment, toRevision *int64) (*extensions.Deployment, error) {
|
func (dc *DeploymentController) rollback(deployment *extensions.Deployment, toRevision *int64) (*extensions.Deployment, error) {
|
||||||
newRS, allOldRSs, err := dc.getAllReplicaSets(*deployment, true)
|
newRS, allOldRSs, err := dc.getAllReplicaSets(*deployment, true)
|
||||||
|
|
|
@ -30,6 +30,7 @@ import (
|
||||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/labels"
|
"k8s.io/kubernetes/pkg/labels"
|
||||||
"k8s.io/kubernetes/pkg/util"
|
"k8s.io/kubernetes/pkg/util"
|
||||||
|
deploymentutil "k8s.io/kubernetes/pkg/util/deployment"
|
||||||
utilerrors "k8s.io/kubernetes/pkg/util/errors"
|
utilerrors "k8s.io/kubernetes/pkg/util/errors"
|
||||||
"k8s.io/kubernetes/pkg/util/wait"
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
)
|
)
|
||||||
|
@ -367,52 +368,25 @@ func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Durati
|
||||||
// TODO replace with patch when available: https://github.com/kubernetes/kubernetes/issues/20527
|
// TODO replace with patch when available: https://github.com/kubernetes/kubernetes/issues/20527
|
||||||
d.Spec.RevisionHistoryLimit = util.IntPtr(0)
|
d.Spec.RevisionHistoryLimit = util.IntPtr(0)
|
||||||
d.Spec.Replicas = 0
|
d.Spec.Replicas = 0
|
||||||
// TODO: un-pausing should not be necessary, remove when this is fixed:
|
|
||||||
// https://github.com/kubernetes/kubernetes/issues/20966
|
|
||||||
// Instead deployment should be Paused at this point and not at next TODO.
|
|
||||||
d.Spec.Paused = false
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait for total no of pods drop to 0
|
|
||||||
if err := wait.Poll(reaper.pollInterval, reaper.timeout, func() (bool, error) {
|
|
||||||
curr, err := deployments.Get(name)
|
|
||||||
// if deployment was not found it must have been deleted, error out
|
|
||||||
if err != nil && errors.IsNotFound(err) {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
// if other errors happen, retry
|
|
||||||
if err != nil {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
// check if deployment wasn't recreated with the same name
|
|
||||||
// TODO use generations when deployment will have them
|
|
||||||
if curr.UID != deployment.UID {
|
|
||||||
return false, errors.NewNotFound(extensions.Resource("Deployment"), name)
|
|
||||||
}
|
|
||||||
return curr.Status.Replicas == 0, nil
|
|
||||||
}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: When deployments will allow running cleanup policy while being
|
|
||||||
// paused, move pausing to above update operation. Without it, we need to
|
|
||||||
// pause deployment before stopping RSs, to prevent creating new RSs.
|
|
||||||
// See https://github.com/kubernetes/kubernetes/issues/20966
|
|
||||||
deployment, err = reaper.updateDeploymentWithRetries(namespace, name, func(d *extensions.Deployment) {
|
|
||||||
d.Spec.Paused = true
|
d.Spec.Paused = true
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove remaining RSs
|
// Use observedGeneration to determine if the deployment controller noticed the pause.
|
||||||
|
if err := deploymentutil.WaitForObservedDeployment(func() (*extensions.Deployment, error) {
|
||||||
|
return deployments.Get(name)
|
||||||
|
}, deployment.Generation, 10*time.Millisecond, 1*time.Minute); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop all replica sets.
|
||||||
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
|
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
options := api.ListOptions{LabelSelector: selector}
|
options := api.ListOptions{LabelSelector: selector}
|
||||||
rsList, err := replicaSets.List(options)
|
rsList, err := replicaSets.List(options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -430,7 +404,8 @@ func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Durati
|
||||||
return utilerrors.NewAggregate(errList)
|
return utilerrors.NewAggregate(errList)
|
||||||
}
|
}
|
||||||
|
|
||||||
// and finally deployment
|
// Delete deployment at the end.
|
||||||
|
// Note: We delete deployment at the end so that if removing RSs fails, we atleast have the deployment to retry.
|
||||||
return deployments.Delete(name, gracePeriod)
|
return deployments.Delete(name, gracePeriod)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -538,8 +538,7 @@ func TestDeploymentStop(t *testing.T) {
|
||||||
},
|
},
|
||||||
StopError: nil,
|
StopError: nil,
|
||||||
ExpectedActions: []string{"get:deployments", "update:deployments",
|
ExpectedActions: []string{"get:deployments", "update:deployments",
|
||||||
"get:deployments", "get:deployments", "update:deployments",
|
"get:deployments", "list:replicasets", "delete:deployments"},
|
||||||
"list:replicasets", "delete:deployments"},
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "Deployment with single replicaset",
|
Name: "Deployment with single replicaset",
|
||||||
|
@ -561,10 +560,9 @@ func TestDeploymentStop(t *testing.T) {
|
||||||
},
|
},
|
||||||
StopError: nil,
|
StopError: nil,
|
||||||
ExpectedActions: []string{"get:deployments", "update:deployments",
|
ExpectedActions: []string{"get:deployments", "update:deployments",
|
||||||
"get:deployments", "get:deployments", "update:deployments",
|
"get:deployments", "list:replicasets", "get:replicasets",
|
||||||
"list:replicasets", "get:replicasets", "get:replicasets",
|
"get:replicasets", "update:replicasets", "get:replicasets",
|
||||||
"update:replicasets", "get:replicasets", "get:replicasets",
|
"get:replicasets", "delete:replicasets", "delete:deployments"},
|
||||||
"delete:replicasets", "delete:deployments"},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -436,3 +436,16 @@ func NewRSNewReplicas(deployment *extensions.Deployment, allRSs []*extensions.Re
|
||||||
return 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type)
|
return 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Polls for deployment to be updated so that deployment.Status.ObservedGeneration >= desiredGeneration.
|
||||||
|
// Returns error if polling timesout.
|
||||||
|
func WaitForObservedDeployment(getDeploymentFunc func() (*extensions.Deployment, error), desiredGeneration int64, interval, timeout time.Duration) error {
|
||||||
|
// TODO: This should take clientset.Interface when all code is updated to use clientset. Keeping it this way allows the function to be used by callers who have client.Interface.
|
||||||
|
return wait.Poll(interval, timeout, func() (bool, error) {
|
||||||
|
deployment, err := getDeploymentFunc()
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return deployment.Status.ObservedGeneration >= desiredGeneration, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -543,7 +543,7 @@ func testPausedDeployment(f *Framework) {
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
// Use observedGeneration to determine if the controller noticed the resume.
|
// Use observedGeneration to determine if the controller noticed the resume.
|
||||||
err = waitForObservedDeployment(c, ns, deploymentName)
|
err = waitForObservedDeployment(c, ns, deploymentName, deployment.Generation)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
|
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
|
||||||
|
@ -570,7 +570,7 @@ func testPausedDeployment(f *Framework) {
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
// Use observedGeneration to determine if the controller noticed the pause.
|
// Use observedGeneration to determine if the controller noticed the pause.
|
||||||
err = waitForObservedDeployment(c, ns, deploymentName)
|
err = waitForObservedDeployment(c, ns, deploymentName, deployment.Generation)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
newRS, err := deploymentutil.GetNewReplicaSet(*deployment, c)
|
newRS, err := deploymentutil.GetNewReplicaSet(*deployment, c)
|
||||||
|
|
|
@ -2224,16 +2224,6 @@ func waitForDeploymentOldRSsNum(c *clientset.Clientset, ns, deploymentName strin
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitForObservedDeployment(c *clientset.Clientset, ns, deploymentName string) error {
|
|
||||||
return wait.Poll(poll, 1*time.Minute, func() (bool, error) {
|
|
||||||
deployment, err := c.Extensions().Deployments(ns).Get(deploymentName)
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
return deployment.Generation == deployment.Status.ObservedGeneration, nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func logReplicaSetsOfDeployment(deployment *extensions.Deployment, allOldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet) {
|
func logReplicaSetsOfDeployment(deployment *extensions.Deployment, allOldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet) {
|
||||||
Logf("Deployment = %+v", deployment)
|
Logf("Deployment = %+v", deployment)
|
||||||
for i := range allOldRSs {
|
for i := range allOldRSs {
|
||||||
|
@ -2242,6 +2232,10 @@ func logReplicaSetsOfDeployment(deployment *extensions.Deployment, allOldRSs []*
|
||||||
Logf("New ReplicaSet of deployment %s: %+v", deployment.Name, newRS)
|
Logf("New ReplicaSet of deployment %s: %+v", deployment.Name, newRS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func waitForObservedDeployment(c *clientset.Clientset, ns, deploymentName string, desiredGeneration int64) error {
|
||||||
|
return deploymentutil.WaitForObservedDeployment(func() (*extensions.Deployment, error) { return c.Extensions().Deployments(ns).Get(deploymentName) }, desiredGeneration, poll, 1*time.Minute)
|
||||||
|
}
|
||||||
|
|
||||||
func logPodsOfReplicaSets(c clientset.Interface, rss []*extensions.ReplicaSet, minReadySeconds int) {
|
func logPodsOfReplicaSets(c clientset.Interface, rss []*extensions.ReplicaSet, minReadySeconds int) {
|
||||||
allPods, err := deploymentutil.GetPodsForReplicaSets(c, rss)
|
allPods, err := deploymentutil.GetPodsForReplicaSets(c, rss)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
|
Loading…
Reference in New Issue