clean containers in reconcileState of cpuManager

pull/564/head
ailusazh 2018-09-13 18:19:59 +08:00
parent 148248353b
commit 10995f661d
3 changed files with 101 additions and 3 deletions

View File

@ -220,6 +220,8 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
success = []reconciledContainer{}
failure = []reconciledContainer{}
activeContainers := make(map[string]*v1.Pod)
for _, pod := range m.activePods() {
allContainers := pod.Spec.InitContainers
allContainers = append(allContainers, pod.Spec.Containers...)
@ -258,6 +260,8 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
}
}
activeContainers[containerID] = pod
cset := m.state.GetCPUSetOrDefault(containerID)
if cset.IsEmpty() {
// NOTE: This should not happen outside of tests.
@ -276,6 +280,16 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
success = append(success, reconciledContainer{pod.Name, container.Name, containerID})
}
}
for containerID := range m.state.GetCPUAssignments() {
if pod, ok := activeContainers[containerID]; !ok {
err := m.RemoveContainer(containerID)
if err != nil {
klog.Errorf("[cpumanager] reconcileState: failed to remove container (pod: %s, container id: %s, error: %v)", pod.Name, containerID, err)
failure = append(failure, reconciledContainer{pod.Name, "", containerID})
}
}
}
return success, failure
}

View File

@ -119,6 +119,7 @@ go_test(
"//pkg/kubelet/apis/stats/v1alpha1:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/cm/cpumanager:go_default_library",
"//pkg/kubelet/cm/cpumanager/state:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/eviction:go_default_library",

View File

@ -29,6 +29,7 @@ import (
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
cpumanagerstate "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/test/e2e/framework"
@ -120,6 +121,18 @@ func waitForContainerRemoval(containerName, podName, podNS string) {
}, 2*time.Minute, 1*time.Second).Should(BeTrue())
}
func waitForStateFileCleanedUp() {
Eventually(func() bool {
restoredState, err := cpumanagerstate.NewCheckpointState("/var/lib/kubelet", "cpu_manager_state", "static")
framework.ExpectNoError(err, "failed to create testing cpumanager state instance")
assignments := restoredState.GetCPUAssignments()
if len(assignments) == 0 {
return true
}
return false
}, 2*time.Minute, 1*time.Second).Should(BeTrue())
}
func isHTEnabled() bool {
outData, err := exec.Command("/bin/sh", "-c", "lscpu | grep \"Thread(s) per core:\" | cut -d \":\" -f 2").Output()
framework.ExpectNoError(err)
@ -151,13 +164,37 @@ func setOldKubeletConfig(f *framework.Framework, oldCfg *kubeletconfig.KubeletCo
}
}
func enableCPUManagerInKubelet(f *framework.Framework) (oldCfg *kubeletconfig.KubeletConfiguration) {
func disableCPUManagerInKubelet(f *framework.Framework) (oldCfg *kubeletconfig.KubeletConfiguration) {
// Disable CPU Manager in Kubelet.
oldCfg, err := getCurrentKubeletConfig()
framework.ExpectNoError(err)
newCfg := oldCfg.DeepCopy()
if newCfg.FeatureGates == nil {
newCfg.FeatureGates = make(map[string]bool)
}
newCfg.FeatureGates["CPUManager"] = false
// Update the Kubelet configuration.
framework.ExpectNoError(setKubeletConfiguration(f, newCfg))
// Wait for the Kubelet to be ready.
Eventually(func() bool {
nodeList := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
return len(nodeList.Items) == 1
}, time.Minute, time.Second).Should(BeTrue())
return oldCfg
}
func enableCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool) (oldCfg *kubeletconfig.KubeletConfiguration) {
// Enable CPU Manager in Kubelet with static policy.
oldCfg, err := getCurrentKubeletConfig()
framework.ExpectNoError(err)
newCfg := oldCfg.DeepCopy()
if newCfg.FeatureGates == nil {
newCfg.FeatureGates = make(map[string]bool)
} else {
newCfg.FeatureGates["CPUManager"] = true
}
// After graduation of the CPU Manager feature to Beta, the CPU Manager
@ -168,7 +205,10 @@ func enableCPUManagerInKubelet(f *framework.Framework) (oldCfg *kubeletconfig.Ku
// be "none" whereas we are trying to restart Kubelet with "static"
// policy). Therefore, we delete the state file so that we can proceed
// with the tests.
deleteStateFile()
// Only delete the state file at the begin of the tests.
if cleanStateFile {
deleteStateFile()
}
// Set the CPU Manager policy to static.
newCfg.CPUManagerPolicy = string(cpumanager.PolicyStatic)
@ -218,7 +258,7 @@ func runCPUManagerTests(f *framework.Framework) {
}
// Enable CPU Manager in the kubelet.
oldCfg = enableCPUManagerInKubelet(f)
oldCfg = enableCPUManagerInKubelet(f, true)
By("running a non-Gu pod")
ctnAttrs = []ctnAttribute{
@ -433,6 +473,49 @@ func runCPUManagerTests(f *framework.Framework) {
waitForContainerRemoval(pod1.Spec.Containers[0].Name, pod1.Name, pod1.Namespace)
waitForContainerRemoval(pod2.Spec.Containers[0].Name, pod2.Name, pod2.Namespace)
By("test for automatically remove inactive pods from cpumanager state file.")
// First running a Gu Pod,
// second disable cpu manager in kubelet,
// then delete the Gu Pod,
// then enable cpu manager in kubelet,
// at last wait for the reconcile process cleaned up the state file, if the assignments map is empty,
// it proves that the automatic cleanup in the reconcile process is in effect.
By("running a Gu pod for test remove")
ctnAttrs = []ctnAttribute{
{
ctnName: "gu-container-testremove",
cpuRequest: "1000m",
cpuLimit: "1000m",
},
}
pod = makeCPUManagerPod("gu-pod-testremove", ctnAttrs)
pod = f.PodClient().CreateSync(pod)
By("checking if the expected cpuset was assigned")
cpu1 = 1
if isHTEnabled() {
cpuList = cpuset.MustParse(getCPUSiblingList(0)).ToSlice()
cpu1 = cpuList[1]
}
expAllowedCPUsListRegex = fmt.Sprintf("^%d\n$", cpu1)
err = f.PodClient().MatchContainerOutput(pod.Name, pod.Spec.Containers[0].Name, expAllowedCPUsListRegex)
framework.ExpectNoError(err, "expected log not found in container [%s] of pod [%s]",
pod.Spec.Containers[0].Name, pod.Name)
By("disable cpu manager in kubelet")
disableCPUManagerInKubelet(f)
By("by deleting the pod and waiting for container removal")
deletePods(f, []string{pod.Name})
waitForContainerRemoval(pod.Spec.Containers[0].Name, pod.Name, pod.Namespace)
By("enable cpu manager in kubelet without delete state file")
enableCPUManagerInKubelet(f, false)
By("wait for the deleted pod to be cleaned up from the state file")
waitForStateFileCleanedUp()
By("the deleted pod has already been deleted from the state file")
setOldKubeletConfig(f, oldCfg)
})
}