diff --git a/pkg/kubelet/cm/deviceplugin/device_plugin_stub.go b/pkg/kubelet/cm/deviceplugin/device_plugin_stub.go index 01f08c1598..a04389cc19 100644 --- a/pkg/kubelet/cm/deviceplugin/device_plugin_stub.go +++ b/pkg/kubelet/cm/deviceplugin/device_plugin_stub.go @@ -38,6 +38,18 @@ type Stub struct { update chan []*pluginapi.Device server *grpc.Server + + // allocFunc is used for handling allocation request + allocFunc stubAllocFunc +} + +// stubAllocFunc is the function called when receive an allocation request from Kubelet +type stubAllocFunc func(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Device) (*pluginapi.AllocateResponse, error) + +func defaultAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Device) (*pluginapi.AllocateResponse, error) { + var response pluginapi.AllocateResponse + + return &response, nil } // NewDevicePluginStub returns an initialized DevicePlugin Stub. @@ -48,9 +60,16 @@ func NewDevicePluginStub(devs []*pluginapi.Device, socket string) *Stub { stop: make(chan interface{}), update: make(chan []*pluginapi.Device), + + allocFunc: defaultAllocFunc, } } +// SetAllocFunc sets allocFunc of the device plugin +func (m *Stub) SetAllocFunc(f stubAllocFunc) { + m.allocFunc = f +} + // Start starts the gRPC server of the device plugin func (m *Stub) Start() error { err := m.cleanup() @@ -145,8 +164,13 @@ func (m *Stub) Update(devs []*pluginapi.Device) { func (m *Stub) Allocate(ctx context.Context, r *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { log.Printf("Allocate, %+v", r) - var response pluginapi.AllocateResponse - return &response, nil + devs := make(map[string]pluginapi.Device) + + for _, dev := range m.devs { + devs[dev.ID] = *dev + } + + return m.allocFunc(r, devs) } func (m *Stub) cleanup() error { diff --git a/test/e2e_node/BUILD b/test/e2e_node/BUILD index e70755c536..9ee141f009 100644 --- a/test/e2e_node/BUILD +++ b/test/e2e_node/BUILD @@ -10,6 +10,7 @@ go_library( name = "go_default_library", srcs = [ "container.go", + "device_plugin.go", "doc.go", "docker_util.go", "framework.go", @@ -32,10 +33,12 @@ go_library( "//pkg/features:go_default_library", "//pkg/kubelet/apis/cri:go_default_library", "//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library", + "//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library", "//pkg/kubelet/apis/kubeletconfig/v1alpha1:go_default_library", "//pkg/kubelet/apis/stats/v1alpha1:go_default_library", + "//pkg/kubelet/cm/deviceplugin:go_default_library", "//pkg/kubelet/metrics:go_default_library", "//pkg/kubelet/remote:go_default_library", "//test/e2e/common:go_default_library", diff --git a/test/e2e_node/device_plugin.go b/test/e2e_node/device_plugin.go new file mode 100644 index 0000000000..826d3b6698 --- /dev/null +++ b/test/e2e_node/device_plugin.go @@ -0,0 +1,262 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e_node + +import ( + "fmt" + "os" + "path/filepath" + "time" + + "regexp" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/uuid" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" + "k8s.io/kubernetes/test/e2e/framework" + + pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" + dp "k8s.io/kubernetes/pkg/kubelet/cm/deviceplugin" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +const ( + // fake resource name + resourceName = "fake.com/resource" +) + +// Serial because the test restarts Kubelet +var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin] [Serial] [Disruptive]", func() { + f := framework.NewDefaultFramework("device-plugin-errors") + + Context("DevicePlugin", func() { + By("Enabling support for Device Plugin") + tempSetCurrentKubeletConfig(f, func(initialConfig *kubeletconfig.KubeletConfiguration) { + initialConfig.FeatureGates[string(features.DevicePlugins)] = true + }) + + It("Verifies the Kubelet device plugin functionality.", func() { + + By("Wait for node is ready") + framework.WaitForAllNodesSchedulable(f.ClientSet, framework.TestContext.NodeSchedulableTimeout) + + By("Start stub device plugin") + // fake devices for e2e test + devs := []*pluginapi.Device{ + {ID: "Dev-1", Health: pluginapi.Healthy}, + {ID: "Dev-2", Health: pluginapi.Healthy}, + } + + socketPath := pluginapi.DevicePluginPath + "dp." + fmt.Sprintf("%d", time.Now().Unix()) + + dp1 := dp.NewDevicePluginStub(devs, socketPath) + dp1.SetAllocFunc(stubAllocFunc) + err := dp1.Start() + framework.ExpectNoError(err) + + By("Register resources") + err = dp1.Register(pluginapi.KubeletSocket, resourceName) + framework.ExpectNoError(err) + + By("Waiting for the resource exported by the stub device plugin to become available on the local node") + devsLen := int64(len(devs)) + Eventually(func() int64 { + node, err := f.ClientSet.CoreV1().Nodes().Get(framework.TestContext.NodeName, metav1.GetOptions{}) + framework.ExpectNoError(err) + return numberOfDevices(node, resourceName) + }, 30*time.Second, framework.Poll).Should(Equal(devsLen)) + + By("Creating one pod on node with at least one fake-device") + podRECMD := "devs=$(ls /tmp/ | egrep '^Dev-[0-9]+$') && echo stub devices: $devs" + pod1 := f.PodClient().CreateSync(makeBusyboxPod(resourceName, podRECMD)) + deviceIDRE := "stub devices: (Dev-[0-9]+)" + count1, devId1 := parseLogFromNRuns(f, pod1.Name, pod1.Name, 0, deviceIDRE) + Expect(devId1).To(Not(Equal(""))) + + pod1, err = f.PodClient().Get(pod1.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + + By("Restarting Kubelet and waiting for the current running pod to restart") + restartKubelet() + + By("Confirming that after a kubelet and pod restart, fake-device assignement is kept") + count1, devIdRestart1 := parseLogFromNRuns(f, pod1.Name, pod1.Name, count1+1, deviceIDRE) + Expect(devIdRestart1).To(Equal(devId1)) + + By("Wait for node is ready") + framework.WaitForAllNodesSchedulable(f.ClientSet, framework.TestContext.NodeSchedulableTimeout) + + By("Re-Register resources") + dp1 = dp.NewDevicePluginStub(devs, socketPath) + dp1.SetAllocFunc(stubAllocFunc) + err = dp1.Start() + framework.ExpectNoError(err) + + err = dp1.Register(pluginapi.KubeletSocket, resourceName) + framework.ExpectNoError(err) + + By("Waiting for resource to become available on the local node after re-registration") + Eventually(func() int64 { + node, err := f.ClientSet.CoreV1().Nodes().Get(framework.TestContext.NodeName, metav1.GetOptions{}) + framework.ExpectNoError(err) + return numberOfDevices(node, resourceName) + }, 30*time.Second, framework.Poll).Should(Equal(devsLen)) + + By("Creating another pod") + pod2 := f.PodClient().CreateSync(makeBusyboxPod(resourceName, podRECMD)) + + By("Checking that pods got a different GPU") + count2, devId2 := parseLogFromNRuns(f, pod2.Name, pod2.Name, 1, deviceIDRE) + + Expect(devId1).To(Not(Equal(devId2))) + + By("Deleting device plugin.") + err = dp1.Stop() + framework.ExpectNoError(err) + + By("Waiting for stub device plugin to become unavailable on the local node") + Eventually(func() bool { + node, err := f.ClientSet.CoreV1().Nodes().Get(framework.TestContext.NodeName, metav1.GetOptions{}) + framework.ExpectNoError(err) + return numberOfDevices(node, resourceName) <= 0 + }, 10*time.Minute, framework.Poll).Should(BeTrue()) + + By("Checking that scheduled pods can continue to run even after we delete device plugin.") + count1, devIdRestart1 = parseLogFromNRuns(f, pod1.Name, pod1.Name, count1+1, deviceIDRE) + Expect(devIdRestart1).To(Equal(devId1)) + count2, devIdRestart2 := parseLogFromNRuns(f, pod2.Name, pod2.Name, count2+1, deviceIDRE) + Expect(devIdRestart2).To(Equal(devId2)) + + By("Restarting Kubelet.") + restartKubelet() + + By("Checking that scheduled pods can continue to run even after we delete device plugin and restart Kubelet.") + count1, devIdRestart1 = parseLogFromNRuns(f, pod1.Name, pod1.Name, count1+2, deviceIDRE) + Expect(devIdRestart1).To(Equal(devId1)) + count2, devIdRestart2 = parseLogFromNRuns(f, pod2.Name, pod2.Name, count2+2, deviceIDRE) + Expect(devIdRestart2).To(Equal(devId2)) + + // Cleanup + f.PodClient().DeleteSync(pod1.Name, &metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout) + f.PodClient().DeleteSync(pod2.Name, &metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout) + }) + }) +}) + +// makeBusyboxPod returns a simple Pod spec with a busybox container +// that requests resourceName and runs the specified command. +func makeBusyboxPod(resourceName, cmd string) *v1.Pod { + podName := "device-plugin-test-" + string(uuid.NewUUID()) + rl := v1.ResourceList{v1.ResourceName(resourceName): *resource.NewQuantity(1, resource.DecimalSI)} + + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: podName}, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyAlways, + Containers: []v1.Container{{ + Image: busyboxImage, + Name: podName, + // Runs the specified command in the test pod. + Command: []string{"sh", "-c", cmd}, + Resources: v1.ResourceRequirements{ + Limits: rl, + Requests: rl, + }, + }}, + }, + } +} + +// parseLogFromNRuns returns restart count of the specified container +// after it has been restarted at least restartCount times, +// and the matching string for the specified regular expression parsed from the container logs. +func parseLogFromNRuns(f *framework.Framework, podName string, contName string, restartCount int32, re string) (int32, string) { + var count int32 + // Wait till pod has been restarted at least restartCount times. + Eventually(func() bool { + p, err := f.PodClient().Get(podName, metav1.GetOptions{}) + if err != nil || len(p.Status.ContainerStatuses) < 1 { + return false + } + count = p.Status.ContainerStatuses[0].RestartCount + return count >= restartCount + }, 5*time.Minute, framework.Poll).Should(BeTrue()) + + logs, err := framework.GetPodLogs(f.ClientSet, f.Namespace.Name, podName, contName) + if err != nil { + framework.Failf("GetPodLogs for pod %q failed: %v", podName, err) + } + + framework.Logf("got pod logs: %v", logs) + regex := regexp.MustCompile(re) + matches := regex.FindStringSubmatch(logs) + if len(matches) < 2 { + return count, "" + } + + return count, matches[1] +} + +// numberOfDevices returns the number of devices of resourceName advertised by a node +func numberOfDevices(node *v1.Node, resourceName string) int64 { + val, ok := node.Status.Capacity[v1.ResourceName(resourceName)] + if !ok { + return 0 + } + + return val.Value() +} + +// stubAllocFunc will pass to stub device plugin +func stubAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Device) (*pluginapi.AllocateResponse, error) { + var response pluginapi.AllocateResponse + for _, requestID := range r.DevicesIDs { + dev, ok := devs[requestID] + if !ok { + return nil, fmt.Errorf("invalid allocation request with non-existing device %s", requestID) + } + + if dev.Health != pluginapi.Healthy { + return nil, fmt.Errorf("invalid allocation request with unhealthy device: %s", requestID) + } + + // create fake device file + fpath := filepath.Join("/tmp", dev.ID) + + // clean first + os.RemoveAll(fpath) + f, err := os.Create(fpath) + if err != nil && !os.IsExist(err) { + return nil, fmt.Errorf("failed to create fake device file: %s", err) + } + + f.Close() + + response.Mounts = append(response.Mounts, &pluginapi.Mount{ + ContainerPath: fpath, + HostPath: fpath, + }) + } + + return &response, nil +} diff --git a/test/e2e_node/gpu_device_plugin.go b/test/e2e_node/gpu_device_plugin.go index d2a52c3749..256a8935c5 100644 --- a/test/e2e_node/gpu_device_plugin.go +++ b/test/e2e_node/gpu_device_plugin.go @@ -17,15 +17,11 @@ limitations under the License. package e2e_node import ( - "os/exec" - "regexp" "strconv" "time" "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics" @@ -89,24 +85,28 @@ var _ = framework.KubeDescribe("NVIDIA GPU Device Plugin [Feature:GPUDevicePlugi It("checks that when Kubelet restarts exclusive GPU assignation to pods is kept.", func() { By("Creating one GPU pod on a node with at least two GPUs") - p1 := f.PodClient().CreateSync(makeCudaPauseImage()) - count1, devId1 := getDeviceId(f, p1.Name, p1.Name, 1) + podRECMD := "devs=$(ls /dev/ | egrep '^nvidia[0-9]+$') && echo gpu devices: $devs" + p1 := f.PodClient().CreateSync(makeBusyboxPod(framework.NVIDIAGPUResourceName, podRECMD)) + + deviceIDRE := "gpu devices: (nvidia[0-9]+)" + count1, devId1 := parseLogFromNRuns(f, p1.Name, p1.Name, 1, deviceIDRE) p1, err := f.PodClient().Get(p1.Name, metav1.GetOptions{}) framework.ExpectNoError(err) By("Restarting Kubelet and waiting for the current running pod to restart") - restartKubelet(f) + restartKubelet() By("Confirming that after a kubelet and pod restart, GPU assignement is kept") - count1, devIdRestart1 := getDeviceId(f, p1.Name, p1.Name, count1+1) + count1, devIdRestart1 := parseLogFromNRuns(f, p1.Name, p1.Name, count1+1, deviceIDRE) Expect(devIdRestart1).To(Equal(devId1)) By("Restarting Kubelet and creating another pod") - restartKubelet(f) - p2 := f.PodClient().CreateSync(makeCudaPauseImage()) + restartKubelet() + p2 := f.PodClient().CreateSync(makeBusyboxPod(framework.NVIDIAGPUResourceName, podRECMD)) By("Checking that pods got a different GPU") - count2, devId2 := getDeviceId(f, p2.Name, p2.Name, 1) + count2, devId2 := parseLogFromNRuns(f, p2.Name, p2.Name, 1, deviceIDRE) + Expect(devId1).To(Not(Equal(devId2))) By("Deleting device plugin.") @@ -118,16 +118,16 @@ var _ = framework.KubeDescribe("NVIDIA GPU Device Plugin [Feature:GPUDevicePlugi return framework.NumberOfNVIDIAGPUs(node) <= 0 }, 10*time.Minute, framework.Poll).Should(BeTrue()) By("Checking that scheduled pods can continue to run even after we delete device plugin.") - count1, devIdRestart1 = getDeviceId(f, p1.Name, p1.Name, count1+1) + count1, devIdRestart1 = parseLogFromNRuns(f, p1.Name, p1.Name, count1+1, deviceIDRE) Expect(devIdRestart1).To(Equal(devId1)) - count2, devIdRestart2 := getDeviceId(f, p2.Name, p2.Name, count2+1) + count2, devIdRestart2 := parseLogFromNRuns(f, p2.Name, p2.Name, count2+1, deviceIDRE) Expect(devIdRestart2).To(Equal(devId2)) By("Restarting Kubelet.") - restartKubelet(f) + restartKubelet() By("Checking that scheduled pods can continue to run even after we delete device plugin and restart Kubelet.") - count1, devIdRestart1 = getDeviceId(f, p1.Name, p1.Name, count1+2) + count1, devIdRestart1 = parseLogFromNRuns(f, p1.Name, p1.Name, count1+2, deviceIDRE) Expect(devIdRestart1).To(Equal(devId1)) - count2, devIdRestart2 = getDeviceId(f, p2.Name, p2.Name, count2+2) + count2, devIdRestart2 = parseLogFromNRuns(f, p2.Name, p2.Name, count2+2, deviceIDRE) Expect(devIdRestart2).To(Equal(devId2)) logDevicePluginMetrics() @@ -165,68 +165,3 @@ func logDevicePluginMetrics() { } } } - -func makeCudaPauseImage() *v1.Pod { - podName := testPodNamePrefix + string(uuid.NewUUID()) - - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: podName}, - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyAlways, - Containers: []v1.Container{{ - Image: busyboxImage, - Name: podName, - // Retrieves the gpu devices created in the user pod. - // Note the nvidia device plugin implementation doesn't do device id remapping currently. - // Will probably need to use nvidia-smi if that changes. - Command: []string{"sh", "-c", "devs=$(ls /dev/ | egrep '^nvidia[0-9]+$') && echo gpu devices: $devs"}, - - Resources: v1.ResourceRequirements{ - Limits: newDecimalResourceList(framework.NVIDIAGPUResourceName, 1), - Requests: newDecimalResourceList(framework.NVIDIAGPUResourceName, 1), - }, - }}, - }, - } -} - -func newDecimalResourceList(name v1.ResourceName, quantity int64) v1.ResourceList { - return v1.ResourceList{name: *resource.NewQuantity(quantity, resource.DecimalSI)} -} - -// TODO: Find a uniform way to deal with systemctl/initctl/service operations. #34494 -func restartKubelet(f *framework.Framework) { - stdout, err := exec.Command("sudo", "systemctl", "list-units", "kubelet*", "--state=running").CombinedOutput() - framework.ExpectNoError(err) - regex := regexp.MustCompile("(kubelet-[0-9]+)") - matches := regex.FindStringSubmatch(string(stdout)) - Expect(len(matches)).NotTo(BeZero()) - kube := matches[0] - framework.Logf("Get running kubelet with systemctl: %v, %v", string(stdout), kube) - stdout, err = exec.Command("sudo", "systemctl", "restart", kube).CombinedOutput() - framework.ExpectNoError(err, "Failed to restart kubelet with systemctl: %v, %v", err, stdout) -} - -func getDeviceId(f *framework.Framework, podName string, contName string, restartCount int32) (int32, string) { - var count int32 - // Wait till pod has been restarted at least restartCount times. - Eventually(func() bool { - p, err := f.PodClient().Get(podName, metav1.GetOptions{}) - if err != nil || len(p.Status.ContainerStatuses) < 1 { - return false - } - count = p.Status.ContainerStatuses[0].RestartCount - return count >= restartCount - }, 5*time.Minute, framework.Poll).Should(BeTrue()) - logs, err := framework.GetPodLogs(f.ClientSet, f.Namespace.Name, podName, contName) - if err != nil { - framework.Failf("GetPodLogs for pod %q failed: %v", podName, err) - } - framework.Logf("got pod logs: %v", logs) - regex := regexp.MustCompile("gpu devices: (nvidia[0-9]+)") - matches := regex.FindStringSubmatch(logs) - if len(matches) < 2 { - return count, "" - } - return count, matches[1] -} diff --git a/test/e2e_node/util.go b/test/e2e_node/util.go index 1a6d08d281..bf4bc28020 100644 --- a/test/e2e_node/util.go +++ b/test/e2e_node/util.go @@ -24,6 +24,7 @@ import ( "net/http" "os/exec" "reflect" + "regexp" "strings" "time" @@ -393,3 +394,16 @@ func getCRIClient() (internalapi.RuntimeService, internalapi.ImageManagerService } return r, i, nil } + +// TODO: Find a uniform way to deal with systemctl/initctl/service operations. #34494 +func restartKubelet() { + stdout, err := exec.Command("sudo", "systemctl", "list-units", "kubelet*", "--state=running").CombinedOutput() + framework.ExpectNoError(err) + regex := regexp.MustCompile("(kubelet-[0-9]+)") + matches := regex.FindStringSubmatch(string(stdout)) + Expect(len(matches)).NotTo(BeZero()) + kube := matches[0] + framework.Logf("Get running kubelet with systemctl: %v, %v", string(stdout), kube) + stdout, err = exec.Command("sudo", "systemctl", "restart", kube).CombinedOutput() + framework.ExpectNoError(err, "Failed to restart kubelet with systemctl: %v, %v", err, stdout) +}