mirror of https://github.com/k3s-io/k3s
Merge pull request #52896 from janetkuo/deploy-hash-inte
Automatic merge from submit-queue (batch tested with PRs 52721, 53057, 52493, 52998, 52896). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Move deployment collision avoidance e2e test to integration **What this PR does / why we need it**: **Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: ref #52113 **Special notes for your reviewer**: **Release note**: ```release-note NONE ```pull/6/head
commit
52f96dae45
|
@ -108,9 +108,6 @@ var _ = SIGDescribe("Deployment", func() {
|
|||
It("test Deployment ReplicaSet orphaning and adoption regarding controllerRef", func() {
|
||||
testDeploymentsControllerRef(f)
|
||||
})
|
||||
It("deployment can avoid hash collisions", func() {
|
||||
testDeploymentHashCollisionAvoidance(f)
|
||||
})
|
||||
// TODO: add tests that cover deployment.Spec.MinReadySeconds once we solved clock-skew issues
|
||||
// See https://github.com/kubernetes/kubernetes/issues/29229
|
||||
})
|
||||
|
@ -1270,47 +1267,3 @@ func orphanDeploymentReplicaSets(c clientset.Interface, d *extensions.Deployment
|
|||
deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(d.UID))
|
||||
return c.Extensions().Deployments(d.Namespace).Delete(d.Name, deleteOptions)
|
||||
}
|
||||
|
||||
func testDeploymentHashCollisionAvoidance(f *framework.Framework) {
|
||||
ns := f.Namespace.Name
|
||||
c := f.ClientSet
|
||||
|
||||
deploymentName := "test-hash-collision"
|
||||
framework.Logf("Creating Deployment %q", deploymentName)
|
||||
podLabels := map[string]string{"name": NginxImageName}
|
||||
d := framework.NewDeployment(deploymentName, int32(0), podLabels, NginxImageName, NginxImage, extensions.RollingUpdateDeploymentStrategyType)
|
||||
deployment, err := c.Extensions().Deployments(ns).Create(d)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
err = framework.WaitForDeploymentRevisionAndImage(c, ns, deploymentName, "1", NginxImage)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// TODO: Switch this to do a non-cascading deletion of the Deployment, mutate the ReplicaSet
|
||||
// once it has no owner reference, then recreate the Deployment if we ever proceed with
|
||||
// https://github.com/kubernetes/kubernetes/issues/44237
|
||||
framework.Logf("Mock a hash collision")
|
||||
newRS, err := deploymentutil.GetNewReplicaSet(deployment, c.ExtensionsV1beta1())
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
var nilRs *extensions.ReplicaSet
|
||||
Expect(newRS).NotTo(Equal(nilRs))
|
||||
_, err = framework.UpdateReplicaSetWithRetries(c, ns, newRS.Name, func(update *extensions.ReplicaSet) {
|
||||
*update.Spec.Template.Spec.TerminationGracePeriodSeconds = int64(5)
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
framework.Logf("Expect deployment collision counter to increment")
|
||||
if err := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
|
||||
d, err := c.Extensions().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
framework.Logf("cannot get deployment %q: %v", deploymentName, err)
|
||||
return false, nil
|
||||
}
|
||||
framework.Logf(spew.Sprintf("deployment status: %#v", d.Status))
|
||||
return d.Status.CollisionCount != nil && *d.Status.CollisionCount == int32(1), nil
|
||||
}); err != nil {
|
||||
framework.Failf("Failed to increment collision counter for deployment %q: %v", deploymentName, err)
|
||||
}
|
||||
|
||||
framework.Logf("Expect a new ReplicaSet to be created")
|
||||
err = framework.WaitForDeploymentRevisionAndImage(c, ns, deploymentName, "2", NginxImage)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ import (
|
|||
)
|
||||
|
||||
func UpdateDeploymentWithRetries(c clientset.Interface, namespace, name string, applyUpdate testutils.UpdateDeploymentFunc) (*extensions.Deployment, error) {
|
||||
return testutils.UpdateDeploymentWithRetries(c, namespace, name, applyUpdate, Logf)
|
||||
return testutils.UpdateDeploymentWithRetries(c, namespace, name, applyUpdate, Logf, Poll, pollShortTimeout)
|
||||
}
|
||||
|
||||
// Waits for the deployment to clean up old rcs.
|
||||
|
|
|
@ -34,27 +34,8 @@ import (
|
|||
|
||||
type updateRsFunc func(d *extensions.ReplicaSet)
|
||||
|
||||
func UpdateReplicaSetWithRetries(c clientset.Interface, namespace, name string, applyUpdate updateRsFunc) (*extensions.ReplicaSet, error) {
|
||||
var rs *extensions.ReplicaSet
|
||||
var updateErr error
|
||||
pollErr := wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
|
||||
var err error
|
||||
if rs, err = c.Extensions().ReplicaSets(namespace).Get(name, metav1.GetOptions{}); err != nil {
|
||||
return false, err
|
||||
}
|
||||
// Apply the update, then attempt to push it to the apiserver.
|
||||
applyUpdate(rs)
|
||||
if rs, err = c.Extensions().ReplicaSets(namespace).Update(rs); err == nil {
|
||||
Logf("Updating replica set %q", name)
|
||||
return true, nil
|
||||
}
|
||||
updateErr = err
|
||||
return false, nil
|
||||
})
|
||||
if pollErr == wait.ErrWaitTimeout {
|
||||
pollErr = fmt.Errorf("couldn't apply the provided updated to replicaset %q: %v", name, updateErr)
|
||||
}
|
||||
return rs, pollErr
|
||||
func UpdateReplicaSetWithRetries(c clientset.Interface, namespace, name string, applyUpdate testutils.UpdateReplicaSetFunc) (*extensions.ReplicaSet, error) {
|
||||
return testutils.UpdateReplicaSetWithRetries(c, namespace, name, applyUpdate, Logf, Poll, pollShortTimeout)
|
||||
}
|
||||
|
||||
// CheckNewRSAnnotations check if the new RS's annotation is as expected
|
||||
|
|
|
@ -21,6 +21,7 @@ go_test(
|
|||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"k8s.io/api/core/v1"
|
||||
"k8s.io/api/extensions/v1beta1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
)
|
||||
|
@ -171,7 +172,7 @@ func TestPausedDeployment(t *testing.T) {
|
|||
}
|
||||
|
||||
// Resume the deployment
|
||||
tester.deployment, err = tester.updateDeployment(resumeFn())
|
||||
tester.deployment, err = tester.updateDeployment(resumeFn)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to resume deployment %s: %v", tester.deployment.Name, err)
|
||||
}
|
||||
|
@ -198,7 +199,7 @@ func TestPausedDeployment(t *testing.T) {
|
|||
|
||||
// Pause the deployment.
|
||||
// The paused deployment shouldn't trigger a new rollout.
|
||||
tester.deployment, err = tester.updateDeployment(pauseFn())
|
||||
tester.deployment, err = tester.updateDeployment(pauseFn)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to pause deployment %s: %v", tester.deployment.Name, err)
|
||||
}
|
||||
|
@ -281,7 +282,7 @@ func TestScalePausedDeployment(t *testing.T) {
|
|||
}
|
||||
|
||||
// Pause the deployment.
|
||||
tester.deployment, err = tester.updateDeployment(pauseFn())
|
||||
tester.deployment, err = tester.updateDeployment(pauseFn)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to pause deployment %s: %v", tester.deployment.Name, err)
|
||||
}
|
||||
|
@ -319,3 +320,64 @@ func TestScalePausedDeployment(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Deployment rollout shouldn't be blocked on hash collisions
|
||||
func TestDeploymentHashCollision(t *testing.T) {
|
||||
s, closeFn, rm, dc, informers, c := dcSetup(t)
|
||||
defer closeFn()
|
||||
name := "test-hash-collision-deployment"
|
||||
ns := framework.CreateTestingNamespace(name, s, t)
|
||||
defer framework.DeleteTestingNamespace(ns, s, t)
|
||||
|
||||
replicas := int32(1)
|
||||
tester := &deploymentTester{t: t, c: c, deployment: newDeployment(name, ns.Name, replicas)}
|
||||
|
||||
var err error
|
||||
tester.deployment, err = c.ExtensionsV1beta1().Deployments(ns.Name).Create(tester.deployment)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create deployment %s: %v", tester.deployment.Name, err)
|
||||
}
|
||||
|
||||
// Start informer and controllers
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
informers.Start(stopCh)
|
||||
go rm.Run(5, stopCh)
|
||||
go dc.Run(5, stopCh)
|
||||
|
||||
// Wait for the Deployment to be updated to revision 1
|
||||
if err := tester.waitForDeploymentRevisionAndImage("1", fakeImage); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Mock a hash collision
|
||||
newRS, err := deploymentutil.GetNewReplicaSet(tester.deployment, c.ExtensionsV1beta1())
|
||||
if err != nil {
|
||||
t.Fatalf("failed getting new replicaset of deployment %s: %v", tester.deployment.Name, err)
|
||||
}
|
||||
if newRS == nil {
|
||||
t.Fatalf("unable to find new replicaset of deployment %s", tester.deployment.Name)
|
||||
}
|
||||
_, err = tester.updateReplicaSet(newRS.Name, func(update *v1beta1.ReplicaSet) {
|
||||
*update.Spec.Template.Spec.TerminationGracePeriodSeconds = int64(5)
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed updating replicaset %s template: %v", newRS.Name, err)
|
||||
}
|
||||
|
||||
// Expect deployment collision counter to increment
|
||||
if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
|
||||
d, err := c.ExtensionsV1beta1().Deployments(ns.Name).Get(tester.deployment.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
return d.Status.CollisionCount != nil && *d.Status.CollisionCount == int32(1), nil
|
||||
}); err != nil {
|
||||
t.Fatalf("Failed to increment collision counter for deployment %q: %v", tester.deployment.Name, err)
|
||||
}
|
||||
|
||||
// Expect a new ReplicaSet to be created
|
||||
if err := tester.waitForDeploymentRevisionAndImage("2", fakeImage); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,13 +38,21 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
pollInterval = 1 * time.Second
|
||||
pollInterval = 100 * time.Millisecond
|
||||
pollTimeout = 60 * time.Second
|
||||
|
||||
fakeImageName = "fake-name"
|
||||
fakeImage = "fakeimage"
|
||||
)
|
||||
|
||||
var pauseFn = func(update *v1beta1.Deployment) {
|
||||
update.Spec.Paused = true
|
||||
}
|
||||
|
||||
var resumeFn = func(update *v1beta1.Deployment) {
|
||||
update.Spec.Paused = false
|
||||
}
|
||||
|
||||
type deploymentTester struct {
|
||||
t *testing.T
|
||||
c clientset.Interface
|
||||
|
@ -160,7 +168,7 @@ func (d *deploymentTester) markAllPodsReady() {
|
|||
d.t.Fatalf("failed to parse Deployment selector: %v", err)
|
||||
}
|
||||
var readyPods int32
|
||||
err = wait.Poll(100*time.Millisecond, pollTimeout, func() (bool, error) {
|
||||
err = wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
|
||||
readyPods = 0
|
||||
pods, err := d.c.Core().Pods(ns).List(metav1.ListOptions{LabelSelector: selector.String()})
|
||||
if err != nil {
|
||||
|
@ -209,7 +217,7 @@ func (d *deploymentTester) waitForDeploymentStatusValidAndMarkPodsReady() error
|
|||
}
|
||||
|
||||
func (d *deploymentTester) updateDeployment(applyUpdate testutil.UpdateDeploymentFunc) (*v1beta1.Deployment, error) {
|
||||
return testutil.UpdateDeploymentWithRetries(d.c, d.deployment.Namespace, d.deployment.Name, applyUpdate, d.t.Logf)
|
||||
return testutil.UpdateDeploymentWithRetries(d.c, d.deployment.Namespace, d.deployment.Name, applyUpdate, d.t.Logf, pollInterval, pollTimeout)
|
||||
}
|
||||
|
||||
func (d *deploymentTester) waitForObservedDeployment(desiredGeneration int64) error {
|
||||
|
@ -249,14 +257,6 @@ func (d *deploymentTester) expectNewReplicaSet() (*v1beta1.ReplicaSet, error) {
|
|||
return rs, nil
|
||||
}
|
||||
|
||||
func pauseFn() func(update *v1beta1.Deployment) {
|
||||
return func(update *v1beta1.Deployment) {
|
||||
update.Spec.Paused = true
|
||||
}
|
||||
}
|
||||
|
||||
func resumeFn() func(update *v1beta1.Deployment) {
|
||||
return func(update *v1beta1.Deployment) {
|
||||
update.Spec.Paused = false
|
||||
}
|
||||
func (d *deploymentTester) updateReplicaSet(name string, applyUpdate testutil.UpdateReplicaSetFunc) (*v1beta1.ReplicaSet, error) {
|
||||
return testutil.UpdateReplicaSetWithRetries(d.c, d.deployment.Namespace, name, applyUpdate, d.t.Logf, pollInterval, pollTimeout)
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ go_library(
|
|||
"density_utils.go",
|
||||
"deployment.go",
|
||||
"pod_store.go",
|
||||
"replicaset.go",
|
||||
"runners.go",
|
||||
"tmpdir.go",
|
||||
],
|
||||
|
|
|
@ -216,10 +216,10 @@ func containsImage(containers []v1.Container, imageName string) bool {
|
|||
|
||||
type UpdateDeploymentFunc func(d *extensions.Deployment)
|
||||
|
||||
func UpdateDeploymentWithRetries(c clientset.Interface, namespace, name string, applyUpdate UpdateDeploymentFunc, logf LogfFn) (*extensions.Deployment, error) {
|
||||
func UpdateDeploymentWithRetries(c clientset.Interface, namespace, name string, applyUpdate UpdateDeploymentFunc, logf LogfFn, pollInterval, pollTimeout time.Duration) (*extensions.Deployment, error) {
|
||||
var deployment *extensions.Deployment
|
||||
var updateErr error
|
||||
pollErr := wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
|
||||
pollErr := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
|
||||
var err error
|
||||
if deployment, err = c.Extensions().Deployments(namespace).Get(name, metav1.GetOptions{}); err != nil {
|
||||
return false, err
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package utils
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
extensions "k8s.io/api/extensions/v1beta1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
)
|
||||
|
||||
type UpdateReplicaSetFunc func(d *extensions.ReplicaSet)
|
||||
|
||||
func UpdateReplicaSetWithRetries(c clientset.Interface, namespace, name string, applyUpdate UpdateReplicaSetFunc, logf LogfFn, pollInterval, pollTimeout time.Duration) (*extensions.ReplicaSet, error) {
|
||||
var rs *extensions.ReplicaSet
|
||||
var updateErr error
|
||||
pollErr := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
|
||||
var err error
|
||||
if rs, err = c.Extensions().ReplicaSets(namespace).Get(name, metav1.GetOptions{}); err != nil {
|
||||
return false, err
|
||||
}
|
||||
// Apply the update, then attempt to push it to the apiserver.
|
||||
applyUpdate(rs)
|
||||
if rs, err = c.Extensions().ReplicaSets(namespace).Update(rs); err == nil {
|
||||
logf("Updating replica set %q", name)
|
||||
return true, nil
|
||||
}
|
||||
updateErr = err
|
||||
return false, nil
|
||||
})
|
||||
if pollErr == wait.ErrWaitTimeout {
|
||||
pollErr = fmt.Errorf("couldn't apply the provided updated to replicaset %q: %v", name, updateErr)
|
||||
}
|
||||
return rs, pollErr
|
||||
}
|
Loading…
Reference in New Issue