Merge pull request #65250 from balajismaniam/node-perf-testing-framework

Automatic merge from submit-queue (batch tested with PRs 65250, 68241). If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md.

Initial node performance testing framework.

This PR adds a framework for node performance testing. 
Partially fixes: https://github.com/kubernetes/kubernetes/issues/65249.
Use the following command to run this test:
```sh
make test-e2e-node FOCUS="Node Performance Testing" SKIP="" PARALLELISM=1
```
It has been tested in the following environment:
- n1-standard-16
- Ubuntu 16.04
- docker 17.03.2

Note to reviewers:
This PR won't pass node e2e since the docker images in https://github.com/kubernetes/kubernetes/pull/65251 are required for this to function. The node e2e will fail when trying to pull the required images for testing.
pull/8/head
Kubernetes Submit Queue 2018-09-08 16:09:30 -07:00 committed by GitHub
commit c9de610897
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 585 additions and 0 deletions

View File

@ -93,6 +93,7 @@ go_test(
"log_path_test.go",
"mirror_pod_test.go",
"node_container_manager_test.go",
"node_perf_test.go",
"pods_container_manager_test.go",
"resource_usage_test.go",
"restart_test.go",
@ -143,6 +144,7 @@ go_test(
"//test/e2e/common:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/e2e/framework/metrics:go_default_library",
"//test/e2e_node/perf/workloads:go_default_library",
"//test/e2e_node/services:go_default_library",
"//test/utils/image:go_default_library",
"//vendor/github.com/blang/semver:go_default_library",
@ -193,6 +195,7 @@ filegroup(
":package-srcs",
"//test/e2e_node/builder:all-srcs",
"//test/e2e_node/environment:all-srcs",
"//test/e2e_node/perf/workloads:all-srcs",
"//test/e2e_node/perftype:all-srcs",
"//test/e2e_node/remote:all-srcs",
"//test/e2e_node/runner/local:all-srcs",

View File

@ -53,6 +53,9 @@ var NodeImageWhiteList = sets.NewString(
imageutils.GetE2EImage(imageutils.Nonewprivs),
imageutils.GetPauseImageName(),
framework.GetGPUDevicePluginImage(),
"gcr.io/kubernetes-e2e-test-images/node-perf/npb-is-amd64:1.0",
"gcr.io/kubernetes-e2e-test-images/node-perf/npb-ep-amd64:1.0",
"gcr.io/kubernetes-e2e-test-images/node-perf/tf-wide-deep-amd64:1.0",
)
func init() {

View File

@ -0,0 +1,108 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e_node
import (
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e_node/perf/workloads"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
// makeNodePerfPod returns a pod with the information provided from the workload.
func makeNodePerfPod(w workloads.NodePerfWorkload) *corev1.Pod {
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-pod", w.Name()),
},
Spec: w.PodSpec(),
}
}
func setKubeletConfig(f *framework.Framework, cfg *kubeletconfig.KubeletConfiguration) {
if cfg != nil {
framework.ExpectNoError(setKubeletConfiguration(f, cfg))
}
// Wait for the Kubelet to be ready.
Eventually(func() bool {
nodeList := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
return len(nodeList.Items) == 1
}, time.Minute, time.Second).Should(BeTrue())
}
// Serial because the test updates kubelet configuration.
// Slow by design.
var _ = SIGDescribe("Node Performance Testing [Serial] [Slow]", func() {
f := framework.NewDefaultFramework("node-performance-testing")
Context("Run node performance testing with pre-defined workloads", func() {
It("run each pre-defined workload", func() {
By("running the workloads")
for _, workload := range workloads.NodePerfWorkloads {
By("running the pre test exec from the workload")
err := workload.PreTestExec()
framework.ExpectNoError(err)
By("restarting kubelet with required configuration")
// Get the Kubelet config required for this workload.
oldCfg, err := getCurrentKubeletConfig()
framework.ExpectNoError(err)
newCfg, err := workload.KubeletConfig(oldCfg)
framework.ExpectNoError(err)
// Set the Kubelet config required for this workload.
setKubeletConfig(f, newCfg)
By("running the workload and waiting for success")
// Make the pod for the workload.
pod := makeNodePerfPod(workload)
// Create the pod.
pod = f.PodClient().CreateSync(pod)
// Wait for pod success.
f.PodClient().WaitForSuccess(pod.Name, workload.Timeout())
podLogs, err := framework.GetPodLogs(f.ClientSet, f.Namespace.Name, pod.Name, pod.Spec.Containers[0].Name)
framework.ExpectNoError(err)
perf, err := workload.ExtractPerformanceFromLogs(podLogs)
framework.ExpectNoError(err)
framework.Logf("Time to complete workload %s: %v", workload.Name(), perf)
// Delete the pod.
gp := int64(0)
delOpts := metav1.DeleteOptions{
GracePeriodSeconds: &gp,
}
f.PodClient().DeleteSync(pod.Name, &delOpts, framework.DefaultPodDeletionTimeout)
By("running the post test exec from the workload")
err = workload.PostTestExec()
framework.ExpectNoError(err)
// Set the Kubelet config back to the old one.
setKubeletConfig(f, oldCfg)
}
})
})
})

View File

@ -0,0 +1,35 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"npb_ep.go",
"npb_is.go",
"tf_wide_deep.go",
"utils.go",
"workloads.go",
],
importpath = "k8s.io/kubernetes/test/e2e_node/perf/workloads",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/apis/config:go_default_library",
"//pkg/kubelet/cm/cpumanager:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,7 @@
approvers:
- vishh
- derekwaynecarr
- balajismaniam
- ConnorDoyle
reviewers:
- sig-node-reviewers

View File

@ -0,0 +1,120 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workloads
import (
"fmt"
"strings"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
)
// npbEPWorkload defines a workload to run the Embarrassingly Parallel (EP) workload
// from NAS parallel benchmark (NPB) suite.
type npbEPWorkload struct{}
// Ensure npbEPWorkload implemets NodePerfWorkload interface.
var _ NodePerfWorkload = &npbEPWorkload{}
func (w npbEPWorkload) Name() string {
return "npb-ep"
}
func (w npbEPWorkload) PodSpec() corev1.PodSpec {
var containers []corev1.Container
ctn := corev1.Container{
Name: fmt.Sprintf("%s-ctn", w.Name()),
Image: "gcr.io/kubernetes-e2e-test-images/node-perf/npb-ep-amd64:1.0",
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceName(corev1.ResourceCPU): resource.MustParse("15000m"),
corev1.ResourceName(corev1.ResourceMemory): resource.MustParse("48Gi"),
},
Limits: corev1.ResourceList{
corev1.ResourceName(corev1.ResourceCPU): resource.MustParse("15000m"),
corev1.ResourceName(corev1.ResourceMemory): resource.MustParse("48Gi"),
},
},
Command: []string{"/bin/sh"},
Args: []string{"-c", "/ep.D.x"},
}
containers = append(containers, ctn)
return corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
Containers: containers,
}
}
func (w npbEPWorkload) Timeout() time.Duration {
return 10 * time.Minute
}
func (w npbEPWorkload) KubeletConfig(oldCfg *kubeletconfig.KubeletConfiguration) (newCfg *kubeletconfig.KubeletConfiguration, err error) {
// Enable CPU Manager in Kubelet with static policy.
newCfg = oldCfg.DeepCopy()
// Set the CPU Manager policy to static.
newCfg.CPUManagerPolicy = string(cpumanager.PolicyStatic)
// Set the CPU Manager reconcile period to 10 second.
newCfg.CPUManagerReconcilePeriod = metav1.Duration{Duration: 10 * time.Second}
// The Kubelet panics if either kube-reserved or system-reserved is not set
// when static CPU Manager is enabled. Set cpu in kube-reserved > 0 so that
// kubelet doesn't panic.
if newCfg.KubeReserved == nil {
newCfg.KubeReserved = map[string]string{}
}
if _, ok := newCfg.KubeReserved["cpu"]; !ok {
newCfg.KubeReserved["cpu"] = "200m"
}
return newCfg, nil
}
func (w npbEPWorkload) PreTestExec() error {
cmd := "/bin/sh"
args := []string{"-c", "rm -f /var/lib/kubelet/cpu_manager_state"}
err := runCmd(cmd, args)
return err
}
func (w npbEPWorkload) PostTestExec() error {
cmd := "/bin/sh"
args := []string{"-c", "rm -f /var/lib/kubelet/cpu_manager_state"}
err := runCmd(cmd, args)
return err
}
func (w npbEPWorkload) ExtractPerformanceFromLogs(logs string) (perf time.Duration, err error) {
perfLine, err := getMatchingLineFromLog(logs, "Time in seconds =")
if err != nil {
return perf, err
}
perfStrings := strings.Split(perfLine, "=")
perfString := fmt.Sprintf("%ss", strings.TrimSpace(perfStrings[1]))
perf, err = time.ParseDuration(perfString)
return perf, err
}

View File

@ -0,0 +1,92 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workloads
import (
"fmt"
"strings"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
)
// npbISWorkload defines a workload to run the integer sort (IS) workload
// from NAS parallel benchmark (NPB) suite.
type npbISWorkload struct{}
// Ensure npbISWorkload implemets NodePerfWorkload interface.
var _ NodePerfWorkload = &npbISWorkload{}
func (w npbISWorkload) Name() string {
return "npb-is"
}
func (w npbISWorkload) PodSpec() corev1.PodSpec {
var containers []corev1.Container
ctn := corev1.Container{
Name: fmt.Sprintf("%s-ctn", w.Name()),
Image: "gcr.io/kubernetes-e2e-test-images/node-perf/npb-is-amd64:1.0",
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceName(corev1.ResourceCPU): resource.MustParse("16000m"),
corev1.ResourceName(corev1.ResourceMemory): resource.MustParse("48Gi"),
},
Limits: corev1.ResourceList{
corev1.ResourceName(corev1.ResourceCPU): resource.MustParse("16000m"),
corev1.ResourceName(corev1.ResourceMemory): resource.MustParse("48Gi"),
},
},
Command: []string{"/bin/sh"},
Args: []string{"-c", "/is.D.x"},
}
containers = append(containers, ctn)
return corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
Containers: containers,
}
}
func (w npbISWorkload) Timeout() time.Duration {
return 4 * time.Minute
}
func (w npbISWorkload) KubeletConfig(oldCfg *kubeletconfig.KubeletConfiguration) (newCfg *kubeletconfig.KubeletConfiguration, err error) {
return oldCfg, nil
}
func (w npbISWorkload) PreTestExec() error {
return nil
}
func (w npbISWorkload) PostTestExec() error {
return nil
}
func (w npbISWorkload) ExtractPerformanceFromLogs(logs string) (perf time.Duration, err error) {
perfLine, err := getMatchingLineFromLog(logs, "Time in seconds =")
if err != nil {
return perf, err
}
perfStrings := strings.Split(perfLine, "=")
perfString := fmt.Sprintf("%ss", strings.TrimSpace(perfStrings[1]))
perf, err = time.ParseDuration(perfString)
return perf, err
}

View File

@ -0,0 +1,119 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workloads
import (
"fmt"
"strings"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
)
// tfWideDeepWorkload defines a workload to run
// https://github.com/tensorflow/models/tree/master/official/wide_deep.
type tfWideDeepWorkload struct{}
// Ensure tfWideDeepWorkload implemets NodePerfWorkload interface.
var _ NodePerfWorkload = &tfWideDeepWorkload{}
func (w tfWideDeepWorkload) Name() string {
return "tensorflow-wide-deep"
}
func (w tfWideDeepWorkload) PodSpec() corev1.PodSpec {
var containers []corev1.Container
ctn := corev1.Container{
Name: fmt.Sprintf("%s-ctn", w.Name()),
Image: "gcr.io/kubernetes-e2e-test-images/node-perf/tf-wide-deep-amd64:1.0",
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceName(corev1.ResourceCPU): resource.MustParse("15000m"),
corev1.ResourceName(corev1.ResourceMemory): resource.MustParse("16Gi"),
},
Limits: corev1.ResourceList{
corev1.ResourceName(corev1.ResourceCPU): resource.MustParse("15000m"),
corev1.ResourceName(corev1.ResourceMemory): resource.MustParse("16Gi"),
},
},
Command: []string{"/bin/sh"},
Args: []string{"-c", "python ./data_download.py && time -p python ./wide_deep.py --model_type=wide_deep --train_epochs=300 --epochs_between_evals=300 --batch_size=32561"},
}
containers = append(containers, ctn)
return corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
Containers: containers,
}
}
func (w tfWideDeepWorkload) Timeout() time.Duration {
return 15 * time.Minute
}
func (w tfWideDeepWorkload) KubeletConfig(oldCfg *kubeletconfig.KubeletConfiguration) (newCfg *kubeletconfig.KubeletConfiguration, err error) {
// Enable CPU Manager in Kubelet with static policy.
newCfg = oldCfg.DeepCopy()
// Set the CPU Manager policy to static.
newCfg.CPUManagerPolicy = string(cpumanager.PolicyStatic)
// Set the CPU Manager reconcile period to 10 second.
newCfg.CPUManagerReconcilePeriod = metav1.Duration{Duration: 10 * time.Second}
// The Kubelet panics if either kube-reserved or system-reserved is not set
// when static CPU Manager is enabled. Set cpu in kube-reserved > 0 so that
// kubelet doesn't panic.
if newCfg.KubeReserved == nil {
newCfg.KubeReserved = map[string]string{}
}
if _, ok := newCfg.KubeReserved["cpu"]; !ok {
newCfg.KubeReserved["cpu"] = "200m"
}
return newCfg, nil
}
func (w tfWideDeepWorkload) PreTestExec() error {
cmd := "/bin/sh"
args := []string{"-c", "rm -f /var/lib/kubelet/cpu_manager_state"}
err := runCmd(cmd, args)
return err
}
func (w tfWideDeepWorkload) PostTestExec() error {
cmd := "/bin/sh"
args := []string{"-c", "rm -f /var/lib/kubelet/cpu_manager_state"}
err := runCmd(cmd, args)
return err
}
func (w tfWideDeepWorkload) ExtractPerformanceFromLogs(logs string) (perf time.Duration, err error) {
perfLine, err := getMatchingLineFromLog(logs, "real")
if err != nil {
return perf, err
}
perfString := fmt.Sprintf("%ss", strings.TrimSpace(strings.TrimPrefix(perfLine, "real")))
perf, err = time.ParseDuration(perfString)
return perf, err
}

View File

@ -0,0 +1,45 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workloads
import (
"fmt"
"os/exec"
"regexp"
"strings"
)
func runCmd(cmd string, args []string) error {
err := exec.Command(cmd, args...).Run()
return err
}
func getMatchingLineFromLog(log string, pattern string) (line string, err error) {
regex, err := regexp.Compile(pattern)
if err != nil {
return line, fmt.Errorf("failed to compile regexp %v: %v", pattern, err)
}
logLines := strings.Split(log, "\n")
for _, line := range logLines {
if regex.MatchString(line) {
return line, nil
}
}
return line, fmt.Errorf("line with pattern %v not found in log", pattern)
}

View File

@ -0,0 +1,53 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workloads
import (
"time"
corev1 "k8s.io/api/core/v1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
)
// NodePerfWorkload provides the necessary information to run a workload for
// node performance testing.
type NodePerfWorkload interface {
// Name of the workload.
Name() string
// PodSpec used to run this workload.
PodSpec() corev1.PodSpec
// Timeout provides the expected time to completion
// for this workload.
Timeout() time.Duration
// KubeletConfig specifies the Kubelet configuration
// required for this workload.
KubeletConfig(old *kubeletconfig.KubeletConfiguration) (new *kubeletconfig.KubeletConfiguration, err error)
// PreTestExec is used for defining logic that needs
// to be run before restarting the Kubelet with the new Kubelet
// configuration required for the workload.
PreTestExec() error
// PostTestExec is used for defining logic that needs
// to be run after the workload has completed.
PostTestExec() error
// ExtractPerformanceFromLogs is used get the performance of the workload
// from pod logs. Currently, we support only performance reported in
// time.Duration format.
ExtractPerformanceFromLogs(logs string) (perf time.Duration, err error)
}
// NodePerfWorkloads is the collection of all node performance testing workloads.
var NodePerfWorkloads = []NodePerfWorkload{npbISWorkload{}, npbEPWorkload{}, tfWideDeepWorkload{}}