diff --git a/test/e2e_node/BUILD b/test/e2e_node/BUILD index 49b09cc00b..93ded55e11 100644 --- a/test/e2e_node/BUILD +++ b/test/e2e_node/BUILD @@ -93,6 +93,7 @@ go_test( "log_path_test.go", "mirror_pod_test.go", "node_container_manager_test.go", + "node_perf_test.go", "pods_container_manager_test.go", "resource_usage_test.go", "restart_test.go", @@ -143,6 +144,7 @@ go_test( "//test/e2e/common:go_default_library", "//test/e2e/framework:go_default_library", "//test/e2e/framework/metrics:go_default_library", + "//test/e2e_node/perf/workloads:go_default_library", "//test/e2e_node/services:go_default_library", "//test/utils/image:go_default_library", "//vendor/github.com/blang/semver:go_default_library", @@ -193,6 +195,7 @@ filegroup( ":package-srcs", "//test/e2e_node/builder:all-srcs", "//test/e2e_node/environment:all-srcs", + "//test/e2e_node/perf/workloads:all-srcs", "//test/e2e_node/perftype:all-srcs", "//test/e2e_node/remote:all-srcs", "//test/e2e_node/runner/local:all-srcs", diff --git a/test/e2e_node/image_list.go b/test/e2e_node/image_list.go index efc498c45d..e7bc3f9bae 100644 --- a/test/e2e_node/image_list.go +++ b/test/e2e_node/image_list.go @@ -53,6 +53,9 @@ var NodeImageWhiteList = sets.NewString( imageutils.GetE2EImage(imageutils.Nonewprivs), imageutils.GetPauseImageName(), framework.GetGPUDevicePluginImage(), + "gcr.io/kubernetes-e2e-test-images/node-perf/npb-is-amd64:1.0", + "gcr.io/kubernetes-e2e-test-images/node-perf/npb-ep-amd64:1.0", + "gcr.io/kubernetes-e2e-test-images/node-perf/tf-wide-deep-amd64:1.0", ) func init() { diff --git a/test/e2e_node/node_perf_test.go b/test/e2e_node/node_perf_test.go new file mode 100644 index 0000000000..f4b602a332 --- /dev/null +++ b/test/e2e_node/node_perf_test.go @@ -0,0 +1,108 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e_node + +import ( + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" + "k8s.io/kubernetes/test/e2e/framework" + "k8s.io/kubernetes/test/e2e_node/perf/workloads" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +// makeNodePerfPod returns a pod with the information provided from the workload. +func makeNodePerfPod(w workloads.NodePerfWorkload) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-pod", w.Name()), + }, + Spec: w.PodSpec(), + } +} + +func setKubeletConfig(f *framework.Framework, cfg *kubeletconfig.KubeletConfiguration) { + if cfg != nil { + framework.ExpectNoError(setKubeletConfiguration(f, cfg)) + } + + // Wait for the Kubelet to be ready. + Eventually(func() bool { + nodeList := framework.GetReadySchedulableNodesOrDie(f.ClientSet) + return len(nodeList.Items) == 1 + }, time.Minute, time.Second).Should(BeTrue()) +} + +// Serial because the test updates kubelet configuration. +// Slow by design. +var _ = SIGDescribe("Node Performance Testing [Serial] [Slow]", func() { + f := framework.NewDefaultFramework("node-performance-testing") + + Context("Run node performance testing with pre-defined workloads", func() { + It("run each pre-defined workload", func() { + By("running the workloads") + for _, workload := range workloads.NodePerfWorkloads { + By("running the pre test exec from the workload") + err := workload.PreTestExec() + framework.ExpectNoError(err) + + By("restarting kubelet with required configuration") + // Get the Kubelet config required for this workload. + oldCfg, err := getCurrentKubeletConfig() + framework.ExpectNoError(err) + + newCfg, err := workload.KubeletConfig(oldCfg) + framework.ExpectNoError(err) + // Set the Kubelet config required for this workload. + setKubeletConfig(f, newCfg) + + By("running the workload and waiting for success") + // Make the pod for the workload. + pod := makeNodePerfPod(workload) + + // Create the pod. + pod = f.PodClient().CreateSync(pod) + // Wait for pod success. + f.PodClient().WaitForSuccess(pod.Name, workload.Timeout()) + podLogs, err := framework.GetPodLogs(f.ClientSet, f.Namespace.Name, pod.Name, pod.Spec.Containers[0].Name) + framework.ExpectNoError(err) + perf, err := workload.ExtractPerformanceFromLogs(podLogs) + framework.ExpectNoError(err) + framework.Logf("Time to complete workload %s: %v", workload.Name(), perf) + + // Delete the pod. + gp := int64(0) + delOpts := metav1.DeleteOptions{ + GracePeriodSeconds: &gp, + } + f.PodClient().DeleteSync(pod.Name, &delOpts, framework.DefaultPodDeletionTimeout) + + By("running the post test exec from the workload") + err = workload.PostTestExec() + framework.ExpectNoError(err) + + // Set the Kubelet config back to the old one. + setKubeletConfig(f, oldCfg) + } + }) + }) +}) diff --git a/test/e2e_node/perf/workloads/BUILD b/test/e2e_node/perf/workloads/BUILD new file mode 100644 index 0000000000..7eff2d4299 --- /dev/null +++ b/test/e2e_node/perf/workloads/BUILD @@ -0,0 +1,35 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = [ + "npb_ep.go", + "npb_is.go", + "tf_wide_deep.go", + "utils.go", + "workloads.go", + ], + importpath = "k8s.io/kubernetes/test/e2e_node/perf/workloads", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kubelet/apis/config:go_default_library", + "//pkg/kubelet/cm/cpumanager:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/test/e2e_node/perf/workloads/OWNERS b/test/e2e_node/perf/workloads/OWNERS new file mode 100644 index 0000000000..541219022b --- /dev/null +++ b/test/e2e_node/perf/workloads/OWNERS @@ -0,0 +1,7 @@ +approvers: +- vishh +- derekwaynecarr +- balajismaniam +- ConnorDoyle +reviewers: +- sig-node-reviewers diff --git a/test/e2e_node/perf/workloads/npb_ep.go b/test/e2e_node/perf/workloads/npb_ep.go new file mode 100644 index 0000000000..8478f52b45 --- /dev/null +++ b/test/e2e_node/perf/workloads/npb_ep.go @@ -0,0 +1,120 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workloads + +import ( + "fmt" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" +) + +// npbEPWorkload defines a workload to run the Embarrassingly Parallel (EP) workload +// from NAS parallel benchmark (NPB) suite. +type npbEPWorkload struct{} + +// Ensure npbEPWorkload implemets NodePerfWorkload interface. +var _ NodePerfWorkload = &npbEPWorkload{} + +func (w npbEPWorkload) Name() string { + return "npb-ep" +} + +func (w npbEPWorkload) PodSpec() corev1.PodSpec { + var containers []corev1.Container + ctn := corev1.Container{ + Name: fmt.Sprintf("%s-ctn", w.Name()), + Image: "gcr.io/kubernetes-e2e-test-images/node-perf/npb-ep-amd64:1.0", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceName(corev1.ResourceCPU): resource.MustParse("15000m"), + corev1.ResourceName(corev1.ResourceMemory): resource.MustParse("48Gi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceName(corev1.ResourceCPU): resource.MustParse("15000m"), + corev1.ResourceName(corev1.ResourceMemory): resource.MustParse("48Gi"), + }, + }, + Command: []string{"/bin/sh"}, + Args: []string{"-c", "/ep.D.x"}, + } + containers = append(containers, ctn) + + return corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + Containers: containers, + } +} + +func (w npbEPWorkload) Timeout() time.Duration { + return 10 * time.Minute +} + +func (w npbEPWorkload) KubeletConfig(oldCfg *kubeletconfig.KubeletConfiguration) (newCfg *kubeletconfig.KubeletConfiguration, err error) { + // Enable CPU Manager in Kubelet with static policy. + newCfg = oldCfg.DeepCopy() + // Set the CPU Manager policy to static. + newCfg.CPUManagerPolicy = string(cpumanager.PolicyStatic) + // Set the CPU Manager reconcile period to 10 second. + newCfg.CPUManagerReconcilePeriod = metav1.Duration{Duration: 10 * time.Second} + + // The Kubelet panics if either kube-reserved or system-reserved is not set + // when static CPU Manager is enabled. Set cpu in kube-reserved > 0 so that + // kubelet doesn't panic. + if newCfg.KubeReserved == nil { + newCfg.KubeReserved = map[string]string{} + } + + if _, ok := newCfg.KubeReserved["cpu"]; !ok { + newCfg.KubeReserved["cpu"] = "200m" + } + + return newCfg, nil +} + +func (w npbEPWorkload) PreTestExec() error { + cmd := "/bin/sh" + args := []string{"-c", "rm -f /var/lib/kubelet/cpu_manager_state"} + err := runCmd(cmd, args) + + return err +} + +func (w npbEPWorkload) PostTestExec() error { + cmd := "/bin/sh" + args := []string{"-c", "rm -f /var/lib/kubelet/cpu_manager_state"} + err := runCmd(cmd, args) + + return err +} + +func (w npbEPWorkload) ExtractPerformanceFromLogs(logs string) (perf time.Duration, err error) { + perfLine, err := getMatchingLineFromLog(logs, "Time in seconds =") + if err != nil { + return perf, err + } + perfStrings := strings.Split(perfLine, "=") + perfString := fmt.Sprintf("%ss", strings.TrimSpace(perfStrings[1])) + perf, err = time.ParseDuration(perfString) + + return perf, err +} diff --git a/test/e2e_node/perf/workloads/npb_is.go b/test/e2e_node/perf/workloads/npb_is.go new file mode 100644 index 0000000000..a7ee986360 --- /dev/null +++ b/test/e2e_node/perf/workloads/npb_is.go @@ -0,0 +1,92 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workloads + +import ( + "fmt" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" +) + +// npbISWorkload defines a workload to run the integer sort (IS) workload +// from NAS parallel benchmark (NPB) suite. +type npbISWorkload struct{} + +// Ensure npbISWorkload implemets NodePerfWorkload interface. +var _ NodePerfWorkload = &npbISWorkload{} + +func (w npbISWorkload) Name() string { + return "npb-is" +} + +func (w npbISWorkload) PodSpec() corev1.PodSpec { + var containers []corev1.Container + ctn := corev1.Container{ + Name: fmt.Sprintf("%s-ctn", w.Name()), + Image: "gcr.io/kubernetes-e2e-test-images/node-perf/npb-is-amd64:1.0", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceName(corev1.ResourceCPU): resource.MustParse("16000m"), + corev1.ResourceName(corev1.ResourceMemory): resource.MustParse("48Gi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceName(corev1.ResourceCPU): resource.MustParse("16000m"), + corev1.ResourceName(corev1.ResourceMemory): resource.MustParse("48Gi"), + }, + }, + Command: []string{"/bin/sh"}, + Args: []string{"-c", "/is.D.x"}, + } + containers = append(containers, ctn) + + return corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + Containers: containers, + } +} + +func (w npbISWorkload) Timeout() time.Duration { + return 4 * time.Minute +} + +func (w npbISWorkload) KubeletConfig(oldCfg *kubeletconfig.KubeletConfiguration) (newCfg *kubeletconfig.KubeletConfiguration, err error) { + return oldCfg, nil +} + +func (w npbISWorkload) PreTestExec() error { + return nil +} + +func (w npbISWorkload) PostTestExec() error { + return nil +} + +func (w npbISWorkload) ExtractPerformanceFromLogs(logs string) (perf time.Duration, err error) { + perfLine, err := getMatchingLineFromLog(logs, "Time in seconds =") + if err != nil { + return perf, err + } + perfStrings := strings.Split(perfLine, "=") + perfString := fmt.Sprintf("%ss", strings.TrimSpace(perfStrings[1])) + perf, err = time.ParseDuration(perfString) + + return perf, err +} diff --git a/test/e2e_node/perf/workloads/tf_wide_deep.go b/test/e2e_node/perf/workloads/tf_wide_deep.go new file mode 100644 index 0000000000..1e3540b77a --- /dev/null +++ b/test/e2e_node/perf/workloads/tf_wide_deep.go @@ -0,0 +1,119 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workloads + +import ( + "fmt" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" +) + +// tfWideDeepWorkload defines a workload to run +// https://github.com/tensorflow/models/tree/master/official/wide_deep. +type tfWideDeepWorkload struct{} + +// Ensure tfWideDeepWorkload implemets NodePerfWorkload interface. +var _ NodePerfWorkload = &tfWideDeepWorkload{} + +func (w tfWideDeepWorkload) Name() string { + return "tensorflow-wide-deep" +} + +func (w tfWideDeepWorkload) PodSpec() corev1.PodSpec { + var containers []corev1.Container + ctn := corev1.Container{ + Name: fmt.Sprintf("%s-ctn", w.Name()), + Image: "gcr.io/kubernetes-e2e-test-images/node-perf/tf-wide-deep-amd64:1.0", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceName(corev1.ResourceCPU): resource.MustParse("15000m"), + corev1.ResourceName(corev1.ResourceMemory): resource.MustParse("16Gi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceName(corev1.ResourceCPU): resource.MustParse("15000m"), + corev1.ResourceName(corev1.ResourceMemory): resource.MustParse("16Gi"), + }, + }, + Command: []string{"/bin/sh"}, + Args: []string{"-c", "python ./data_download.py && time -p python ./wide_deep.py --model_type=wide_deep --train_epochs=300 --epochs_between_evals=300 --batch_size=32561"}, + } + containers = append(containers, ctn) + + return corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + Containers: containers, + } +} + +func (w tfWideDeepWorkload) Timeout() time.Duration { + return 15 * time.Minute +} + +func (w tfWideDeepWorkload) KubeletConfig(oldCfg *kubeletconfig.KubeletConfiguration) (newCfg *kubeletconfig.KubeletConfiguration, err error) { + // Enable CPU Manager in Kubelet with static policy. + newCfg = oldCfg.DeepCopy() + // Set the CPU Manager policy to static. + newCfg.CPUManagerPolicy = string(cpumanager.PolicyStatic) + // Set the CPU Manager reconcile period to 10 second. + newCfg.CPUManagerReconcilePeriod = metav1.Duration{Duration: 10 * time.Second} + + // The Kubelet panics if either kube-reserved or system-reserved is not set + // when static CPU Manager is enabled. Set cpu in kube-reserved > 0 so that + // kubelet doesn't panic. + if newCfg.KubeReserved == nil { + newCfg.KubeReserved = map[string]string{} + } + + if _, ok := newCfg.KubeReserved["cpu"]; !ok { + newCfg.KubeReserved["cpu"] = "200m" + } + + return newCfg, nil +} + +func (w tfWideDeepWorkload) PreTestExec() error { + cmd := "/bin/sh" + args := []string{"-c", "rm -f /var/lib/kubelet/cpu_manager_state"} + err := runCmd(cmd, args) + + return err +} + +func (w tfWideDeepWorkload) PostTestExec() error { + cmd := "/bin/sh" + args := []string{"-c", "rm -f /var/lib/kubelet/cpu_manager_state"} + err := runCmd(cmd, args) + + return err +} + +func (w tfWideDeepWorkload) ExtractPerformanceFromLogs(logs string) (perf time.Duration, err error) { + perfLine, err := getMatchingLineFromLog(logs, "real") + if err != nil { + return perf, err + } + perfString := fmt.Sprintf("%ss", strings.TrimSpace(strings.TrimPrefix(perfLine, "real"))) + perf, err = time.ParseDuration(perfString) + + return perf, err +} diff --git a/test/e2e_node/perf/workloads/utils.go b/test/e2e_node/perf/workloads/utils.go new file mode 100644 index 0000000000..3e670d034f --- /dev/null +++ b/test/e2e_node/perf/workloads/utils.go @@ -0,0 +1,45 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workloads + +import ( + "fmt" + "os/exec" + "regexp" + "strings" +) + +func runCmd(cmd string, args []string) error { + err := exec.Command(cmd, args...).Run() + return err +} + +func getMatchingLineFromLog(log string, pattern string) (line string, err error) { + regex, err := regexp.Compile(pattern) + if err != nil { + return line, fmt.Errorf("failed to compile regexp %v: %v", pattern, err) + } + + logLines := strings.Split(log, "\n") + for _, line := range logLines { + if regex.MatchString(line) { + return line, nil + } + } + + return line, fmt.Errorf("line with pattern %v not found in log", pattern) +} diff --git a/test/e2e_node/perf/workloads/workloads.go b/test/e2e_node/perf/workloads/workloads.go new file mode 100644 index 0000000000..f8f2b6ed9e --- /dev/null +++ b/test/e2e_node/perf/workloads/workloads.go @@ -0,0 +1,53 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workloads + +import ( + "time" + + corev1 "k8s.io/api/core/v1" + kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" +) + +// NodePerfWorkload provides the necessary information to run a workload for +// node performance testing. +type NodePerfWorkload interface { + // Name of the workload. + Name() string + // PodSpec used to run this workload. + PodSpec() corev1.PodSpec + // Timeout provides the expected time to completion + // for this workload. + Timeout() time.Duration + // KubeletConfig specifies the Kubelet configuration + // required for this workload. + KubeletConfig(old *kubeletconfig.KubeletConfiguration) (new *kubeletconfig.KubeletConfiguration, err error) + // PreTestExec is used for defining logic that needs + // to be run before restarting the Kubelet with the new Kubelet + // configuration required for the workload. + PreTestExec() error + // PostTestExec is used for defining logic that needs + // to be run after the workload has completed. + PostTestExec() error + // ExtractPerformanceFromLogs is used get the performance of the workload + // from pod logs. Currently, we support only performance reported in + // time.Duration format. + ExtractPerformanceFromLogs(logs string) (perf time.Duration, err error) +} + +// NodePerfWorkloads is the collection of all node performance testing workloads. +var NodePerfWorkloads = []NodePerfWorkload{npbISWorkload{}, npbEPWorkload{}, tfWideDeepWorkload{}}