Merge pull request #62271 from wojtek-t/fix_disruptive_regional_tests_3

Automatic merge from submit-queue (batch tested with PRs 62245, 62271). If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md

Fix disruptive regional tests 3

With GKE multizonal and regional clusters (where nodes are created in more than one zone), in the test framework we pass:
1. --num-nodes equal to the number of nodes in a single zone
2. --node-instance-groups equal to the MIG names in that zone
(1) is consistent with the end-user experience, since gcloud uses the same semantics for "--num-nodes". (2) is closely tied to (1), and we don't want to change either.

As a result, the expected node count, MIG sizes, etc. cannot be derived directly from those numbers. Commands for node restarts and similar operations also need to be split across zones, with the correct zone parameter passed for each zone.
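Conceptually, the per-zone split looks like the sketch below. It mirrors the common.RestartNodes change in this diff rather than introducing new behavior; restartNodesByZone is a hypothetical name, and the framework.TestContext fields, framework.RunCmd, and kubeletapis.LabelZoneFailureDomain identifiers are the ones used in the changed files:

```go
// restartNodesByZone is a minimal sketch (hypothetical helper) of the per-zone
// restart split; it assumes the same imports as the common.RestartNodes diff below.
func restartNodesByZone(nodes []v1.Node) error {
	// Group node names by the zone recorded in the failure-domain label,
	// falling back to the single configured zone when the label is absent.
	nodeNamesByZone := make(map[string][]string)
	for i := range nodes {
		zone := framework.TestContext.CloudConfig.Zone
		if z, ok := nodes[i].Labels[kubeletapis.LabelZoneFailureDomain]; ok {
			zone = z
		}
		nodeNamesByZone[zone] = append(nodeNamesByZone[zone], nodes[i].Name)
	}
	// Issue one "gcloud compute instances reset" per zone, passing that zone's
	// node names and the matching --zone flag.
	for zone, names := range nodeNamesByZone {
		args := []string{"compute", fmt.Sprintf("--project=%s", framework.TestContext.CloudConfig.ProjectID), "instances", "reset"}
		args = append(args, names...)
		args = append(args, fmt.Sprintf("--zone=%s", zone))
		if stdout, stderr, err := framework.RunCmd("gcloud", args...); err != nil {
			return fmt.Errorf("error restarting nodes: %s\nstdout: %s\nstderr: %s", err, stdout, stderr)
		}
	}
	return nil
}
```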
Kubernetes Submit Queue 2018-04-09 05:36:09 -07:00 committed by GitHub
commit 8eeb0c5524
8 changed files with 86 additions and 159 deletions


@ -377,9 +377,9 @@ var _ = SIGDescribe("Network Partition [Disruptive] [Slow]", func() {
nn, err := framework.NumberOfRegisteredNodes(f.ClientSet)
framework.ExpectNoError(err)
nodeNames, err := framework.CheckNodesReady(f.ClientSet, framework.NodeReadyInitialTimeout, nn)
nodes, err := framework.CheckNodesReady(f.ClientSet, nn, framework.NodeReadyInitialTimeout)
framework.ExpectNoError(err)
common.RestartNodes(f.ClientSet, nodeNames)
common.RestartNodes(f.ClientSet, nodes)
By("waiting for pods to be running again")
pst.WaitForRunningAndReady(*ps.Spec.Replicas, ps)


@ -99,9 +99,11 @@ var _ = SIGDescribe("DNS horizontal autoscaling", func() {
// This test is separated because it is slow and need to run serially.
// Will take around 5 minutes to run on a 4 nodes cluster.
It("[Serial] [Slow] kube-dns-autoscaler should scale kube-dns pods when cluster size changed", func() {
numNodes, err := framework.NumberOfRegisteredNodes(c)
Expect(err).NotTo(HaveOccurred())
By("Replace the dns autoscaling parameters with testing parameters")
err := updateDNSScalingConfigMap(c, packDNSScalingConfigMap(packLinearParams(&DNSParams_1)))
err = updateDNSScalingConfigMap(c, packDNSScalingConfigMap(packLinearParams(&DNSParams_1)))
Expect(err).NotTo(HaveOccurred())
defer func() {
By("Restoring initial dns autoscaling parameters")
@ -117,25 +119,21 @@ var _ = SIGDescribe("DNS horizontal autoscaling", func() {
Expect(waitForDNSReplicasSatisfied(c, getExpectReplicasLinear, DNSdefaultTimeout)).NotTo(HaveOccurred())
originalSizes := make(map[string]int)
sum := 0
for _, mig := range strings.Split(framework.TestContext.CloudConfig.NodeInstanceGroup, ",") {
size, err := framework.GroupSize(mig)
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("Initial size of %s: %d", mig, size))
originalSizes[mig] = size
sum += size
}
By("Manually increase cluster size")
increasedSize := 0
increasedSizes := make(map[string]int)
for key, val := range originalSizes {
increasedSizes[key] = val + 1
increasedSize += increasedSizes[key]
}
setMigSizes(increasedSizes)
Expect(WaitForClusterSizeFunc(c,
func(size int) bool { return size == increasedSize }, scaleUpTimeout)).NotTo(HaveOccurred())
func(size int) bool { return size == numNodes+len(originalSizes) }, scaleUpTimeout)).NotTo(HaveOccurred())
By("Wait for kube-dns scaled to expected number")
getExpectReplicasLinear = getExpectReplicasFuncLinear(c, &DNSParams_1)
@ -151,7 +149,7 @@ var _ = SIGDescribe("DNS horizontal autoscaling", func() {
By("Restoring cluster size")
setMigSizes(originalSizes)
Expect(framework.WaitForReadyNodes(c, sum, scaleDownTimeout)).NotTo(HaveOccurred())
Expect(framework.WaitForReadyNodes(c, numNodes, scaleDownTimeout)).NotTo(HaveOccurred())
By("Wait for kube-dns scaled to expected number")
Expect(waitForDNSReplicasSatisfied(c, getExpectReplicasLinear, DNSdefaultTimeout)).NotTo(HaveOccurred())


@ -41,6 +41,7 @@ go_library(
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/conditions:go_default_library",
"//pkg/kubelet:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/kubelet/sysctl:go_default_library",
"//pkg/security/apparmor:go_default_library",
"//pkg/util/version:go_default_library",


@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/test/e2e/framework"
imageutils "k8s.io/kubernetes/test/utils/image"
@ -98,39 +99,45 @@ func NewRCByName(c clientset.Interface, ns, name string, replicas int32, gracePe
name, replicas, framework.ServeHostnameImage, 9376, v1.ProtocolTCP, map[string]string{}, gracePeriod))
}
func RestartNodes(c clientset.Interface, nodeNames []string) error {
// List old boot IDs.
oldBootIDs := make(map[string]string)
for _, name := range nodeNames {
node, err := c.CoreV1().Nodes().Get(name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error getting node info before reboot: %s", err)
func RestartNodes(c clientset.Interface, nodes []v1.Node) error {
// Build mapping from zone to nodes in that zone.
nodeNamesByZone := make(map[string][]string)
for i := range nodes {
node := &nodes[i]
zone := framework.TestContext.CloudConfig.Zone
if z, ok := node.Labels[kubeletapis.LabelZoneFailureDomain]; ok {
zone = z
}
oldBootIDs[name] = node.Status.NodeInfo.BootID
nodeNamesByZone[zone] = append(nodeNamesByZone[zone], node.Name)
}
// Reboot the nodes.
args := []string{
"compute",
fmt.Sprintf("--project=%s", framework.TestContext.CloudConfig.ProjectID),
"instances",
"reset",
}
args = append(args, nodeNames...)
args = append(args, fmt.Sprintf("--zone=%s", framework.TestContext.CloudConfig.Zone))
stdout, stderr, err := framework.RunCmd("gcloud", args...)
if err != nil {
return fmt.Errorf("error restarting nodes: %s\nstdout: %s\nstderr: %s", err, stdout, stderr)
for zone, nodeNames := range nodeNamesByZone {
args := []string{
"compute",
fmt.Sprintf("--project=%s", framework.TestContext.CloudConfig.ProjectID),
"instances",
"reset",
}
args = append(args, nodeNames...)
args = append(args, fmt.Sprintf("--zone=%s", zone))
stdout, stderr, err := framework.RunCmd("gcloud", args...)
if err != nil {
return fmt.Errorf("error restarting nodes: %s\nstdout: %s\nstderr: %s", err, stdout, stderr)
}
}
// Wait for their boot IDs to change.
for _, name := range nodeNames {
for i := range nodes {
node := &nodes[i]
if err := wait.Poll(30*time.Second, 5*time.Minute, func() (bool, error) {
node, err := c.CoreV1().Nodes().Get(name, metav1.GetOptions{})
newNode, err := c.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
if err != nil {
return false, fmt.Errorf("error getting node info after reboot: %s", err)
}
return node.Status.NodeInfo.BootID != oldBootIDs[name], nil
return node.Status.NodeInfo.BootID != newNode.Status.NodeInfo.BootID, nil
}); err != nil {
return fmt.Errorf("error waiting for node %s boot ID to change: %s", name, err)
return fmt.Errorf("error waiting for node %s boot ID to change: %s", node.Name, err)
}
}
return nil


@ -24,11 +24,7 @@ import (
"strings"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
)
func EtcdUpgrade(target_storage, target_version string) error {
@ -215,7 +211,7 @@ func NodeUpgrade(f *Framework, v string, img string) error {
// TODO(ihmccreery) We shouldn't have to wait for nodes to be ready in
// GKE; the operation shouldn't return until they all are.
Logf("Waiting up to %v for all nodes to be ready after the upgrade", RestartNodeReadyAgainTimeout)
if _, err := CheckNodesReady(f.ClientSet, RestartNodeReadyAgainTimeout, TestContext.CloudConfig.NumNodes); err != nil {
if _, err := CheckNodesReady(f.ClientSet, TestContext.CloudConfig.NumNodes, RestartNodeReadyAgainTimeout); err != nil {
return err
}
return nil
@ -229,7 +225,7 @@ func NodeUpgradeGCEWithKubeProxyDaemonSet(f *Framework, v string, img string, en
}
// Wait for it to complete and validate nodes are healthy.
Logf("Waiting up to %v for all nodes to be ready after the upgrade", RestartNodeReadyAgainTimeout)
if _, err := CheckNodesReady(f.ClientSet, RestartNodeReadyAgainTimeout, TestContext.CloudConfig.NumNodes); err != nil {
if _, err := CheckNodesReady(f.ClientSet, TestContext.CloudConfig.NumNodes, RestartNodeReadyAgainTimeout); err != nil {
return err
}
return nil
@ -274,63 +270,6 @@ func nodeUpgradeGKE(v string, img string) error {
return nil
}
// CheckNodesReady waits up to nt for expect nodes accessed by c to be ready,
// returning an error if this doesn't happen in time. It returns the names of
// nodes it finds.
func CheckNodesReady(c clientset.Interface, nt time.Duration, expect int) ([]string, error) {
// First, keep getting all of the nodes until we get the number we expect.
var nodeList *v1.NodeList
var errLast error
start := time.Now()
found := wait.Poll(Poll, nt, func() (bool, error) {
// A rolling-update (GCE/GKE implementation of restart) can complete before the apiserver
// knows about all of the nodes. Thus, we retry the list nodes call
// until we get the expected number of nodes.
nodeList, errLast = c.CoreV1().Nodes().List(metav1.ListOptions{
FieldSelector: fields.Set{"spec.unschedulable": "false"}.AsSelector().String()})
if errLast != nil {
return false, nil
}
if len(nodeList.Items) != expect {
errLast = fmt.Errorf("expected to find %d nodes but found only %d (%v elapsed)",
expect, len(nodeList.Items), time.Since(start))
Logf("%v", errLast)
return false, nil
}
return true, nil
}) == nil
nodeNames := make([]string, len(nodeList.Items))
for i, n := range nodeList.Items {
nodeNames[i] = n.ObjectMeta.Name
}
if !found {
return nodeNames, fmt.Errorf("couldn't find %d nodes within %v; last error: %v",
expect, nt, errLast)
}
Logf("Successfully found %d nodes", expect)
// Next, ensure in parallel that all the nodes are ready. We subtract the
// time we spent waiting above.
timeout := nt - time.Since(start)
result := make(chan bool, len(nodeList.Items))
for _, n := range nodeNames {
n := n
go func() { result <- WaitForNodeToBeReady(c, n, timeout) }()
}
failed := false
for i := range nodeList.Items {
_ = i
if !<-result {
failed = true
}
}
if failed {
return nodeNames, fmt.Errorf("at least one node failed to be ready")
}
return nodeNames, nil
}
// MigTemplate (GCE-only) returns the name of the MIG template that the
// nodes of the cluster use.
func MigTemplate() (string, error) {


@ -4123,9 +4123,10 @@ func NumberOfReadyNodes(c clientset.Interface) (int, error) {
return len(nodes.Items), nil
}
// WaitForReadyNodes waits until the cluster has desired size and there is no not-ready nodes in it.
// By cluster size we mean number of Nodes excluding Master Node.
func WaitForReadyNodes(c clientset.Interface, size int, timeout time.Duration) error {
// CheckNodesReady waits up to timeout for the cluster to reach the desired
// size with no not-ready nodes in it. By cluster size we mean the number of
// Nodes excluding Master Node.
func CheckNodesReady(c clientset.Interface, size int, timeout time.Duration) ([]v1.Node, error) {
for start := time.Now(); time.Since(start) < timeout; time.Sleep(20 * time.Second) {
nodes, err := waitListSchedulableNodes(c)
if err != nil {
@ -4142,11 +4143,19 @@ func WaitForReadyNodes(c clientset.Interface, size int, timeout time.Duration) e
if numNodes == size && numReady == size {
Logf("Cluster has reached the desired number of ready nodes %d", size)
return nil
return nodes.Items, nil
}
Logf("Waiting for ready nodes %d, current ready %d, not ready nodes %d", size, numReady, numNodes-numReady)
}
return fmt.Errorf("timeout waiting %v for number of ready nodes to be %d", timeout, size)
return nil, fmt.Errorf("timeout waiting %v for number of ready nodes to be %d", timeout, size)
}
// WaitForReadyNodes waits up to timeout for the cluster to reach the desired
// size with no not-ready nodes in it. By cluster size we mean the number of
// Nodes excluding Master Node.
func WaitForReadyNodes(c clientset.Interface, size int, timeout time.Duration) error {
_, err := CheckNodesReady(c, size, timeout)
return err
}
func GenerateMasterRegexp(prefix string) string {


@ -63,6 +63,7 @@ var _ = SIGDescribe("Nodes [Disruptive]", func() {
// Slow issue #13323 (8 min)
Describe("Resize [Slow]", func() {
var originalNodeCount int32
var skipped bool
BeforeEach(func() {
@ -96,7 +97,8 @@ var _ = SIGDescribe("Nodes [Disruptive]", func() {
if err := framework.WaitForGroupSize(group, int32(framework.TestContext.CloudConfig.NumNodes)); err != nil {
framework.Failf("Couldn't restore the original node instance group size: %v", err)
}
if err := framework.WaitForReadyNodes(c, framework.TestContext.CloudConfig.NumNodes, 10*time.Minute); err != nil {
if err := framework.WaitForReadyNodes(c, int(originalNodeCount), 10*time.Minute); err != nil {
framework.Failf("Couldn't restore the original cluster size: %v", err)
}
// Many e2e tests assume that the cluster is fully healthy before they start. Wait until
@ -114,9 +116,9 @@ var _ = SIGDescribe("Nodes [Disruptive]", func() {
name := "my-hostname-delete-node"
numNodes, err := framework.NumberOfRegisteredNodes(c)
Expect(err).NotTo(HaveOccurred())
replicas := int32(numNodes)
common.NewRCByName(c, ns, name, replicas, nil)
err = framework.VerifyPods(c, ns, name, true, replicas)
originalNodeCount = int32(numNodes)
common.NewRCByName(c, ns, name, originalNodeCount, nil)
err = framework.VerifyPods(c, ns, name, true, originalNodeCount)
Expect(err).NotTo(HaveOccurred())
targetNumNodes := int32(framework.TestContext.CloudConfig.NumNodes - 1)
@ -125,7 +127,7 @@ var _ = SIGDescribe("Nodes [Disruptive]", func() {
Expect(err).NotTo(HaveOccurred())
err = framework.WaitForGroupSize(group, targetNumNodes)
Expect(err).NotTo(HaveOccurred())
err = framework.WaitForReadyNodes(c, int(replicas-1), 10*time.Minute)
err = framework.WaitForReadyNodes(c, int(originalNodeCount-1), 10*time.Minute)
Expect(err).NotTo(HaveOccurred())
By("waiting 1 minute for the watch in the podGC to catch up, remove any pods scheduled on " +
@ -133,7 +135,7 @@ var _ = SIGDescribe("Nodes [Disruptive]", func() {
time.Sleep(time.Minute)
By("verifying whether the pods from the removed node are recreated")
err = framework.VerifyPods(c, ns, name, true, replicas)
err = framework.VerifyPods(c, ns, name, true, originalNodeCount)
Expect(err).NotTo(HaveOccurred())
})
@ -145,9 +147,9 @@ var _ = SIGDescribe("Nodes [Disruptive]", func() {
common.NewSVCByName(c, ns, name)
numNodes, err := framework.NumberOfRegisteredNodes(c)
Expect(err).NotTo(HaveOccurred())
replicas := int32(numNodes)
common.NewRCByName(c, ns, name, replicas, nil)
err = framework.VerifyPods(c, ns, name, true, replicas)
originalNodeCount = int32(numNodes)
common.NewRCByName(c, ns, name, originalNodeCount, nil)
err = framework.VerifyPods(c, ns, name, true, originalNodeCount)
Expect(err).NotTo(HaveOccurred())
targetNumNodes := int32(framework.TestContext.CloudConfig.NumNodes + 1)
@ -156,13 +158,13 @@ var _ = SIGDescribe("Nodes [Disruptive]", func() {
Expect(err).NotTo(HaveOccurred())
err = framework.WaitForGroupSize(group, targetNumNodes)
Expect(err).NotTo(HaveOccurred())
err = framework.WaitForReadyNodes(c, int(replicas+1), 10*time.Minute)
err = framework.WaitForReadyNodes(c, int(originalNodeCount+1), 10*time.Minute)
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("increasing size of the replication controller to %d and verifying all pods are running", replicas+1))
err = resizeRC(c, ns, name, replicas+1)
By(fmt.Sprintf("increasing size of the replication controller to %d and verifying all pods are running", originalNodeCount+1))
err = resizeRC(c, ns, name, originalNodeCount+1)
Expect(err).NotTo(HaveOccurred())
err = framework.VerifyPods(c, ns, name, true, replicas+1)
err = framework.VerifyPods(c, ns, name, true, originalNodeCount+1)
Expect(err).NotTo(HaveOccurred())
})
})


@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/test/e2e/common"
"k8s.io/kubernetes/test/e2e/framework"
testutils "k8s.io/kubernetes/test/utils"
@ -55,10 +56,18 @@ func filterIrrelevantPods(pods []*v1.Pod) []*v1.Pod {
return results
}
func nodeNames(nodes []v1.Node) []string {
result := make([]string, 0, len(nodes))
for i := range nodes {
result = append(result, nodes[i].Name)
}
return result
}
var _ = SIGDescribe("Restart [Disruptive]", func() {
f := framework.NewDefaultFramework("restart")
var ps *testutils.PodStore
var originalNodeNames []string
var originalNodes []v1.Node
var originalPodNames []string
var numNodes int
var systemNamespace string
@ -74,9 +83,9 @@ var _ = SIGDescribe("Restart [Disruptive]", func() {
systemNamespace = metav1.NamespaceSystem
By("ensuring all nodes are ready")
originalNodeNames, err = framework.CheckNodesReady(f.ClientSet, framework.NodeReadyInitialTimeout, numNodes)
originalNodes, err = framework.CheckNodesReady(f.ClientSet, numNodes, framework.NodeReadyInitialTimeout)
Expect(err).NotTo(HaveOccurred())
framework.Logf("Got the following nodes before restart: %v", originalNodeNames)
framework.Logf("Got the following nodes before restart: %v", nodeNames(originalNodes))
By("ensuring all pods are running and ready")
allPods := ps.List()
@ -100,20 +109,20 @@ var _ = SIGDescribe("Restart [Disruptive]", func() {
It("should restart all nodes and ensure all nodes and pods recover", func() {
By("restarting all of the nodes")
err := restartNodes(f, originalNodeNames)
err := common.RestartNodes(f.ClientSet, originalNodes)
Expect(err).NotTo(HaveOccurred())
By("ensuring all nodes are ready after the restart")
nodeNamesAfter, err := framework.CheckNodesReady(f.ClientSet, framework.RestartNodeReadyAgainTimeout, numNodes)
nodesAfter, err := framework.CheckNodesReady(f.ClientSet, numNodes, framework.RestartNodeReadyAgainTimeout)
Expect(err).NotTo(HaveOccurred())
framework.Logf("Got the following nodes after restart: %v", nodeNamesAfter)
framework.Logf("Got the following nodes after restart: %v", nodeNames(nodesAfter))
// Make sure that we have the same number of nodes. We're not checking
// that the names match because that's implementation specific.
By("ensuring the same number of nodes exist after the restart")
if len(originalNodeNames) != len(nodeNamesAfter) {
if len(originalNodes) != len(nodesAfter) {
framework.Failf("Had %d nodes before nodes were restarted, but now only have %d",
len(originalNodeNames), len(nodeNamesAfter))
len(originalNodes), len(nodesAfter))
}
// Make sure that we have the same number of pods. We're not checking
@ -159,41 +168,3 @@ func waitForNPods(ps *testutils.PodStore, expect int, timeout time.Duration) ([]
}
return podNames, nil
}
func restartNodes(f *framework.Framework, nodeNames []string) error {
// List old boot IDs.
oldBootIDs := make(map[string]string)
for _, name := range nodeNames {
node, err := f.ClientSet.CoreV1().Nodes().Get(name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error getting node info before reboot: %s", err)
}
oldBootIDs[name] = node.Status.NodeInfo.BootID
}
// Reboot the nodes.
args := []string{
"compute",
fmt.Sprintf("--project=%s", framework.TestContext.CloudConfig.ProjectID),
"instances",
"reset",
}
args = append(args, nodeNames...)
args = append(args, fmt.Sprintf("--zone=%s", framework.TestContext.CloudConfig.Zone))
stdout, stderr, err := framework.RunCmd("gcloud", args...)
if err != nil {
return fmt.Errorf("error restarting nodes: %s\nstdout: %s\nstderr: %s", err, stdout, stderr)
}
// Wait for their boot IDs to change.
for _, name := range nodeNames {
if err := wait.Poll(30*time.Second, 5*time.Minute, func() (bool, error) {
node, err := f.ClientSet.CoreV1().Nodes().Get(name, metav1.GetOptions{})
if err != nil {
return false, fmt.Errorf("error getting node info after reboot: %s", err)
}
return node.Status.NodeInfo.BootID != oldBootIDs[name], nil
}); err != nil {
return fmt.Errorf("error waiting for node %s boot ID to change: %s", name, err)
}
}
return nil
}