From 10995f661d0d025b6f1ed6ad0fb5c858b1b5ef76 Mon Sep 17 00:00:00 2001
From: ailusazh
Date: Thu, 13 Sep 2018 18:19:59 +0800
Subject: [PATCH] clean containers in reconcileState of cpuManager

---
 pkg/kubelet/cm/cpumanager/cpu_manager.go | 14 ++++
 test/e2e_node/BUILD                      |  1 +
 test/e2e_node/cpu_manager_test.go        | 89 +++++++++++++++++++++++-
 3 files changed, 101 insertions(+), 3 deletions(-)

diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go
index 43e3784898..53d5e6c2a0 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -220,6 +220,8 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
 	success = []reconciledContainer{}
 	failure = []reconciledContainer{}
 
+	activeContainers := make(map[string]struct{})
+
 	for _, pod := range m.activePods() {
 		allContainers := pod.Spec.InitContainers
 		allContainers = append(allContainers, pod.Spec.Containers...)
@@ -258,6 +260,8 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
 			}
 		}
 
+		activeContainers[containerID] = struct{}{}
+
 		cset := m.state.GetCPUSetOrDefault(containerID)
 		if cset.IsEmpty() {
 			// NOTE: This should not happen outside of tests.
@@ -276,6 +280,16 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
 			success = append(success, reconciledContainer{pod.Name, container.Name, containerID})
 		}
 	}
+
+	for containerID := range m.state.GetCPUAssignments() {
+		if _, ok := activeContainers[containerID]; !ok {
+			err := m.RemoveContainer(containerID)
+			if err != nil {
+				klog.Errorf("[cpumanager] reconcileState: failed to remove inactive container (container id: %s, error: %v)", containerID, err)
+				failure = append(failure, reconciledContainer{"", "", containerID})
+			}
+		}
+	}
 	return success, failure
 }
 
diff --git a/test/e2e_node/BUILD b/test/e2e_node/BUILD
index bca4f17619..0e934d0c42 100644
--- a/test/e2e_node/BUILD
+++ b/test/e2e_node/BUILD
@@ -119,6 +119,7 @@ go_test(
         "//pkg/kubelet/apis/stats/v1alpha1:go_default_library",
         "//pkg/kubelet/cm:go_default_library",
         "//pkg/kubelet/cm/cpumanager:go_default_library",
+        "//pkg/kubelet/cm/cpumanager/state:go_default_library",
         "//pkg/kubelet/cm/cpuset:go_default_library",
         "//pkg/kubelet/container:go_default_library",
         "//pkg/kubelet/eviction:go_default_library",
diff --git a/test/e2e_node/cpu_manager_test.go b/test/e2e_node/cpu_manager_test.go
index 852f610b7f..a4ed1aa5d0 100644
--- a/test/e2e_node/cpu_manager_test.go
+++ b/test/e2e_node/cpu_manager_test.go
@@ -29,6 +29,7 @@ import (
 	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
+	cpumanagerstate "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
 	"k8s.io/kubernetes/pkg/kubelet/types"
 	"k8s.io/kubernetes/test/e2e/framework"
@@ -120,6 +121,18 @@ func waitForContainerRemoval(containerName, podName, podNS string) {
 	}, 2*time.Minute, 1*time.Second).Should(BeTrue())
 }
 
+func waitForStateFileCleanedUp() {
+	Eventually(func() bool {
+		restoredState, err := cpumanagerstate.NewCheckpointState("/var/lib/kubelet", "cpu_manager_state", "static")
+		framework.ExpectNoError(err, "failed to create testing cpumanager state instance")
+		assignments := restoredState.GetCPUAssignments()
+		if len(assignments) == 0 {
+			return true
+		}
+		return false
+	}, 2*time.Minute, 1*time.Second).Should(BeTrue())
+}
+
 func isHTEnabled() bool {
 	outData, err := exec.Command("/bin/sh", "-c", "lscpu | grep \"Thread(s) per core:\" | cut -d \":\" -f 2").Output()
 	framework.ExpectNoError(err)
@@ -151,13 +164,37 @@ func setOldKubeletConfig(f *framework.Framework, oldCfg *kubeletconfig.KubeletConfiguration) {
 	}
 }
 
-func enableCPUManagerInKubelet(f *framework.Framework) (oldCfg *kubeletconfig.KubeletConfiguration) {
+func disableCPUManagerInKubelet(f *framework.Framework) (oldCfg *kubeletconfig.KubeletConfiguration) {
+	// Disable CPU Manager in Kubelet.
+	oldCfg, err := getCurrentKubeletConfig()
+	framework.ExpectNoError(err)
+	newCfg := oldCfg.DeepCopy()
+	if newCfg.FeatureGates == nil {
+		newCfg.FeatureGates = make(map[string]bool)
+	}
+	newCfg.FeatureGates["CPUManager"] = false
+
+	// Update the Kubelet configuration.
+	framework.ExpectNoError(setKubeletConfiguration(f, newCfg))
+
+	// Wait for the Kubelet to be ready.
+	Eventually(func() bool {
+		nodeList := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
+		return len(nodeList.Items) == 1
+	}, time.Minute, time.Second).Should(BeTrue())
+
+	return oldCfg
+}
+
+func enableCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool) (oldCfg *kubeletconfig.KubeletConfiguration) {
 	// Enable CPU Manager in Kubelet with static policy.
 	oldCfg, err := getCurrentKubeletConfig()
 	framework.ExpectNoError(err)
 	newCfg := oldCfg.DeepCopy()
 	if newCfg.FeatureGates == nil {
 		newCfg.FeatureGates = make(map[string]bool)
+	} else {
+		newCfg.FeatureGates["CPUManager"] = true
 	}
 
 	// After graduation of the CPU Manager feature to Beta, the CPU Manager
@@ -168,7 +205,10 @@ func enableCPUManagerInKubelet(f *framework.Framework) (oldCfg *kubeletconfig.KubeletConfiguration) {
 	// be "none" whereas we are trying to restart Kubelet with "static"
 	// policy). Therefore, we delete the state file so that we can proceed
 	// with the tests.
-	deleteStateFile()
+	// Only delete the state file at the beginning of the tests.
+	if cleanStateFile {
+		deleteStateFile()
+	}
 
 	// Set the CPU Manager policy to static.
 	newCfg.CPUManagerPolicy = string(cpumanager.PolicyStatic)
@@ -218,7 +258,7 @@ func runCPUManagerTests(f *framework.Framework) {
 		}
 
 		// Enable CPU Manager in the kubelet.
-		oldCfg = enableCPUManagerInKubelet(f)
+		oldCfg = enableCPUManagerInKubelet(f, true)
 
 		By("running a non-Gu pod")
 		ctnAttrs = []ctnAttribute{
@@ -433,6 +473,49 @@ func runCPUManagerTests(f *framework.Framework) {
 		waitForContainerRemoval(pod1.Spec.Containers[0].Name, pod1.Name, pod1.Namespace)
 		waitForContainerRemoval(pod2.Spec.Containers[0].Name, pod2.Name, pod2.Namespace)
 
+		By("testing that inactive pods are automatically removed from the cpumanager state file")
+		// First, run a Gu pod,
+		// then disable the CPU Manager in the kubelet,
+		// then delete the Gu pod,
+		// then re-enable the CPU Manager in the kubelet,
+		// and finally wait for the reconcile loop to clean up the state file. If the assignments
+		// map becomes empty, the automatic cleanup in the reconcile loop is working.
+		By("running a Gu pod to test removal")
+		ctnAttrs = []ctnAttribute{
+			{
+				ctnName:    "gu-container-testremove",
+				cpuRequest: "1000m",
+				cpuLimit:   "1000m",
+			},
+		}
+		pod = makeCPUManagerPod("gu-pod-testremove", ctnAttrs)
+		pod = f.PodClient().CreateSync(pod)
+
+		By("checking if the expected cpuset was assigned")
+		cpu1 = 1
+		if isHTEnabled() {
+			cpuList = cpuset.MustParse(getCPUSiblingList(0)).ToSlice()
+			cpu1 = cpuList[1]
+		}
+		expAllowedCPUsListRegex = fmt.Sprintf("^%d\n$", cpu1)
+		err = f.PodClient().MatchContainerOutput(pod.Name, pod.Spec.Containers[0].Name, expAllowedCPUsListRegex)
+		framework.ExpectNoError(err, "expected log not found in container [%s] of pod [%s]",
+			pod.Spec.Containers[0].Name, pod.Name)
+
+		By("disabling the CPU Manager in the kubelet")
+		disableCPUManagerInKubelet(f)
+
+		By("deleting the pod and waiting for container removal")
+		deletePods(f, []string{pod.Name})
+		waitForContainerRemoval(pod.Spec.Containers[0].Name, pod.Name, pod.Namespace)
+
+		By("re-enabling the CPU Manager in the kubelet without deleting the state file")
+		enableCPUManagerInKubelet(f, false)
+
+		By("waiting for the deleted pod to be cleaned up from the state file")
+		waitForStateFileCleanedUp()
+		By("the deleted pod has been removed from the state file")
+
 		setOldKubeletConfig(f, oldCfg)
 	})
 }
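
Illustrative note (not part of the patch): the cleanup added to reconcileState is a mark-and-sweep over the CPU Manager's checkpointed assignments. Every container seen while walking the active pods is marked, and any assignment whose container ID was not marked is removed. The self-contained Go sketch below demonstrates that pattern; the types and helpers (fakeState, removeContainer, reconcile) are simplified stand-ins for the kubelet's real state and manager APIs, not the actual interfaces.

package main

import "fmt"

// fakeState is a stand-in for the CPU Manager's checkpointed state: it maps
// container IDs to an assigned CPU set (represented as a string for brevity).
type fakeState struct {
	assignments map[string]string
}

func (s *fakeState) GetCPUAssignments() map[string]string {
	return s.assignments
}

// removeContainer stands in for manager.RemoveContainer: it releases the CPUs
// held by containerID by dropping the entry from the state.
func (s *fakeState) removeContainer(containerID string) error {
	delete(s.assignments, containerID)
	return nil
}

// reconcile removes every assignment whose container no longer belongs to an
// active pod, mirroring the sweep this patch adds to reconcileState.
func reconcile(s *fakeState, activePodContainers map[string][]string) {
	// Mark: collect the IDs of all containers that belong to active pods.
	active := make(map[string]struct{})
	for _, containerIDs := range activePodContainers {
		for _, id := range containerIDs {
			active[id] = struct{}{}
		}
	}

	// Sweep: drop state entries for containers that are not active anymore.
	for containerID := range s.GetCPUAssignments() {
		if _, ok := active[containerID]; !ok {
			if err := s.removeContainer(containerID); err != nil {
				fmt.Printf("failed to remove container %s: %v\n", containerID, err)
			}
		}
	}
}

func main() {
	state := &fakeState{assignments: map[string]string{
		"ctr-a": "1",   // belongs to a running pod
		"ctr-b": "2-3", // its pod was deleted while the CPU Manager was disabled
	}}
	activePodContainers := map[string][]string{"pod-1": {"ctr-a"}}

	reconcile(state, activePodContainers)
	fmt.Println(state.assignments) // map[ctr-a:1] — the stale entry is gone
}

Because the same sweep runs on every pass of the kubelet's reconcile loop, assignments left behind while the CPU Manager was disabled are released the first time the loop runs after it is re-enabled, which is exactly what the new e2e case verifies through waitForStateFileCleanedUp.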