Add an e2e test for running a gpu job interrupted by node recreation.

k3s-v1.15.3
Richard Chen 2019-04-10 14:31:19 -07:00
parent f8d2b6b982
commit 2a70a0b424
7 changed files with 139 additions and 69 deletions

View File

@ -22,7 +22,7 @@ import (
"sync"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@ -248,6 +248,23 @@ func (c *PodClient) WaitForFailure(name string, timeout time.Duration) {
)).To(gomega.Succeed(), "wait for pod %q to fail", name)
}
// WaitForFinish waits for pod to finish running, regardless of success or failure.
func (c *PodClient) WaitForFinish(name string, timeout time.Duration) {
f := c.f
gomega.Expect(WaitForPodCondition(f.ClientSet, f.Namespace.Name, name, "success or failure", timeout,
func(pod *v1.Pod) (bool, error) {
switch pod.Status.Phase {
case v1.PodFailed:
return true, nil
case v1.PodSucceeded:
return true, nil
default:
return false, nil
}
},
)).To(gomega.Succeed(), "wait for pod %q to finish running", name)
}
// WaitForErrorEventOrSuccess waits for pod to succeed or an error event for that pod.
func (c *PodClient) WaitForErrorEventOrSuccess(pod *v1.Pod) (*v1.Event, error) {
var ev *v1.Event

View File

@ -92,12 +92,12 @@ var _ = ginkgo.Describe("Recreate [Feature:Recreate]", func() {
// Recreate all the nodes in the test instance group
func testRecreate(c clientset.Interface, ps *testutils.PodStore, systemNamespace string, nodes []v1.Node, podNames []string) {
err := recreateNodes(c, nodes)
err := RecreateNodes(c, nodes)
if err != nil {
framework.Failf("Test failed; failed to start the restart instance group command.")
}
err = waitForNodeBootIdsToChange(c, nodes, framework.RecreateNodeReadyAgainTimeout)
err = WaitForNodeBootIdsToChange(c, nodes, framework.RecreateNodeReadyAgainTimeout)
if err != nil {
framework.Failf("Test failed; failed to recreate at least one node in %v.", framework.RecreateNodeReadyAgainTimeout)
}

View File

@ -28,7 +28,8 @@ import (
"k8s.io/kubernetes/test/e2e/framework"
)
func recreateNodes(c clientset.Interface, nodes []v1.Node) error {
// RecreateNodes recreates the given nodes in a managed instance group.
func RecreateNodes(c clientset.Interface, nodes []v1.Node) error {
// Build mapping from zone to nodes in that zone.
nodeNamesByZone := make(map[string][]string)
for i := range nodes {
@ -63,13 +64,14 @@ func recreateNodes(c clientset.Interface, nodes []v1.Node) error {
framework.Logf("Recreating instance group %s.", instanceGroup)
stdout, stderr, err := framework.RunCmd("gcloud", args...)
if err != nil {
return fmt.Errorf("error restarting nodes: %s\nstdout: %s\nstderr: %s", err, stdout, stderr)
return fmt.Errorf("error recreating nodes: %s\nstdout: %s\nstderr: %s", err, stdout, stderr)
}
}
return nil
}
func waitForNodeBootIdsToChange(c clientset.Interface, nodes []v1.Node, timeout time.Duration) error {
// WaitForNodeBootIdsToChange waits for the boot ids of the given nodes to change in order to verify the node has been recreated.
func WaitForNodeBootIdsToChange(c clientset.Interface, nodes []v1.Node, timeout time.Duration) error {
errMsg := []string{}
for i := range nodes {
node := &nodes[i]

View File

@ -45,6 +45,7 @@ go_library(
"//test/e2e/common:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/e2e/framework/gpu:go_default_library",
"//test/e2e/framework/job:go_default_library",
"//test/e2e/framework/log:go_default_library",
"//test/e2e/framework/providers/gce:go_default_library",
"//test/e2e/framework/replicaset:go_default_library",

View File

@ -18,6 +18,7 @@ package scheduling
import (
"os"
"regexp"
"time"
v1 "k8s.io/api/core/v1"
@ -27,7 +28,9 @@ import (
extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/framework/gpu"
jobutil "k8s.io/kubernetes/test/e2e/framework/job"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
"k8s.io/kubernetes/test/e2e/framework/providers/gce"
imageutils "k8s.io/kubernetes/test/utils/image"
"github.com/onsi/ginkgo"
@ -131,7 +134,7 @@ func SetupNVIDIAGPUNode(f *framework.Framework, setupResourceGatherer bool) *fra
e2elog.Logf("Using %v", dsYamlURL)
// Creates the DaemonSet that installs Nvidia Drivers.
ds, err := framework.DsFromManifest(dsYamlURL)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
framework.ExpectNoError(err)
ds.Namespace = f.Namespace.Name
_, err = f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).Create(ds)
framework.ExpectNoError(err, "failed to create nvidia-driver-installer daemonset")
@ -172,8 +175,8 @@ func testNvidiaGPUs(f *framework.Framework) {
}
e2elog.Logf("Wait for all test pods to succeed")
// Wait for all pods to succeed
for _, po := range podList {
f.PodClient().WaitForSuccess(po.Name, 5*time.Minute)
for _, pod := range podList {
f.PodClient().WaitForSuccess(pod.Name, 5*time.Minute)
}
e2elog.Logf("Stopping ResourceUsageGather")
@ -190,3 +193,104 @@ var _ = SIGDescribe("[Feature:GPUDevicePlugin]", func() {
testNvidiaGPUs(f)
})
})
func testNvidiaGPUsJob(f *framework.Framework) {
_ = SetupNVIDIAGPUNode(f, false)
// Job set to have 5 completions with parallelism of 1 to ensure that it lasts long enough to experience the node recreation
completions := int32(5)
ginkgo.By("Starting GPU job")
StartJob(f, completions)
job, err := jobutil.GetJob(f.ClientSet, f.Namespace.Name, "cuda-add")
framework.ExpectNoError(err)
// make sure job is running by waiting for its first pod to start running
err = jobutil.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, job.Name, 1)
framework.ExpectNoError(err)
numNodes, err := framework.NumberOfRegisteredNodes(f.ClientSet)
framework.ExpectNoError(err)
nodes, err := framework.CheckNodesReady(f.ClientSet, numNodes, framework.NodeReadyInitialTimeout)
framework.ExpectNoError(err)
ginkgo.By("Recreating nodes")
err = gce.RecreateNodes(f.ClientSet, nodes)
framework.ExpectNoError(err)
ginkgo.By("Done recreating nodes")
ginkgo.By("Waiting for gpu job to finish")
err = jobutil.WaitForJobFinish(f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err)
ginkgo.By("Done with gpu job")
gomega.Expect(job.Status.Failed).To(gomega.BeZero(), "Job pods failed during node recreation: %v", job.Status.Failed)
VerifyJobNCompletions(f, completions)
}
// StartJob starts a simple CUDA job that requests gpu and the specified number of completions
func StartJob(f *framework.Framework, completions int32) {
var activeSeconds int64 = 3600
testJob := jobutil.NewTestJob("succeed", "cuda-add", v1.RestartPolicyAlways, 1, completions, &activeSeconds, 6)
testJob.Spec.Template.Spec = v1.PodSpec{
RestartPolicy: v1.RestartPolicyOnFailure,
Containers: []v1.Container{
{
Name: "vector-addition",
Image: imageutils.GetE2EImage(imageutils.CudaVectorAdd),
Command: []string{"/bin/sh", "-c", "./vectorAdd && sleep 60"},
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
gpuResourceName: *resource.NewQuantity(1, resource.DecimalSI),
},
},
},
},
}
ns := f.Namespace.Name
_, err := jobutil.CreateJob(f.ClientSet, ns, testJob)
framework.ExpectNoError(err)
framework.Logf("Created job %v", testJob)
}
// VerifyJobNCompletions verifies that the job has completions number of successful pods
func VerifyJobNCompletions(f *framework.Framework, completions int32) {
ns := f.Namespace.Name
pods, err := jobutil.GetJobPods(f.ClientSet, f.Namespace.Name, "cuda-add")
framework.ExpectNoError(err)
createdPods := pods.Items
createdPodNames := podNames(createdPods)
framework.Logf("Got the following pods for job cuda-add: %v", createdPodNames)
successes := int32(0)
for _, podName := range createdPodNames {
f.PodClient().WaitForFinish(podName, 5*time.Minute)
logs, err := framework.GetPodLogs(f.ClientSet, ns, podName, "vector-addition")
framework.ExpectNoError(err, "Should be able to get logs for pod %v", podName)
regex := regexp.MustCompile("PASSED")
if regex.MatchString(logs) {
successes++
}
}
if successes != completions {
framework.Failf("Only got %v completions. Expected %v completions.", successes, completions)
}
}
func podNames(pods []v1.Pod) []string {
originalPodNames := make([]string, len(pods))
for i, p := range pods {
originalPodNames[i] = p.ObjectMeta.Name
}
return originalPodNames
}
var _ = SIGDescribe("GPUDevicePluginAcrossRecreate [Feature:Recreate]", func() {
ginkgo.BeforeEach(func() {
framework.SkipUnlessProviderIs("gce", "gke")
})
f := framework.NewDefaultFramework("device-plugin-gpus-recreate")
ginkgo.It("run Nvidia GPU Device Plugin tests with a recreation", func() {
testNvidiaGPUsJob(f)
})
})

View File

@ -28,7 +28,6 @@ go_library(
"//staging/src/k8s.io/api/autoscaling/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
@ -37,7 +36,6 @@ go_library(
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//test/e2e/common:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/e2e/framework/gpu:go_default_library",
"//test/e2e/framework/job:go_default_library",
"//test/e2e/framework/log:go_default_library",
"//test/e2e/framework/testfiles:go_default_library",

View File

@ -17,22 +17,18 @@ limitations under the License.
package upgrades
import (
"regexp"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/framework/gpu"
jobutil "k8s.io/kubernetes/test/e2e/framework/job"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
"k8s.io/kubernetes/test/e2e/scheduling"
imageutils "k8s.io/kubernetes/test/utils/image"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
)
const (
completions = int32(1)
)
// NvidiaGPUUpgradeTest tests that gpu resource is available before and after
// a cluster upgrade.
type NvidiaGPUUpgradeTest struct {
@ -45,7 +41,7 @@ func (NvidiaGPUUpgradeTest) Name() string { return "nvidia-gpu-upgrade [sig-node
func (t *NvidiaGPUUpgradeTest) Setup(f *framework.Framework) {
scheduling.SetupNVIDIAGPUNode(f, false)
ginkgo.By("Creating a job requesting gpu")
t.startJob(f)
scheduling.StartJob(f, completions)
}
// Test waits for the upgrade to complete, and then verifies that the
@ -53,7 +49,7 @@ func (t *NvidiaGPUUpgradeTest) Setup(f *framework.Framework) {
func (t *NvidiaGPUUpgradeTest) Test(f *framework.Framework, done <-chan struct{}, upgrade UpgradeType) {
<-done
ginkgo.By("Verifying gpu job success")
t.verifyJobPodSuccess(f)
scheduling.VerifyJobNCompletions(f, completions)
if upgrade == MasterUpgrade || upgrade == ClusterUpgrade {
// MasterUpgrade should be totally hitless.
job, err := jobutil.GetJob(f.ClientSet, f.Namespace.Name, "cuda-add")
@ -66,51 +62,3 @@ func (t *NvidiaGPUUpgradeTest) Test(f *framework.Framework, done <-chan struct{}
func (t *NvidiaGPUUpgradeTest) Teardown(f *framework.Framework) {
// rely on the namespace deletion to clean up everything
}
// startJob creates a job that requests gpu and runs a simple cuda container.
func (t *NvidiaGPUUpgradeTest) startJob(f *framework.Framework) {
var activeSeconds int64 = 3600
// Specifies 100 completions to make sure the job life spans across the upgrade.
testJob := jobutil.NewTestJob("succeed", "cuda-add", v1.RestartPolicyAlways, 1, 100, &activeSeconds, 6)
testJob.Spec.Template.Spec = v1.PodSpec{
RestartPolicy: v1.RestartPolicyOnFailure,
Containers: []v1.Container{
{
Name: "vector-addition",
Image: imageutils.GetE2EImage(imageutils.CudaVectorAdd),
Command: []string{"/bin/sh", "-c", "./vectorAdd && sleep 60"},
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
gpu.NVIDIAGPUResourceName: *resource.NewQuantity(1, resource.DecimalSI),
},
},
},
},
}
ns := f.Namespace.Name
_, err := jobutil.CreateJob(f.ClientSet, ns, testJob)
framework.ExpectNoError(err)
e2elog.Logf("Created job %v", testJob)
ginkgo.By("Waiting for gpu job pod start")
err = jobutil.WaitForAllJobPodsRunning(f.ClientSet, ns, testJob.Name, 1)
framework.ExpectNoError(err)
ginkgo.By("Done with gpu job pod start")
}
// verifyJobPodSuccess verifies that the started cuda pod successfully passes.
func (t *NvidiaGPUUpgradeTest) verifyJobPodSuccess(f *framework.Framework) {
// Wait for client pod to complete.
ns := f.Namespace.Name
err := jobutil.WaitForAllJobPodsRunning(f.ClientSet, f.Namespace.Name, "cuda-add", 1)
framework.ExpectNoError(err)
pods, err := jobutil.GetJobPods(f.ClientSet, f.Namespace.Name, "cuda-add")
framework.ExpectNoError(err)
createdPod := pods.Items[0].Name
e2elog.Logf("Created pod %v", createdPod)
f.PodClient().WaitForSuccess(createdPod, 5*time.Minute)
logs, err := framework.GetPodLogs(f.ClientSet, ns, createdPod, "vector-addition")
framework.ExpectNoError(err, "Should be able to get pod logs")
e2elog.Logf("Got pod logs: %v", logs)
regex := regexp.MustCompile("PASSED")
gomega.Expect(regex.MatchString(logs)).To(gomega.BeTrue())
}