pod and qos level cgroup support

pull/6/head
derekwaynecarr 2016-10-17 13:23:48 -04:00 committed by Derek Carr
parent 0d228d6a61
commit 42289c2758
34 changed files with 1427 additions and 287 deletions

View File

@ -441,6 +441,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) {
ContainerRuntime: s.ContainerRuntime,
CgroupsPerQOS: s.CgroupsPerQOS,
CgroupRoot: s.CgroupRoot,
CgroupDriver: s.CgroupDriver,
ProtectKernelDefaults: s.ProtectKernelDefaults,
RuntimeIntegrationType: s.ExperimentalRuntimeIntegrationType,
})

View File

@ -210,8 +210,6 @@ make test_e2e_node TEST_ARGS="--disable-kubenet=false" # disable kubenet
For testing with the QoS Cgroup Hierarchy enabled, you can pass the --cgroups-per-qos flag as an argument to Ginkgo using TEST_ARGS
*Note: Disabled pending feature stabilization.*
```sh
make test_e2e_node TEST_ARGS="--cgroups-per-qos=true"
```

View File

@ -29,6 +29,14 @@ NET_PLUGIN=${NET_PLUGIN:-""}
NET_PLUGIN_DIR=${NET_PLUGIN_DIR:-""}
KUBE_ROOT=$(dirname "${BASH_SOURCE}")/..
SERVICE_CLUSTER_IP_RANGE=${SERVICE_CLUSTER_IP_RANGE:-10.0.0.0/24}
# if enabled, must set CGROUP_ROOT
CGROUPS_PER_QOS=${CGROUPS_PER_QOS:-false}
# this is not defaulted to preserve backward compatibility.
# if CGROUPS_PER_QOS is enabled, recommend setting to /
CGROUP_ROOT=${CGROUP_ROOT:-""}
# name of the cgroup driver, i.e. cgroupfs or systemd
CGROUP_DRIVER=${CGROUP_DRIVER:-""}
# We disable cluster DNS by default because this script uses docker0 (or whatever
# container bridge docker is currently using) and we don't know the IP of the
# DNS pod to pass in as --cluster-dns. To set this up by hand, set this flag
@ -464,6 +472,9 @@ function start_kubelet {
--feature-gates="${FEATURE_GATES}" \
--cpu-cfs-quota=${CPU_CFS_QUOTA} \
--enable-controller-attach-detach="${ENABLE_CONTROLLER_ATTACH_DETACH}" \
--cgroups-per-qos=${CGROUPS_PER_QOS} \
--cgroup-driver=${CGROUP_DRIVER} \
--cgroup-root=${CGROUP_ROOT} \
${dns_args} \
${net_plugin_dir_args} \
${net_plugin_args} \

View File

@ -368,13 +368,23 @@ func SetDefaults_KubeletConfiguration(obj *KubeletConfiguration) {
temp := int32(defaultIPTablesDropBit)
obj.IPTablesDropBit = &temp
}
if obj.CgroupDriver == "" {
obj.CgroupDriver = "cgroupfs"
}
if obj.CgroupsPerQOS == nil {
temp := false
obj.CgroupsPerQOS = &temp
}
if obj.CgroupDriver == "" {
obj.CgroupDriver = "cgroupfs"
}
// NOTE: this is for backwards compatibility with earlier releases where cgroup-root was optional.
// if cgroups per qos is not enabled, and cgroup-root is not specified, we need to default to the
// container runtime default and not default to the root cgroup.
if obj.CgroupsPerQOS != nil {
if *obj.CgroupsPerQOS {
if obj.CgroupRoot == "" {
obj.CgroupRoot = "/"
}
}
}
}
func boolVar(b bool) *bool {

View File

@ -28,6 +28,7 @@ go_library(
"//pkg/api/resource:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/qos:go_default_library",
"//pkg/types:go_default_library",
"//pkg/util:go_default_library",
"//pkg/util/errors:go_default_library",
"//pkg/util/mount:go_default_library",
@ -41,16 +42,23 @@ go_library(
"//vendor:github.com/golang/glog",
"//vendor:github.com/opencontainers/runc/libcontainer/cgroups",
"//vendor:github.com/opencontainers/runc/libcontainer/cgroups/fs",
"//vendor:github.com/opencontainers/runc/libcontainer/cgroups/systemd",
"//vendor:github.com/opencontainers/runc/libcontainer/configs",
],
)
go_test(
name = "go_default_test",
srcs = ["container_manager_linux_test.go"],
srcs = [
"cgroup_manager_linux_test.go",
"container_manager_linux_test.go",
"helpers_linux_test.go",
],
library = "go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/resource:go_default_library",
"//pkg/util/mount:go_default_library",
"//vendor:github.com/stretchr/testify/assert",
"//vendor:github.com/stretchr/testify/require",

View File

@ -18,13 +18,137 @@ package cm
import (
"fmt"
"os"
"path"
"path/filepath"
"strings"
"github.com/golang/glog"
libcontainercgroups "github.com/opencontainers/runc/libcontainer/cgroups"
cgroupfs "github.com/opencontainers/runc/libcontainer/cgroups/fs"
cgroupsystemd "github.com/opencontainers/runc/libcontainer/cgroups/systemd"
libcontainerconfigs "github.com/opencontainers/runc/libcontainer/configs"
"k8s.io/kubernetes/pkg/util/sets"
)
// libcontainerCgroupManagerType defines how to interface with libcontainer
type libcontainerCgroupManagerType string
const (
// libcontainerCgroupfs means use libcontainer with cgroupfs
libcontainerCgroupfs libcontainerCgroupManagerType = "cgroupfs"
// libcontainerSystemd means use libcontainer with systemd
libcontainerSystemd libcontainerCgroupManagerType = "systemd"
)
// ConvertCgroupNameToSystemd converts the internal cgroup name to a systemd name.
// For example, the name /Burstable/pod_123-456 becomes Burstable-pod_123_456.slice
// If outputToCgroupFs is true, it expands the systemd name into the cgroupfs form.
// For example, it will return /Burstable.slice/Burstable-pod_123_456.slice in the above scenario.
func ConvertCgroupNameToSystemd(cgroupName CgroupName, outputToCgroupFs bool) string {
name := string(cgroupName)
result := ""
if name != "" && name != "/" {
// systemd treats - as a step in the hierarchy, we convert all - to _
name = strings.Replace(name, "-", "_", -1)
parts := strings.Split(name, "/")
for _, part := range parts {
// ignore leading stuff for now
if part == "" {
continue
}
if len(result) > 0 {
result = result + "-"
}
result = result + part
}
} else {
// root converts to -
result = "-"
}
// always have a .slice suffix
result = result + ".slice"
// if the caller desired the result in cgroupfs format...
if outputToCgroupFs {
var err error
result, err = cgroupsystemd.ExpandSlice(result)
if err != nil {
panic(fmt.Errorf("error adapting cgroup name, input: %v, err: %v", name, err))
}
}
return result
}
// ConvertCgroupFsNameToSystemd converts an expanded cgroupfs name to its systemd name.
// For example, it will convert test.slice/test-a.slice/test-a-b.slice to become test-a-b.slice
// NOTE: this is public right now to allow its usage in dockermanager and dockershim, ideally both those
// code areas could use something from libcontainer if we get this style function upstream.
func ConvertCgroupFsNameToSystemd(cgroupfsName string) (string, error) {
// TODO: see if libcontainer systemd implementation could use something similar, and if so, move
// this function up to that library. At that time, it would most likely do validation specific to systemd
// above and beyond the simple assumption here that the base of the path encodes the hierarchy
// per systemd convention.
return path.Base(cgroupfsName), nil
}
// libcontainerAdapter provides a simplified interface to libcontainer based on libcontainer type.
type libcontainerAdapter struct {
// cgroupManagerType defines how to interface with libcontainer
cgroupManagerType libcontainerCgroupManagerType
}
// newLibcontainerAdapter returns a configured libcontainerAdapter for specified manager.
// it does any initialization required by that manager to function.
func newLibcontainerAdapter(cgroupManagerType libcontainerCgroupManagerType) *libcontainerAdapter {
return &libcontainerAdapter{cgroupManagerType: cgroupManagerType}
}
// newManager returns an implementation of cgroups.Manager
func (l *libcontainerAdapter) newManager(cgroups *libcontainerconfigs.Cgroup, paths map[string]string) (libcontainercgroups.Manager, error) {
switch l.cgroupManagerType {
case libcontainerCgroupfs:
return &cgroupfs.Manager{
Cgroups: cgroups,
Paths: paths,
}, nil
case libcontainerSystemd:
// this means you asked systemd to manage cgroups, but systemd was not on the host, so all you can do is panic...
if !cgroupsystemd.UseSystemd() {
panic("systemd cgroup manager not available")
}
return &cgroupsystemd.Manager{
Cgroups: cgroups,
Paths: paths,
}, nil
}
return nil, fmt.Errorf("invalid cgroup manager configuration")
}
func (l *libcontainerAdapter) revertName(name string) CgroupName {
if l.cgroupManagerType != libcontainerSystemd {
return CgroupName(name)
}
driverName, err := ConvertCgroupFsNameToSystemd(name)
if err != nil {
panic(err)
}
driverName = strings.TrimSuffix(driverName, ".slice")
driverName = strings.Replace(driverName, "_", "-", -1)
return CgroupName(driverName)
}
// adaptName converts a CgroupName identifier to its driver specific form.
// if outputToCgroupFs is true, the result is returned in the cgroupfs format rather than the driver specific form.
func (l *libcontainerAdapter) adaptName(cgroupName CgroupName, outputToCgroupFs bool) string {
if l.cgroupManagerType != libcontainerSystemd {
name := string(cgroupName)
return name
}
return ConvertCgroupNameToSystemd(cgroupName, outputToCgroupFs)
}
// CgroupSubsystems holds information about the mounted cgroup subsystems
type CgroupSubsystems struct {
// Cgroup subsystem mounts.
@ -44,60 +168,93 @@ type cgroupManagerImpl struct {
// subsystems holds information about all the
// mounted cgroup subsystems on the node
subsystems *CgroupSubsystems
// simplifies interaction with libcontainer and its cgroup managers
adapter *libcontainerAdapter
}
// Make sure that cgroupManagerImpl implements the CgroupManager interface
var _ CgroupManager = &cgroupManagerImpl{}
// NewCgroupManager is a factory method that returns a CgroupManager
func NewCgroupManager(cs *CgroupSubsystems) CgroupManager {
func NewCgroupManager(cs *CgroupSubsystems, cgroupDriver string) CgroupManager {
managerType := libcontainerCgroupfs
if cgroupDriver == string(libcontainerSystemd) {
managerType = libcontainerSystemd
}
return &cgroupManagerImpl{
subsystems: cs,
adapter: newLibcontainerAdapter(managerType),
}
}
// Exists checks if all subsystem cgroups already exist
func (m *cgroupManagerImpl) Exists(name string) bool {
// Get map of all cgroup paths on the system for the particular cgroup
// Name converts the cgroup to the driver specific value in cgroupfs form.
func (m *cgroupManagerImpl) Name(name CgroupName) string {
return m.adapter.adaptName(name, true)
}
// CgroupName converts the literal cgroupfs name on the host to an internal identifier.
func (m *cgroupManagerImpl) CgroupName(name string) CgroupName {
return m.adapter.revertName(name)
}
// buildCgroupPaths builds a path to each cgroup subsystem for the specified name.
func (m *cgroupManagerImpl) buildCgroupPaths(name CgroupName) map[string]string {
cgroupFsAdaptedName := m.Name(name)
cgroupPaths := make(map[string]string, len(m.subsystems.MountPoints))
for key, val := range m.subsystems.MountPoints {
cgroupPaths[key] = path.Join(val, name)
cgroupPaths[key] = path.Join(val, cgroupFsAdaptedName)
}
return cgroupPaths
}
// If even one cgroup doesn't exist we go on to create it
// Exists checks if all subsystem cgroups already exist
func (m *cgroupManagerImpl) Exists(name CgroupName) bool {
// Get map of all cgroup paths on the system for the particular cgroup
cgroupPaths := m.buildCgroupPaths(name)
// If even one cgroup path doesn't exist, then the cgroup doesn't exist.
for _, path := range cgroupPaths {
if !libcontainercgroups.PathExists(path) {
return false
}
}
return true
}
// Destroy destroys the specified cgroup
func (m *cgroupManagerImpl) Destroy(cgroupConfig *CgroupConfig) error {
//cgroup name
name := cgroupConfig.Name
cgroupPaths := m.buildCgroupPaths(cgroupConfig.Name)
// Get map of all cgroup paths on the system for the particular cgroup
cgroupPaths := make(map[string]string, len(m.subsystems.MountPoints))
for key, val := range m.subsystems.MountPoints {
cgroupPaths[key] = path.Join(val, name)
// we take the location in traditional cgroupfs format.
abstractCgroupFsName := string(cgroupConfig.Name)
abstractParent := CgroupName(path.Dir(abstractCgroupFsName))
abstractName := CgroupName(path.Base(abstractCgroupFsName))
driverParent := m.adapter.adaptName(abstractParent, false)
driverName := m.adapter.adaptName(abstractName, false)
// this is an ugly abstraction bleed, but systemd cgroup driver requires full paths...
if m.adapter.cgroupManagerType == libcontainerSystemd {
driverName = m.adapter.adaptName(cgroupConfig.Name, false)
}
// Initialize libcontainer's cgroup config
// Initialize libcontainer's cgroup config with driver specific naming.
libcontainerCgroupConfig := &libcontainerconfigs.Cgroup{
Name: path.Base(name),
Parent: path.Dir(name),
Name: driverName,
Parent: driverParent,
}
fsCgroupManager := cgroupfs.Manager{
Cgroups: libcontainerCgroupConfig,
Paths: cgroupPaths,
manager, err := m.adapter.newManager(libcontainerCgroupConfig, cgroupPaths)
if err != nil {
return err
}
// Delete cgroups using libcontainers Managers Destroy() method
if err := fsCgroupManager.Destroy(); err != nil {
return fmt.Errorf("Unable to destroy cgroup paths for cgroup %v : %v", name, err)
if err = manager.Destroy(); err != nil {
return fmt.Errorf("Unable to destroy cgroup paths for cgroup %v : %v", cgroupConfig.Name, err)
}
return nil
}
@ -126,7 +283,7 @@ var supportedSubsystems = []subsystem{
func setSupportedSubsytems(cgroupConfig *libcontainerconfigs.Cgroup) error {
for _, sys := range supportedSubsystems {
if _, ok := cgroupConfig.Paths[sys.Name()]; !ok {
return fmt.Errorf("Failed to find subsytem mount for subsytem")
return fmt.Errorf("Failed to find subsytem mount for subsytem: %v", sys.Name())
}
if err := sys.Set(cgroupConfig.Paths[sys.Name()], cgroupConfig); err != nil {
return fmt.Errorf("Failed to set config for supported subsystems : %v", err)
@ -135,14 +292,11 @@ func setSupportedSubsytems(cgroupConfig *libcontainerconfigs.Cgroup) error {
return nil
}
// Update updates the cgroup with the specified Cgroup Configuration
func (m *cgroupManagerImpl) Update(cgroupConfig *CgroupConfig) error {
//cgroup name
name := cgroupConfig.Name
// Extract the cgroup resource parameters
resourceConfig := cgroupConfig.ResourceParameters
func (m *cgroupManagerImpl) toResources(resourceConfig *ResourceConfig) *libcontainerconfigs.Resources {
resources := &libcontainerconfigs.Resources{}
if resourceConfig == nil {
return resources
}
if resourceConfig.Memory != nil {
resources.Memory = *resourceConfig.Memory
}
@ -152,51 +306,149 @@ func (m *cgroupManagerImpl) Update(cgroupConfig *CgroupConfig) error {
if resourceConfig.CpuQuota != nil {
resources.CpuQuota = *resourceConfig.CpuQuota
}
if resourceConfig.CpuPeriod != nil {
resources.CpuPeriod = *resourceConfig.CpuPeriod
}
return resources
}
// Get map of all cgroup paths on the system for the particular cgroup
cgroupPaths := make(map[string]string, len(m.subsystems.MountPoints))
for key, val := range m.subsystems.MountPoints {
cgroupPaths[key] = path.Join(val, name)
// Update updates the cgroup with the specified Cgroup Configuration
func (m *cgroupManagerImpl) Update(cgroupConfig *CgroupConfig) error {
// Extract the cgroup resource parameters
resourceConfig := cgroupConfig.ResourceParameters
resources := m.toResources(resourceConfig)
cgroupPaths := m.buildCgroupPaths(cgroupConfig.Name)
// we take the location in traditional cgroupfs format.
abstractCgroupFsName := string(cgroupConfig.Name)
abstractParent := CgroupName(path.Dir(abstractCgroupFsName))
abstractName := CgroupName(path.Base(abstractCgroupFsName))
driverParent := m.adapter.adaptName(abstractParent, false)
driverName := m.adapter.adaptName(abstractName, false)
// this is an ugly abstraction bleed, but systemd cgroup driver requires full paths...
if m.adapter.cgroupManagerType == libcontainerSystemd {
driverName = m.adapter.adaptName(cgroupConfig.Name, false)
}
// Initialize libcontainer's cgroup config
libcontainerCgroupConfig := &libcontainerconfigs.Cgroup{
Name: path.Base(name),
Parent: path.Dir(name),
Name: driverName,
Parent: driverParent,
Resources: resources,
Paths: cgroupPaths,
}
if err := setSupportedSubsytems(libcontainerCgroupConfig); err != nil {
return fmt.Errorf("Failed to set supported cgroup subsystems for cgroup %v: %v", name, err)
return fmt.Errorf("failed to set supported cgroup subsystems for cgroup %v: %v", cgroupConfig.Name, err)
}
return nil
}
// Create creates the specified cgroup
func (m *cgroupManagerImpl) Create(cgroupConfig *CgroupConfig) error {
// get cgroup name
name := cgroupConfig.Name
// Initialize libcontainer's cgroup config
// we take the location in traditional cgroupfs format.
abstractCgroupFsName := string(cgroupConfig.Name)
abstractParent := CgroupName(path.Dir(abstractCgroupFsName))
abstractName := CgroupName(path.Base(abstractCgroupFsName))
driverParent := m.adapter.adaptName(abstractParent, false)
driverName := m.adapter.adaptName(abstractName, false)
// this is an ugly abstraction bleed, but systemd cgroup driver requires full paths...
if m.adapter.cgroupManagerType == libcontainerSystemd {
driverName = m.adapter.adaptName(cgroupConfig.Name, false)
}
resources := m.toResources(cgroupConfig.ResourceParameters)
// Initialize libcontainer's cgroup config with driver specific naming.
libcontainerCgroupConfig := &libcontainerconfigs.Cgroup{
Name: path.Base(name),
Parent: path.Dir(name),
Resources: &libcontainerconfigs.Resources{},
Name: driverName,
Parent: driverParent,
Resources: resources,
}
// get the fscgroup Manager with the specified cgroup configuration
fsCgroupManager := &cgroupfs.Manager{
Cgroups: libcontainerCgroupConfig,
// get the manager with the specified cgroup configuration
manager, err := m.adapter.newManager(libcontainerCgroupConfig, nil)
if err != nil {
return err
}
//Apply(0) is a hack to create the cgroup directories for each resource
// Apply(-1) is a hack to create the cgroup directories for each resource
// subsystem. The function [cgroups.Manager.apply()] applies cgroup
// configuration to the process with the specified pid.
// It creates cgroup files for each subsystem and writes the pid
// in the tasks file. We use the function to create all the required
// cgroup files but not attach any "real" pid to the cgroup.
if err := fsCgroupManager.Apply(-1); err != nil {
return fmt.Errorf("Failed to apply cgroup config for %v: %v", name, err)
if err := manager.Apply(-1); err != nil {
return err
}
// it may seem confusing that we call set after apply, but runc follows a similar
// pattern. it's needed to ensure cpu quota is set properly.
m.Update(cgroupConfig)
return nil
}
// Scans through all subsystems to find pids associated with the specified cgroup.
func (m *cgroupManagerImpl) Pids(name CgroupName) []int {
// we need the driver specific name
cgroupFsName := m.Name(name)
// Get a list of processes that we need to kill
pidsToKill := sets.NewInt()
var pids []int
for _, val := range m.subsystems.MountPoints {
dir := path.Join(val, cgroupFsName)
_, err := os.Stat(dir)
if os.IsNotExist(err) {
// The subsystem pod cgroup is already deleted
// do nothing, continue
continue
}
// Get a list of pids that are still charged to the pod's cgroup
pids, err = getCgroupProcs(dir)
if err != nil {
continue
}
pidsToKill.Insert(pids...)
// WalkFunc which is called for each file and directory in the pod cgroup dir
visitor := func(path string, info os.FileInfo, err error) error {
if !info.IsDir() {
return nil
}
pids, err = getCgroupProcs(path)
if err != nil {
glog.V(5).Infof("cgroup manager encountered error getting procs for cgroup path %v", path)
return filepath.SkipDir
}
pidsToKill.Insert(pids...)
return nil
}
// Walk through the pod cgroup directory to check if
// container cgroups haven't been GCed yet. Get attached processes to
// all such unwanted containers under the pod cgroup
if err = filepath.Walk(dir, visitor); err != nil {
glog.V(5).Infof("cgroup manager encountered error scanning pids for directory: %v", dir)
}
}
return pidsToKill.List()
}
// ReduceCPULimits reduces the cgroup's cpu shares to the lowest possible value
func (m *cgroupManagerImpl) ReduceCPULimits(cgroupName CgroupName) error {
// Set lowest possible CpuShares value for the cgroup
minimumCPUShares := int64(MinShares)
resources := &ResourceConfig{
CpuShares: &minimumCPUShares,
}
containerConfig := &CgroupConfig{
Name: cgroupName,
ResourceParameters: resources,
}
return m.Update(containerConfig)
}

View File

@ -0,0 +1,81 @@
// +build linux
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cm
import "testing"
func TestLibcontainerAdapterAdaptToSystemd(t *testing.T) {
testCases := []struct {
input string
expected string
}{
{
input: "/",
expected: "-.slice",
},
{
input: "/Burstable",
expected: "Burstable.slice",
},
{
input: "/Burstable/pod_123",
expected: "Burstable-pod_123.slice",
},
{
input: "/BestEffort/pod_6c1a4e95-6bb6-11e6-bc26-28d2444e470d",
expected: "BestEffort-pod_6c1a4e95_6bb6_11e6_bc26_28d2444e470d.slice",
},
}
for _, testCase := range testCases {
f := newLibcontainerAdapter(libcontainerSystemd)
if actual := f.adaptName(CgroupName(testCase.input), false); actual != testCase.expected {
t.Errorf("Unexpected result, input: %v, expected: %v, actual: %v", testCase.input, testCase.expected, actual)
}
}
}
func TestLibcontainerAdapterAdaptToSystemdAsCgroupFs(t *testing.T) {
testCases := []struct {
input string
expected string
}{
{
input: "/",
expected: "/",
},
{
input: "/Burstable",
expected: "Burstable.slice/",
},
{
input: "/Burstable/pod_123",
expected: "Burstable.slice/Burstable-pod_123.slice/",
},
{
input: "/BestEffort/pod_6c1a4e95-6bb6-11e6-bc26-28d2444e470d",
expected: "BestEffort.slice/BestEffort-pod_6c1a4e95_6bb6_11e6_bc26_28d2444e470d.slice/",
},
}
for _, testCase := range testCases {
f := newLibcontainerAdapter(libcontainerSystemd)
if actual := f.adaptName(CgroupName(testCase.input), true); actual != testCase.expected {
t.Errorf("Unexpected result, input: %v, expected: %v, actual: %v", testCase.input, testCase.expected, actual)
}
}
}

View File

@ -34,7 +34,11 @@ func NewCgroupManager(_ interface{}) CgroupManager {
return &unsupportedCgroupManager{}
}
func (m *unsupportedCgroupManager) Exists(_ string) bool {
func (m *unsupportedCgroupManager) Name(_ CgroupName) string {
return ""
}
func (m *unsupportedCgroupManager) Exists(_ CgroupName) bool {
return false
}
@ -49,3 +53,7 @@ func (m *unsupportedCgroupManager) Update(_ *CgroupConfig) error {
func (m *unsupportedCgroupManager) Create(_ *CgroupConfig) error {
return fmt.Errorf("Cgroup Manager is not supported in this build")
}
func (m *unsupportedCgroupManager) Pids(_ CgroupName) []int {
return nil
}

View File

@ -55,6 +55,7 @@ type NodeConfig struct {
ContainerRuntime string
CgroupsPerQOS bool
CgroupRoot string
CgroupDriver string
ProtectKernelDefaults bool
RuntimeIntegrationType string
}

View File

@ -165,19 +165,27 @@ func validateSystemRequirements(mountUtil mount.Interface) (features, error) {
// Takes the absolute name of the specified containers.
// Empty container name disables use of the specified container.
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig) (ContainerManager, error) {
// Check if Cgroup-root actually exists on the node
if nodeConfig.CgroupsPerQOS {
if nodeConfig.CgroupRoot == "" {
return nil, fmt.Errorf("invalid configuration: cgroups-per-qos was specified and cgroup-root was not specified. To enable the QoS cgroup hierarchy you need to specify a valid cgroup-root")
}
if _, err := os.Stat(nodeConfig.CgroupRoot); err != nil {
return nil, fmt.Errorf("invalid configuration: cgroup-root doesn't exist : %v", err)
}
}
subsystems, err := GetCgroupSubsystems()
if err != nil {
return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err)
}
// Check if Cgroup-root actually exists on the node
if nodeConfig.CgroupsPerQOS {
// this does default to / when enabled, but this tests against regressions.
if nodeConfig.CgroupRoot == "" {
return nil, fmt.Errorf("invalid configuration: cgroups-per-qos was specified and cgroup-root was not specified. To enable the QoS cgroup hierarchy you need to specify a valid cgroup-root")
}
// we need to check that the cgroup root actually exists for each subsystem
// of note, we always use the cgroupfs driver when performing this check since
// the input is provided in that format.
// this is important because we do not want any name conversion to occur.
cgroupManager := NewCgroupManager(subsystems, "cgroupfs")
if !cgroupManager.Exists(CgroupName(nodeConfig.CgroupRoot)) {
return nil, fmt.Errorf("invalid configuration: cgroup-root doesn't exist: %v", err)
}
}
return &containerManagerImpl{
cadvisorInterface: cadvisorInterface,
mountUtil: mountUtil,
@ -195,11 +203,11 @@ func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager {
qosContainersInfo: cm.qosContainers,
nodeInfo: cm.nodeInfo,
subsystems: cm.subsystems,
cgroupManager: NewCgroupManager(cm.subsystems),
cgroupManager: NewCgroupManager(cm.subsystems, cm.NodeConfig.CgroupDriver),
}
}
return &podContainerManagerNoop{
cgroupRoot: cm.NodeConfig.CgroupRoot,
cgroupRoot: CgroupName(cm.NodeConfig.CgroupRoot),
}
}
@ -229,10 +237,8 @@ const (
// We create top level QoS containers for only Burstable and Best Effort
// and not Guaranteed QoS class. All guaranteed pods are nested under the
// RootContainer by default. InitQOS is called only once during kubelet bootstrapping.
// TODO(@dubstack) Add support for cgroup-root to work on both systemd and cgroupfs
// drivers. Currently we only support systems running cgroupfs driver
func InitQOS(rootContainer string, subsystems *CgroupSubsystems) (QOSContainersInfo, error) {
cm := NewCgroupManager(subsystems)
func InitQOS(cgroupDriver, rootContainer string, subsystems *CgroupSubsystems) (QOSContainersInfo, error) {
cm := NewCgroupManager(subsystems, cgroupDriver)
// Top level for Qos containers are created only for Burstable
// and Best Effort classes
qosClasses := [2]qos.QOSClass{qos.Burstable, qos.BestEffort}
@ -240,15 +246,17 @@ func InitQOS(rootContainer string, subsystems *CgroupSubsystems) (QOSContainersI
// Create containers for both qos classes
for _, qosClass := range qosClasses {
// get the container's absolute name
absoluteContainerName := path.Join(rootContainer, string(qosClass))
absoluteContainerName := CgroupName(path.Join(rootContainer, string(qosClass)))
// containerConfig object stores the cgroup specifications
containerConfig := &CgroupConfig{
Name: absoluteContainerName,
ResourceParameters: &ResourceConfig{},
}
// TODO(@dubstack) Add support on systemd cgroups driver
if err := cm.Create(containerConfig); err != nil {
return QOSContainersInfo{}, fmt.Errorf("failed to create top level %v QOS cgroup : %v", qosClass, err)
// check if it exists
if !cm.Exists(absoluteContainerName) {
if err := cm.Create(containerConfig); err != nil {
return QOSContainersInfo{}, fmt.Errorf("failed to create top level %v QOS cgroup : %v", qosClass, err)
}
}
}
// Store the top level qos container names
@ -317,7 +325,7 @@ func (cm *containerManagerImpl) setupNode() error {
// Setup top level qos containers only if CgroupsPerQOS flag is specified as true
if cm.NodeConfig.CgroupsPerQOS {
qosContainersInfo, err := InitQOS(cm.NodeConfig.CgroupRoot, cm.subsystems)
qosContainersInfo, err := InitQOS(cm.NodeConfig.CgroupDriver, cm.NodeConfig.CgroupRoot, cm.subsystems)
if err != nil {
return fmt.Errorf("failed to initialise top level QOS containers: %v", err)
}

View File

@ -17,11 +17,121 @@ limitations under the License.
package cm
import (
"bufio"
"fmt"
"os"
"path/filepath"
"strconv"
libcontainercgroups "github.com/opencontainers/runc/libcontainer/cgroups"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/kubelet/qos"
)
const (
// Taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc
MinShares = 2
SharesPerCPU = 1024
MilliCPUToCPU = 1000
// 100000 is equivalent to 100ms
QuotaPeriod = 100000
MinQuotaPeriod = 1000
)
// MilliCPUToQuota converts milliCPU to CFS quota and period values.
func MilliCPUToQuota(milliCPU int64) (quota int64, period int64) {
// CFS quota is measured in two values:
// - cfs_period_us=100ms (the amount of time to measure usage across)
// - cfs_quota=20ms (the amount of cpu time allowed to be used across a period)
// so in the above example, you are limited to 20% of a single CPU
// for multi-cpu environments, you just scale equivalent amounts
if milliCPU == 0 {
return
}
// we set the period to 100ms by default
period = QuotaPeriod
// we then convert your milliCPU to a value normalized over a period
quota = (milliCPU * QuotaPeriod) / MilliCPUToCPU
// quota needs to be a minimum of 1ms.
if quota < MinQuotaPeriod {
quota = MinQuotaPeriod
}
return
}
// MilliCPUToShares converts the milliCPU to CFS shares.
func MilliCPUToShares(milliCPU int64) int64 {
if milliCPU == 0 {
// Docker converts zero milliCPU to unset, which maps to kernel default
// for unset: 1024. Return 2 here to really match kernel default for
// zero milliCPU.
return MinShares
}
// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
shares := (milliCPU * SharesPerCPU) / MilliCPUToCPU
if shares < MinShares {
return MinShares
}
return shares
}
// ResourceConfigForPod takes the input pod and outputs the cgroup resource config.
func ResourceConfigForPod(pod *api.Pod) *ResourceConfig {
// sum requests and limits, track if limits were applied for each resource.
cpuRequests := int64(0)
cpuLimits := int64(0)
memoryLimits := int64(0)
memoryLimitsDeclared := true
cpuLimitsDeclared := true
for _, container := range pod.Spec.Containers {
cpuRequests += container.Resources.Requests.Cpu().MilliValue()
cpuLimits += container.Resources.Limits.Cpu().MilliValue()
if container.Resources.Limits.Cpu().IsZero() {
cpuLimitsDeclared = false
}
memoryLimits += container.Resources.Limits.Memory().Value()
if container.Resources.Limits.Memory().IsZero() {
memoryLimitsDeclared = false
}
}
// convert to CFS values
cpuShares := MilliCPUToShares(cpuRequests)
cpuQuota, cpuPeriod := MilliCPUToQuota(cpuLimits)
// determine the qos class
qosClass := qos.GetPodQOS(pod)
// build the result
result := &ResourceConfig{}
if qosClass == qos.Guaranteed {
result.CpuShares = &cpuShares
result.CpuQuota = &cpuQuota
result.CpuPeriod = &cpuPeriod
result.Memory = &memoryLimits
} else if qosClass == qos.Burstable {
result.CpuShares = &cpuShares
if cpuLimitsDeclared {
result.CpuQuota = &cpuQuota
result.CpuPeriod = &cpuPeriod
}
if memoryLimitsDeclared {
result.Memory = &memoryLimits
}
} else {
shares := int64(MinShares)
result.CpuShares = &shares
}
return result
}
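
As a worked example (illustration only, not part of the diff), this is roughly what the function above produces for a single-container Burstable pod with a 100m CPU / 100Mi request and a 200m CPU / 200Mi limit; the numbers follow directly from MilliCPUToShares and MilliCPUToQuota, and the standalone main package is hypothetical:

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/resource"
	"k8s.io/kubernetes/pkg/kubelet/cm"
)

func main() {
	pod := &api.Pod{
		Spec: api.PodSpec{
			Containers: []api.Container{{
				Resources: api.ResourceRequirements{
					Requests: api.ResourceList{
						api.ResourceCPU:    resource.MustParse("100m"),
						api.ResourceMemory: resource.MustParse("100Mi"),
					},
					Limits: api.ResourceList{
						api.ResourceCPU:    resource.MustParse("200m"),
						api.ResourceMemory: resource.MustParse("200Mi"),
					},
				},
			}},
		},
	}
	cfg := cm.ResourceConfigForPod(pod)
	// Burstable: shares come from requests, quota/period and memory from limits.
	fmt.Println(*cfg.CpuShares) // 102    (100m * 1024 / 1000)
	fmt.Println(*cfg.CpuQuota)  // 20000  (200m * 100000 / 1000)
	fmt.Println(*cfg.CpuPeriod) // 100000
	fmt.Println(*cfg.Memory)    // 209715200 (200Mi)
}
```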
// GetCgroupSubsystems returns information about the mounted cgroup subsystems
func GetCgroupSubsystems() (*CgroupSubsystems, error) {
// get all cgroup mounts.
@ -32,7 +142,6 @@ func GetCgroupSubsystems() (*CgroupSubsystems, error) {
if len(allCgroups) == 0 {
return &CgroupSubsystems{}, fmt.Errorf("failed to find cgroup mounts")
}
mountPoints := make(map[string]string, len(allCgroups))
for _, mount := range allCgroups {
for _, subsystem := range mount.Subsystems {
@ -44,3 +153,32 @@ func GetCgroupSubsystems() (*CgroupSubsystems, error) {
MountPoints: mountPoints,
}, nil
}
// getCgroupProcs takes a cgroup directory name as an argument,
// reads through the cgroup's procs file, and returns a list of tgids.
// It returns an empty list if the procs file does not exist.
func getCgroupProcs(dir string) ([]int, error) {
procsFile := filepath.Join(dir, "cgroup.procs")
f, err := os.Open(procsFile)
if err != nil {
if os.IsNotExist(err) {
// The procsFile does not exist, so no pids are attached to this directory
return []int{}, nil
}
return nil, err
}
defer f.Close()
s := bufio.NewScanner(f)
out := []int{}
for s.Scan() {
if t := s.Text(); t != "" {
pid, err := strconv.Atoi(t)
if err != nil {
return nil, fmt.Errorf("unexpected line in %v; could not convert to pid: %v", procsFile, err)
}
out = append(out, pid)
}
}
return out, nil
}

View File

@ -0,0 +1,199 @@
// +build linux
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cm
import (
"reflect"
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
)
// getResourceList returns a ResourceList with the
// specified cpu and memory resource values
func getResourceList(cpu, memory string) api.ResourceList {
res := api.ResourceList{}
if cpu != "" {
res[api.ResourceCPU] = resource.MustParse(cpu)
}
if memory != "" {
res[api.ResourceMemory] = resource.MustParse(memory)
}
return res
}
// getResourceRequirements returns a ResourceRequirements object
func getResourceRequirements(requests, limits api.ResourceList) api.ResourceRequirements {
res := api.ResourceRequirements{}
res.Requests = requests
res.Limits = limits
return res
}
func TestResourceConfigForPod(t *testing.T) {
minShares := int64(MinShares)
burstableShares := MilliCPUToShares(100)
memoryQuantity := resource.MustParse("200Mi")
burstableMemory := memoryQuantity.Value()
burstablePartialShares := MilliCPUToShares(200)
burstableQuota, burstablePeriod := MilliCPUToQuota(200)
guaranteedShares := MilliCPUToShares(100)
guaranteedQuota, guaranteedPeriod := MilliCPUToQuota(100)
memoryQuantity = resource.MustParse("100Mi")
guaranteedMemory := memoryQuantity.Value()
testCases := map[string]struct {
pod *api.Pod
expected *ResourceConfig
}{
"besteffort": {
pod: &api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{
{
Resources: getResourceRequirements(getResourceList("", ""), getResourceList("", "")),
},
},
},
},
expected: &ResourceConfig{CpuShares: &minShares},
},
"burstable-no-limits": {
pod: &api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{
{
Resources: getResourceRequirements(getResourceList("100m", "100Mi"), getResourceList("", "")),
},
},
},
},
expected: &ResourceConfig{CpuShares: &burstableShares},
},
"burstable-with-limits": {
pod: &api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{
{
Resources: getResourceRequirements(getResourceList("100m", "100Mi"), getResourceList("200m", "200Mi")),
},
},
},
},
expected: &ResourceConfig{CpuShares: &burstableShares, CpuQuota: &burstableQuota, CpuPeriod: &burstablePeriod, Memory: &burstableMemory},
},
"burstable-partial-limits": {
pod: &api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{
{
Resources: getResourceRequirements(getResourceList("100m", "100Mi"), getResourceList("200m", "200Mi")),
},
{
Resources: getResourceRequirements(getResourceList("100m", "100Mi"), getResourceList("", "")),
},
},
},
},
expected: &ResourceConfig{CpuShares: &burstablePartialShares},
},
"guaranteed": {
pod: &api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{
{
Resources: getResourceRequirements(getResourceList("100m", "100Mi"), getResourceList("100m", "100Mi")),
},
},
},
},
expected: &ResourceConfig{CpuShares: &guaranteedShares, CpuQuota: &guaranteedQuota, CpuPeriod: &guaranteedPeriod, Memory: &guaranteedMemory},
},
}
for testName, testCase := range testCases {
actual := ResourceConfigForPod(testCase.pod)
if !reflect.DeepEqual(actual.CpuPeriod, testCase.expected.CpuPeriod) {
t.Errorf("unexpected result, test: %v, cpu period not as expected", testName)
}
if !reflect.DeepEqual(actual.CpuQuota, testCase.expected.CpuQuota) {
t.Errorf("unexpected result, test: %v, cpu quota not as expected", testName)
}
if !reflect.DeepEqual(actual.CpuShares, testCase.expected.CpuShares) {
t.Errorf("unexpected result, test: %v, cpu shares not as expected", testName)
}
if !reflect.DeepEqual(actual.Memory, testCase.expected.Memory) {
t.Errorf("unexpected result, test: %v, memory not as expected", testName)
}
}
}
func TestMilliCPUToQuota(t *testing.T) {
testCases := []struct {
input int64
quota int64
period int64
}{
{
input: int64(0),
quota: int64(0),
period: int64(0),
},
{
input: int64(5),
quota: int64(1000),
period: int64(100000),
},
{
input: int64(9),
quota: int64(1000),
period: int64(100000),
},
{
input: int64(10),
quota: int64(1000),
period: int64(100000),
},
{
input: int64(200),
quota: int64(20000),
period: int64(100000),
},
{
input: int64(500),
quota: int64(50000),
period: int64(100000),
},
{
input: int64(1000),
quota: int64(100000),
period: int64(100000),
},
{
input: int64(1500),
quota: int64(150000),
period: int64(100000),
},
}
for _, testCase := range testCases {
quota, period := MilliCPUToQuota(testCase.input)
if quota != testCase.quota || period != testCase.period {
t.Errorf("Input %v, expected quota %v period %v, but got quota %v period %v", testCase.input, testCase.quota, testCase.period, quota, period)
}
}
}

View File

@ -18,14 +18,20 @@ package cm
import (
"fmt"
"io/ioutil"
"os"
"path"
"strings"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/types"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
)
const (
podCgroupNamePrefix = "pod#"
podCgroupNamePrefix = "pod"
)
// podContainerManagerImpl implements podContainerManager interface.
@ -56,7 +62,7 @@ func (m *podContainerManagerImpl) applyLimits(pod *api.Pod) error {
// Exists checks if the pod's cgroup already exists
func (m *podContainerManagerImpl) Exists(pod *api.Pod) bool {
podContainerName := m.GetPodContainerName(pod)
podContainerName, _ := m.GetPodContainerName(pod)
return m.cgroupManager.Exists(podContainerName)
}
@ -64,14 +70,14 @@ func (m *podContainerManagerImpl) Exists(pod *api.Pod) bool {
// pod cgroup exists if qos cgroup hierarchy flag is enabled.
// If the pod level container doesn't already exist, it is created.
func (m *podContainerManagerImpl) EnsureExists(pod *api.Pod) error {
podContainerName := m.GetPodContainerName(pod)
podContainerName, _ := m.GetPodContainerName(pod)
// check if container already exist
alreadyExists := m.Exists(pod)
if !alreadyExists {
// Create the pod container
containerConfig := &CgroupConfig{
Name: podContainerName,
ResourceParameters: &ResourceConfig{},
ResourceParameters: ResourceConfigForPod(pod),
}
if err := m.cgroupManager.Create(containerConfig); err != nil {
return fmt.Errorf("failed to create container for %v : %v", podContainerName, err)
@ -87,11 +93,8 @@ func (m *podContainerManagerImpl) EnsureExists(pod *api.Pod) error {
return nil
}
// GetPodContainerName is a util func takes in a pod as an argument
// and returns the pod's cgroup name. We follow a pod cgroup naming format
// which is opaque and deterministic. Given a pod it's cgroup would be named
// "pod-UID" where the UID is the Pod UID
func (m *podContainerManagerImpl) GetPodContainerName(pod *api.Pod) string {
// GetPodContainerName returns the CgroupName identifier, and its literal cgroupfs form on the host.
func (m *podContainerManagerImpl) GetPodContainerName(pod *api.Pod) (CgroupName, string) {
podQOS := qos.GetPodQOS(pod)
// Get the parent QOS container name
var parentContainer string
@ -104,24 +107,127 @@ func (m *podContainerManagerImpl) GetPodContainerName(pod *api.Pod) string {
parentContainer = m.qosContainersInfo.BestEffort
}
podContainer := podCgroupNamePrefix + string(pod.UID)
// Get the absolute path of the cgroup
return path.Join(parentContainer, podContainer)
cgroupName := (CgroupName)(path.Join(parentContainer, podContainer))
// Get the literal cgroupfs name
cgroupfsName := m.cgroupManager.Name(cgroupName)
return cgroupName, cgroupfsName
}
// Scan through the whole cgroup directory and kill all processes either
// attached to the pod cgroup or to a container cgroup under the pod cgroup
func (m *podContainerManagerImpl) tryKillingCgroupProcesses(podCgroup CgroupName) error {
pidsToKill := m.cgroupManager.Pids(podCgroup)
// No pids are charged to the terminated pod cgroup; return
if len(pidsToKill) == 0 {
return nil
}
var errlist []error
// os.Kill often errors out,
// We try killing all the pids multiple times
for i := 0; i < 5; i++ {
if i != 0 {
glog.V(3).Infof("Attempt %v failed to kill all unwanted process. Retyring", i)
}
errlist = []error{}
for _, pid := range pidsToKill {
p, err := os.FindProcess(pid)
if err != nil {
// Process not running anymore, do nothing
continue
}
glog.V(3).Infof("Attempt to kill process with pid: %v", pid)
if err := p.Kill(); err != nil {
glog.V(3).Infof("failed to kill process with pid: %v", pid)
errlist = append(errlist, err)
}
}
if len(errlist) == 0 {
glog.V(3).Infof("successfully killed all unwanted processes.")
return nil
}
}
return utilerrors.NewAggregate(errlist)
}
// Destroy destroys the pod container cgroup paths
func (m *podContainerManagerImpl) Destroy(podCgroup string) error {
// This will house the logic for destroying the pod cgroups.
// Will be handled in the next PR.
func (m *podContainerManagerImpl) Destroy(podCgroup CgroupName) error {
// Try killing all the processes attached to the pod cgroup
if err := m.tryKillingCgroupProcesses(podCgroup); err != nil {
glog.V(3).Infof("failed to kill all the processes attached to the %v cgroups", podCgroup)
return fmt.Errorf("failed to kill all the processes attached to the %v cgroups : %v", podCgroup, err)
}
// Now it's safe to remove the pod's cgroup
containerConfig := &CgroupConfig{
Name: podCgroup,
ResourceParameters: &ResourceConfig{},
}
if err := m.cgroupManager.Destroy(containerConfig); err != nil {
return fmt.Errorf("failed to delete cgroup paths for %v : %v", podCgroup, err)
}
return nil
}
// ReduceCPULimits reduces the CPU CFS values to the minimum amount of shares.
func (m *podContainerManagerImpl) ReduceCPULimits(podCgroup CgroupName) error {
return m.cgroupManager.ReduceCPULimits(podCgroup)
}
// GetAllPodsFromCgroups scans through all the subsystems of pod cgroups
// and returns the list of pods whose cgroups still exist on the cgroup mounts
func (m *podContainerManagerImpl) GetAllPodsFromCgroups() (map[types.UID]CgroupName, error) {
// Map for storing all the found pods on the disk
foundPods := make(map[types.UID]CgroupName)
qosContainersList := [3]string{m.qosContainersInfo.BestEffort, m.qosContainersInfo.Burstable, m.qosContainersInfo.Guaranteed}
// Scan through all the subsystem mounts
// and through each QoS cgroup directory for each subsystem mount
// If a pod cgroup exists in even a single subsystem mount
// we will attempt to delete it
for _, val := range m.subsystems.MountPoints {
for _, qosContainerName := range qosContainersList {
// get the subsystems QoS cgroup absolute name
qcConversion := m.cgroupManager.Name(CgroupName(qosContainerName))
qc := path.Join(val, qcConversion)
dirInfo, err := ioutil.ReadDir(qc)
if err != nil {
return nil, fmt.Errorf("failed to read the cgroup directory %v : %v", qc, err)
}
for i := range dirInfo {
// note: we do a contains check because on systemd, the literal cgroupfs name will prefix the qos as well.
if dirInfo[i].IsDir() && strings.Contains(dirInfo[i].Name(), podCgroupNamePrefix) {
// we need to convert the name to an internal identifier
internalName := m.cgroupManager.CgroupName(dirInfo[i].Name())
// we then split the name on the pod prefix to determine the uid
parts := strings.Split(string(internalName), podCgroupNamePrefix)
// the name is not of the expected form pod<uid>, so log the unexpected cgroup and skip it
if len(parts) != 2 {
location := path.Join(qc, dirInfo[i].Name())
glog.Errorf("pod cgroup manager ignoring unexpected cgroup %v because it is not a pod", location)
continue
}
podUID := parts[1]
// because the literal cgroupfs name could encode the qos tier (on systemd), we avoid double encoding
// by just rebuilding the fully qualified CgroupName according to our internal convention.
cgroupName := CgroupName(path.Join(qosContainerName, podCgroupNamePrefix+podUID))
foundPods[types.UID(podUID)] = cgroupName
}
}
}
}
return foundPods, nil
}
// podContainerManagerNoop implements podContainerManager interface.
// It is a no-op implementation and basically does nothing
// podContainerManagerNoop is used when the QoS cgroup hierarchy is not
// enabled; Exists() always returns true because the cgroupRoot
// is expected to always exist.
type podContainerManagerNoop struct {
cgroupRoot string
cgroupRoot CgroupName
}
// Make sure that podContainerManagerStub implements the PodContainerManager interface
@ -135,11 +241,23 @@ func (m *podContainerManagerNoop) EnsureExists(_ *api.Pod) error {
return nil
}
func (m *podContainerManagerNoop) GetPodContainerName(_ *api.Pod) string {
return m.cgroupRoot
func (m *podContainerManagerNoop) GetPodContainerName(_ *api.Pod) (CgroupName, string) {
return m.cgroupRoot, string(m.cgroupRoot)
}
func (m *podContainerManagerNoop) GetPodContainerNameForDriver(_ *api.Pod) string {
return ""
}
// Destroy destroys the pod container cgroup paths
func (m *podContainerManagerNoop) Destroy(_ string) error {
func (m *podContainerManagerNoop) Destroy(_ CgroupName) error {
return nil
}
func (m *podContainerManagerNoop) ReduceCPULimits(_ CgroupName) error {
return nil
}
func (m *podContainerManagerNoop) GetAllPodsFromCgroups() (map[types.UID]CgroupName, error) {
return nil, nil
}

View File

@ -16,7 +16,10 @@ limitations under the License.
package cm
import "k8s.io/kubernetes/pkg/api"
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/types"
)
type podContainerManagerStub struct {
}
@ -31,10 +34,18 @@ func (m *podContainerManagerStub) EnsureExists(_ *api.Pod) error {
return nil
}
func (m *podContainerManagerStub) GetPodContainerName(_ *api.Pod) string {
return ""
func (m *podContainerManagerStub) GetPodContainerName(_ *api.Pod) (CgroupName, string) {
return "", ""
}
func (m *podContainerManagerStub) Destroy(_ string) error {
func (m *podContainerManagerStub) Destroy(_ CgroupName) error {
return nil
}
func (m *podContainerManagerStub) ReduceCPULimits(_ CgroupName) error {
return nil
}
func (m *podContainerManagerStub) GetAllPodsFromCgroups() (map[types.UID]CgroupName, error) {
return nil, nil
}

View File

@ -18,7 +18,10 @@ limitations under the License.
package cm
import "k8s.io/kubernetes/pkg/api"
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/types"
)
type unsupportedPodContainerManager struct {
}
@ -33,10 +36,14 @@ func (m *unsupportedPodContainerManager) EnsureExists(_ *api.Pod) error {
return nil
}
func (m *unsupportedPodContainerManager) GetPodContainerName(_ *api.Pod) string {
return ""
func (m *unsupportedPodContainerManager) GetPodContainerName(_ *api.Pod) (CgroupName, string) {
return "", ""
}
func (m *unsupportedPodContainerManager) Destroy(_ string) error {
func (m *unsupportedPodContainerManager) ReduceCPULimits(_ CgroupName) error {
return nil
}
func (m *unsupportedPodContainerManager) GetAllPodsFromCgroups() (map[types.UID]CgroupName, error) {
return nil, nil
}

View File

@ -18,6 +18,7 @@ package cm
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/types"
)
// ResourceConfig holds information about all the supported cgroup resource parameters.
@ -28,20 +29,20 @@ type ResourceConfig struct {
CpuShares *int64
// CPU hardcap limit (in usecs). Allowed cpu time in a given period.
CpuQuota *int64
// CPU quota period.
CpuPeriod *int64
}
// CgroupName is the abstract name of a cgroup prior to any driver specific conversion.
type CgroupName string
// CgroupConfig holds the cgroup configuration information.
// This is common object which is used to specify
// cgroup information to both systemd and raw cgroup fs
// implementation of the Cgroup Manager interface.
type CgroupConfig struct {
// We would expect systemd implementation to make appropriate
// name conversion. For example, if we pass /foo/bar
// then systemd should convert the name to something like
// foo.slice/foo-bar.slice
// Fully qualified name
Name string
// Fully qualified name prior to any driver specific conversions.
Name CgroupName
// ResourceParameters contains various cgroups settings to apply.
ResourceParameters *ResourceConfig
}
@ -53,12 +54,24 @@ type CgroupManager interface {
// It just creates the leaf cgroups.
// It expects the parent cgroup to already exist.
Create(*CgroupConfig) error
// Destroys the cgroup.
// Destroy the cgroup.
Destroy(*CgroupConfig) error
// Update cgroup configuration.
Update(*CgroupConfig) error
// Exists checks if the cgroup already exists
Exists(string) bool
Exists(name CgroupName) bool
// Name returns the literal cgroupfs name on the host after any driver specific conversions.
// We would expect systemd implementation to make appropriate name conversion.
// For example, if we pass /foo/bar
// then systemd should convert the name to something like
// foo.slice/foo-bar.slice
Name(name CgroupName) string
// CgroupName converts the literal cgroupfs name on the host to an internal identifier.
CgroupName(name string) CgroupName
// Pids scans through all subsystems to find pids associated with the specified cgroup.
Pids(name CgroupName) []int
// ReduceCPULimits reduces the CPU CFS values to the minimum amount of shares.
ReduceCPULimits(cgroupName CgroupName) error
}
// QOSContainersInfo stores the names of containers per qos
@ -72,16 +85,23 @@ type QOSContainersInfo struct {
// The Pod workers interact with the PodContainerManager to create and destroy
// containers for the pod.
type PodContainerManager interface {
// getPodContainerName returns the pod container's absolute name
GetPodContainerName(*api.Pod) string
// GetPodContainerName returns the CgroupName identifier, and its literal cgroupfs form on the host.
GetPodContainerName(*api.Pod) (CgroupName, string)
// EnsureExists takes a pod as argument and makes sure that
// pod cgroup exists if qos cgroup hierarchy flag is enabled.
// If the pod cgroup doesn't already exist, this method creates it.
EnsureExists(*api.Pod) error
// Exists returns true if the pod cgroup exists.
Exists(*api.Pod) bool
//Destroy takes a pod as argument and destroys the pod's container.
Destroy(string) error
// Destroy takes a pod Cgroup name as argument and destroys the pod's container.
Destroy(name CgroupName) error
// ReduceCPULimits reduces the CPU CFS values to the minimum amount of shares.
ReduceCPULimits(name CgroupName) error
// GetAllPodsFromCgroups enumerates the map of pod UIDs to their associated cgroups based on the state of the cgroupfs system.
GetAllPodsFromCgroups() (map[types.UID]CgroupName, error)
}
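
To make the division of labor concrete, here is a hedged sketch (illustration only, not code from this commit) of how a caller might drive the CgroupManager interface above once the QoS hierarchy is enabled; the standalone main package is hypothetical, and the flow mirrors what NewContainerManager and the pod container manager do elsewhere in this change:

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/cm"
)

func main() {
	// discover the mounted cgroup subsystems and pick a driver,
	// mirroring what NewContainerManager does.
	subsystems, err := cm.GetCgroupSubsystems()
	if err != nil {
		panic(err)
	}
	mgr := cm.NewCgroupManager(subsystems, "cgroupfs")

	// an abstract, driver-independent cgroup name for a pod.
	cfg := &cm.CgroupConfig{
		Name:               cm.CgroupName("/Burstable/pod_123"),
		ResourceParameters: &cm.ResourceConfig{},
	}
	if !mgr.Exists(cfg.Name) {
		_ = mgr.Create(cfg)
	}
	// literal cgroupfs location after any driver specific conversion.
	fmt.Println(mgr.Name(cfg.Name))
	// clean up: enumerate any attached pids, then remove the cgroup.
	fmt.Println(mgr.Pids(cfg.Name))
	_ = mgr.Destroy(cfg)
}
```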

View File

@ -129,6 +129,7 @@ func (ds *dockerService) CreateContainer(podSandboxID string, config *runtimeApi
if lc := sandboxConfig.GetLinux(); lc != nil {
// Apply Cgroup options.
// TODO: Check if this works with per-pod cgroups.
// TODO: we need to pass the cgroup in the syntax expected by the cgroup driver, but the shim does not use docker info yet...
hc.CgroupParent = lc.GetCgroupParent()
// Apply namespace options.

View File

@ -31,6 +31,7 @@ go_library(
"//pkg/api/unversioned:go_default_library",
"//pkg/client/record:go_default_library",
"//pkg/credentialprovider:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/custommetrics:go_default_library",
"//pkg/kubelet/events:go_default_library",

View File

@ -48,17 +48,6 @@ const (
ext4MaxFileNameLen = 255
)
const (
// Taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc
minShares = 2
sharesPerCPU = 1024
milliCPUToCPU = 1000
// 100000 is equivalent to 100ms
quotaPeriod = 100000
minQuotaPeriod = 1000
)
// DockerInterface is an abstract interface for testability. It abstracts the interface of docker client.
type DockerInterface interface {
ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error)
@ -388,48 +377,6 @@ func ConnectToDockerOrDie(dockerEndpoint string, requestTimeout time.Duration) D
return newKubeDockerClient(client, requestTimeout)
}
// milliCPUToQuota converts milliCPU to CFS quota and period values
func milliCPUToQuota(milliCPU int64) (quota int64, period int64) {
// CFS quota is measured in two values:
// - cfs_period_us=100ms (the amount of time to measure usage across)
// - cfs_quota=20ms (the amount of cpu time allowed to be used across a period)
// so in the above example, you are limited to 20% of a single CPU
// for multi-cpu environments, you just scale equivalent amounts
if milliCPU == 0 {
// take the default behavior from docker
return
}
// we set the period to 100ms by default
period = quotaPeriod
// we then convert your milliCPU to a value normalized over a period
quota = (milliCPU * quotaPeriod) / milliCPUToCPU
// quota needs to be a minimum of 1ms.
if quota < minQuotaPeriod {
quota = minQuotaPeriod
}
return
}
func milliCPUToShares(milliCPU int64) int64 {
if milliCPU == 0 {
// Docker converts zero milliCPU to unset, which maps to kernel default
// for unset: 1024. Return 2 here to really match kernel default for
// zero milliCPU.
return minShares
}
// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
shares := (milliCPU * sharesPerCPU) / milliCPUToCPU
if shares < minShares {
return minShares
}
return shares
}
// GetKubeletDockerContainers lists all container or just the running ones.
// Returns a list of docker containers that we manage
func GetKubeletDockerContainers(client DockerInterface, allContainers bool) ([]*dockertypes.Container, error) {

View File

@ -45,6 +45,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/images"
@ -138,6 +139,9 @@ type DockerManager struct {
// Root of the Docker runtime.
dockerRoot string
// cgroup driver used by Docker runtime.
cgroupDriver string
// Directory of container logs.
containerLogsDir string
@ -234,6 +238,14 @@ func NewDockerManager(
// Work out the location of the Docker runtime, defaulting to /var/lib/docker
// if there are any problems.
dockerRoot := "/var/lib/docker"
// the cgroup driver is only detectable in docker 1.12+.
// when the cgroup driver is not detectable, we pass the cgroup parent in the cgroupfs form.
// if your docker engine is configured to use the systemd cgroup driver, and you
// want to use pod level cgroups, you must be on docker 1.12+ to ensure cgroup-parent
// is converted appropriately. otherwise, docker will fail to launch the container
// and complain the cgroup name provided did not conform to systemd conventions.
var cgroupDriver string
dockerInfo, err := client.Info()
if err != nil {
glog.Errorf("Failed to execute Info() call to the Docker client: %v", err)
@ -241,6 +253,9 @@ func NewDockerManager(
} else {
dockerRoot = dockerInfo.DockerRootDir
glog.Infof("Setting dockerRoot to %s", dockerRoot)
cgroupDriver = dockerInfo.CgroupDriver
glog.Infof("Setting cgroupDriver to %s", cgroupDriver)
}
dm := &DockerManager{
@ -252,6 +267,7 @@ func NewDockerManager(
podInfraContainerImage: podInfraContainerImage,
dockerPuller: newDockerPuller(client),
dockerRoot: dockerRoot,
cgroupDriver: cgroupDriver,
containerLogsDir: containerLogsDir,
networkPlugin: networkPlugin,
livenessManager: livenessManager,
@ -625,11 +641,11 @@ func (dm *DockerManager) runContainer(
// API server does this for new containers, but we repeat this logic in Kubelet
// for containers running on existing Kubernetes clusters.
if cpuRequest.IsZero() && !cpuLimit.IsZero() {
cpuShares = milliCPUToShares(cpuLimit.MilliValue())
cpuShares = cm.MilliCPUToShares(cpuLimit.MilliValue())
} else {
// if cpuRequest.Amount is nil, then milliCPUToShares will return the minimal number
// of CPU shares.
cpuShares = milliCPUToShares(cpuRequest.MilliValue())
cpuShares = cm.MilliCPUToShares(cpuRequest.MilliValue())
}
var devices []dockercontainer.DeviceMapping
if nvidiaGPULimit.Value() != 0 {
@ -715,14 +731,26 @@ func (dm *DockerManager) runContainer(
if dm.cpuCFSQuota {
// if cpuLimit.Amount is nil, then the appropriate default value is returned to allow full usage of cpu resource.
cpuQuota, cpuPeriod := milliCPUToQuota(cpuLimit.MilliValue())
cpuQuota, cpuPeriod := cm.MilliCPUToQuota(cpuLimit.MilliValue())
hc.CPUQuota = cpuQuota
hc.CPUPeriod = cpuPeriod
}
if len(opts.CgroupParent) > 0 {
hc.CgroupParent = opts.CgroupParent
cgroupParent := opts.CgroupParent
// if docker uses the systemd cgroup driver, it expects *.slice style names for cgroup parent.
// if we configured kubelet to use --cgroup-driver=cgroupfs, and docker is configured to use systemd driver
// docker will fail to launch the container because the name we provide will not be a valid slice.
// this is a very good thing.
if dm.cgroupDriver == "systemd" {
cgroupParent, err = cm.ConvertCgroupFsNameToSystemd(opts.CgroupParent)
if err != nil {
return kubecontainer.ContainerID{}, err
}
}
hc.CgroupParent = cgroupParent
glog.V(3).Infof("Container %v/%v/%v: setting cgroup parent: %v", pod.Namespace, pod.Name, container.Name, hc.CgroupParent)
}
dockerOpts := dockertypes.ContainerCreateConfig{
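To make the driver mismatch above concrete: a cgroupfs-style parent such as /Burstable/podabc123 has to become a *.slice name before the systemd driver will accept it. The sketch below only illustrates that shape under the assumption that path components are joined with '-' and suffixed with ".slice"; the helper name toSystemdSliceName is hypothetical, and the real cm.ConvertCgroupFsNameToSystemd additionally handles escaping and error cases.

```go
package main

import (
	"fmt"
	"strings"
)

// toSystemdSliceName is a rough sketch (not the kubelet implementation): it
// joins the cgroupfs path components with '-' and appends ".slice", which is
// the general shape of a systemd slice name.
func toSystemdSliceName(cgroupfsName string) string {
	parts := []string{}
	for _, part := range strings.Split(cgroupfsName, "/") {
		if part != "" {
			parts = append(parts, part)
		}
	}
	if len(parts) == 0 {
		// the root cgroup maps to the root slice
		return "-.slice"
	}
	return strings.Join(parts, "-") + ".slice"
}

func main() {
	// prints "Burstable-podabc123.slice"
	fmt.Println(toSystemdSliceName("/Burstable/podabc123"))
}
```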

View File

@@ -982,61 +982,6 @@ func TestMakePortsAndBindings(t *testing.T) {
}
}
func TestMilliCPUToQuota(t *testing.T) {
testCases := []struct {
input int64
quota int64
period int64
}{
{
input: int64(0),
quota: int64(0),
period: int64(0),
},
{
input: int64(5),
quota: int64(1000),
period: int64(100000),
},
{
input: int64(9),
quota: int64(1000),
period: int64(100000),
},
{
input: int64(10),
quota: int64(1000),
period: int64(100000),
},
{
input: int64(200),
quota: int64(20000),
period: int64(100000),
},
{
input: int64(500),
quota: int64(50000),
period: int64(100000),
},
{
input: int64(1000),
quota: int64(100000),
period: int64(100000),
},
{
input: int64(1500),
quota: int64(150000),
period: int64(100000),
},
}
for _, testCase := range testCases {
quota, period := milliCPUToQuota(testCase.input)
if quota != testCase.quota || period != testCase.period {
t.Errorf("Input %v, expected quota %v period %v, but got quota %v period %v", testCase.input, testCase.quota, testCase.period, quota, period)
}
}
}
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
func randStringBytes(n int) string {

View File

@@ -1393,6 +1393,45 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
return errOuter
}
// Create cgroups for the pod and apply resource parameters
// to them if the cgroups-per-qos flag is enabled.
pcm := kl.containerManager.NewPodContainerManager()
// If pod has already been terminated then we need not create
// or update the pod's cgroup
if !kl.podIsTerminated(pod) {
// When the kubelet is restarted with the cgroups-per-qos
// flag enabled, all of the pod's running containers
// should be killed and brought back up
// under the qos cgroup hierarchy.
// Check if this is the pod's first sync
firstSync := true
for _, containerStatus := range apiPodStatus.ContainerStatuses {
if containerStatus.State.Running != nil {
firstSync = false
break
}
}
// Don't kill containers in the pod if the pod's cgroups already
// exist or the pod is running for the first time
podKilled := false
if !pcm.Exists(pod) && !firstSync {
kl.killPod(pod, nil, podStatus, nil)
podKilled = true
}
// Create and update the pod's cgroups.
// Don't create cgroups for a run-once pod if it was killed above.
// The current policy is not to restart run-once pods when the
// kubelet is restarted with the new flag, since run-once pods are
// expected to run only once; if the kubelet is restarted, they
// are not expected to run again.
// We therefore skip creating and applying cgroup updates for a run-once pod that was killed above.
if !(podKilled && pod.Spec.RestartPolicy == api.RestartPolicyNever) {
if err := pcm.EnsureExists(pod); err != nil {
return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
}
}
}
// Create Mirror Pod for Static Pod if it doesn't already exist
if kubepod.IsStaticPod(pod) {
podFullName := kubecontainer.GetPodFullName(pod)

View File

@@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/fieldpath"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/envvars"
"k8s.io/kubernetes/pkg/kubelet/images"
@@ -242,7 +243,9 @@ func (kl *Kubelet) GeneratePodHostNameAndDomain(pod *api.Pod) (string, string, e
// the container runtime to set parameters for launching a container.
func (kl *Kubelet) GenerateRunContainerOptions(pod *api.Pod, container *api.Container, podIP string) (*kubecontainer.RunContainerOptions, error) {
var err error
opts := &kubecontainer.RunContainerOptions{CgroupParent: kl.cgroupRoot}
pcm := kl.containerManager.NewPodContainerManager()
_, podContainerName := pcm.GetPodContainerName(pod)
opts := &kubecontainer.RunContainerOptions{CgroupParent: podContainerName}
hostname, hostDomainName, err := kl.GeneratePodHostNameAndDomain(pod)
if err != nil {
return nil, err
@@ -485,7 +488,35 @@ func (kl *Kubelet) killPod(pod *api.Pod, runningPod *kubecontainer.Pod, status *
} else if status != nil {
p = kubecontainer.ConvertPodStatusToRunningPod(kl.GetRuntime().Type(), status)
}
return kl.containerRuntime.KillPod(pod, p, gracePeriodOverride)
// cache the pod's cgroup name so we can reduce its cpu resource limits once the pod is killed
pcm := kl.containerManager.NewPodContainerManager()
var podCgroup cm.CgroupName
reduceCpuLimits := true
if pod != nil {
podCgroup, _ = pcm.GetPodContainerName(pod)
} else {
// If the pod is nil then the cgroup limits must have already
// been reduced earlier
reduceCpuLimits = false
}
// Call the container runtime KillPod method which stops all running containers of the pod
if err := kl.containerRuntime.KillPod(pod, p, gracePeriodOverride); err != nil {
return err
}
// At this point the pod might not completely free up cpu and memory resources.
// In such a case deleting the pod's cgroup might cause the pod's charges to be transferred
// to the parent cgroup. There might be various kinds of pod charges at this point.
// For example, any volume used by the pod that was backed by memory will have its
// pages charged to the pod cgroup until those volumes are removed by the kubelet.
// Hence we only reduce the cpu resource limits of the pod's cgroup
// and defer the responsibility of destroying the pod's cgroup to the
// cleanup method and the housekeeping loop.
if reduceCpuLimits {
pcm.ReduceCPULimits(podCgroup)
}
return nil
}
// makePodDataDirs creates the data directories for the pod.
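As a mental model for what reducing the cpu limits means in practice, the sketch below is an assumption-laden illustration rather than the kubelet's cm implementation: it drops a lingering pod cgroup's cpu.shares to the kernel minimum, assuming the cgroupfs driver, a cpu hierarchy mounted at /sys/fs/cgroup/cpu, and a hypothetical helper named reduceCPUShares.

```go
package main

import (
	"io/ioutil"
	"path/filepath"
)

// reduceCPUShares is an illustrative sketch: it writes the minimum cpu.shares
// value into a pod cgroup so a killed pod carries no scheduling weight while
// its cgroup waits to be destroyed by housekeeping.
func reduceCPUShares(podCgroup string) error {
	const minShares = "2"
	sharesFile := filepath.Join("/sys/fs/cgroup/cpu", podCgroup, "cpu.shares")
	return ioutil.WriteFile(sharesFile, []byte(minShares), 0644)
}

func main() {
	// e.g. reduceCPUShares("Burstable/podabc123")
	_ = reduceCPUShares
}
```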
@@ -579,6 +610,22 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods []*api.
// NOTE: This function is executed by the main sync loop, so it
// should not contain any blocking calls.
func (kl *Kubelet) HandlePodCleanups() error {
// The kubelet lacks checkpointing, so we need to introspect the set of pods
// in the cgroup tree prior to inspecting the set of pods in our pod manager.
// This ensures our view of the cgroup tree does not mistakenly observe pods
// that are added after the fact.
var (
cgroupPods map[types.UID]cm.CgroupName
err error
)
if kl.cgroupsPerQOS {
pcm := kl.containerManager.NewPodContainerManager()
cgroupPods, err = pcm.GetAllPodsFromCgroups()
if err != nil {
return fmt.Errorf("failed to get list of pods that still exist on cgroup mounts: %v", err)
}
}
allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods()
// Pod phase progresses monotonically. Once a pod has reached a final state,
// it should never leave regardless of the restart policy. The statuses
@@ -644,6 +691,11 @@ func (kl *Kubelet) HandlePodCleanups() error {
glog.Errorf("Failed cleaning up bandwidth limits: %v", err)
}
// Remove any cgroups in the hierarchy for pods that should no longer exist
if kl.cgroupsPerQOS {
kl.cleanupOrphanedPodCgroups(cgroupPods, allPods, runningPods)
}
kl.backOff.GC()
return nil
}
@@ -1204,3 +1256,40 @@ func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port uint16
}
return kl.runner.PortForward(&pod, port, stream)
}
// cleanupOrphanedPodCgroups removes the Cgroups of pods that should not be
// running and whose volumes have been cleaned up.
func (kl *Kubelet) cleanupOrphanedPodCgroups(
cgroupPods map[types.UID]cm.CgroupName,
pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
// Add all running and existing terminated pods to a set allPods
allPods := sets.NewString()
for _, pod := range pods {
allPods.Insert(string(pod.UID))
}
for _, pod := range runningPods {
allPods.Insert(string(pod.ID))
}
pcm := kl.containerManager.NewPodContainerManager()
// Iterate over all the found pods to verify if they should be running
for uid, val := range cgroupPods {
if allPods.Has(string(uid)) {
continue
}
// If volumes have not been unmounted/detached, do not delete the cgroup so the charge does not transfer to the parent.
if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist {
glog.V(3).Infof("Orphaned pod %q found, but volumes are not cleaned up, Skipping cgroups deletion: %v", uid)
continue
}
glog.V(3).Infof("Orphaned pod %q found, removing pod cgroups", uid)
// Destroy all cgroups of pods that should not be running
// by first killing all processes attached to those cgroups.
// We ignore errors returned by the method, as the housekeeping loop will
// try again to delete these unwanted pod cgroups
go pcm.Destroy(val)
}
return nil
}

View File

@@ -106,6 +106,8 @@ type NodeTestContextType struct {
DisableKubenet bool
// Whether to enable the QoS Cgroup Hierarchy or not
CgroupsPerQOS bool
// How the kubelet should interface with the cgroup hierarchy (cgroupfs or systemd)
CgroupDriver string
// The hard eviction thresholds
EvictionHard string
// ManifestPath is the static pod manifest path.
@@ -210,9 +212,9 @@ func RegisterNodeFlags() {
// TODO(random-liu): Remove kubelet related flags when we move the kubelet start logic out of the test.
// TODO(random-liu): Find someway to get kubelet configuration, and automatic config and filter test based on the configuration.
flag.BoolVar(&TestContext.DisableKubenet, "disable-kubenet", false, "If true, start kubelet without kubenet. (default false)")
// TODO: uncomment this when the flag is re-enabled in kubelet
//flag.BoolVar(&TestContext.CgroupsPerQOS, "cgroups-per-qos", false, "Enable creation of QoS cgroup hierarchy, if true top level QoS and pod cgroups are created.")
flag.StringVar(&TestContext.EvictionHard, "eviction-hard", "memory.available<250Mi,nodefs.available<10%,nodefs.inodesFree<5%", "The hard eviction thresholds. If set, pods get evicted when the specified resources drop below the thresholds.")
flag.BoolVar(&TestContext.CgroupsPerQOS, "cgroups-per-qos", false, "Enable creation of QoS cgroup hierarchy, if true top level QoS and pod cgroups are created.")
flag.StringVar(&TestContext.CgroupDriver, "cgroup-driver", "", "Driver that the kubelet uses to manipulate cgroups on the host. Possible values: 'cgroupfs', 'systemd'")
flag.StringVar(&TestContext.ManifestPath, "manifest-path", "", "The path to the static pod manifest file.")
flag.BoolVar(&TestContext.PrepullImages, "prepull-images", true, "If true, prepull images so image pull failures do not cause test failures.")
flag.StringVar(&TestContext.RuntimeIntegrationType, "runtime-integration-type", "", "Choose the integration path for the container runtime, mainly used for CRI validation.")

View File

@@ -82,9 +82,11 @@ go_test(
"//pkg/client/cache:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/kubelet/api/v1alpha1/stats:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/dockertools:go_default_library",
"//pkg/kubelet/images:go_default_library",
"//pkg/kubelet/metrics:go_default_library",
"//pkg/kubelet/qos:go_default_library",
"//pkg/labels:go_default_library",
"//pkg/metrics:go_default_library",
"//pkg/runtime:go_default_library",

View File

@@ -18,6 +18,9 @@ package e2e_node
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/util/uuid"
"k8s.io/kubernetes/test/e2e/framework"
@@ -25,52 +28,265 @@
. "github.com/onsi/gomega"
)
var _ = framework.KubeDescribe("Kubelet Cgroup Manager [Skip]", func() {
// getResourceList returns a ResourceList with the
// specified cpu and memory resource values
func getResourceList(cpu, memory string) api.ResourceList {
res := api.ResourceList{}
if cpu != "" {
res[api.ResourceCPU] = resource.MustParse(cpu)
}
if memory != "" {
res[api.ResourceMemory] = resource.MustParse(memory)
}
return res
}
// getResourceRequirements returns a ResourceRequirements object
func getResourceRequirements(requests, limits api.ResourceList) api.ResourceRequirements {
res := api.ResourceRequirements{}
res.Requests = requests
res.Limits = limits
return res
}
// makePodToVerifyCgroups returns a pod that verifies the existence of the specified cgroups.
func makePodToVerifyCgroups(cgroupNames []cm.CgroupName) *api.Pod {
// convert the names to their literal cgroupfs forms...
cgroupFsNames := []string{}
for _, cgroupName := range cgroupNames {
if framework.TestContext.CgroupDriver == "systemd" {
cgroupFsNames = append(cgroupFsNames, cm.ConvertCgroupNameToSystemd(cgroupName, true))
} else {
cgroupFsNames = append(cgroupFsNames, string(cgroupName))
}
}
// build the pod command to verify that the specified cgroups exist
command := ""
for _, cgroupFsName := range cgroupFsNames {
localCommand := "if [ ! -d /tmp/memory/" + cgroupFsName + " ] || [ ! -d /tmp/cpu/" + cgroupFsName + " ]; then exit 1; fi; "
command += localCommand
}
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod" + string(uuid.NewUUID()),
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyNever,
Containers: []api.Container{
{
Image: "gcr.io/google_containers/busybox:1.24",
Name: "container" + string(uuid.NewUUID()),
Command: []string{"sh", "-c", command},
VolumeMounts: []api.VolumeMount{
{
Name: "sysfscgroup",
MountPath: "/tmp",
},
},
},
},
Volumes: []api.Volume{
{
Name: "sysfscgroup",
VolumeSource: api.VolumeSource{
HostPath: &api.HostPathVolumeSource{Path: "/sys/fs/cgroup"},
},
},
},
},
}
return pod
}
// makePodToVerifyCgroupRemoved returns a pod that verifies the specified cgroup does not exist.
func makePodToVerifyCgroupRemoved(cgroupName cm.CgroupName) *api.Pod {
cgroupFsName := string(cgroupName)
if framework.TestContext.CgroupDriver == "systemd" {
cgroupFsName = cm.ConvertCgroupNameToSystemd(cgroupName, true)
}
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod" + string(uuid.NewUUID()),
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyOnFailure,
Containers: []api.Container{
{
Image: "gcr.io/google_containers/busybox:1.24",
Name: "container" + string(uuid.NewUUID()),
Command: []string{"sh", "-c", "for i in `seq 1 10`; do if [ ! -d /tmp/memory/" + cgroupFsName + " ] && [ ! -d /tmp/cpu/" + cgroupFsName + " ]; then exit 0; else sleep 10; fi; done; exit 1"},
VolumeMounts: []api.VolumeMount{
{
Name: "sysfscgroup",
MountPath: "/tmp",
},
},
},
},
Volumes: []api.Volume{
{
Name: "sysfscgroup",
VolumeSource: api.VolumeSource{
HostPath: &api.HostPathVolumeSource{Path: "/sys/fs/cgroup"},
},
},
},
},
}
return pod
}
var _ = framework.KubeDescribe("Kubelet Cgroup Manager", func() {
f := framework.NewDefaultFramework("kubelet-cgroup-manager")
Describe("QOS containers", func() {
Context("On enabling QOS cgroup hierarchy", func() {
It("Top level QoS containers should have been created", func() {
// return fast
if !framework.TestContext.CgroupsPerQOS {
return
}
podName := "qos-pod" + string(uuid.NewUUID())
contName := "qos-container" + string(uuid.NewUUID())
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: podName,
},
Spec: api.PodSpec{
// Don't restart the Pod since it is expected to exit
RestartPolicy: api.RestartPolicyNever,
Containers: []api.Container{
{
Image: "gcr.io/google_containers/busybox:1.24",
Name: contName,
Command: []string{"sh", "-c", "if [ -d /tmp/memory/Burstable ] && [ -d /tmp/memory/BestEffort ]; then exit 0; else exit 1; fi"},
VolumeMounts: []api.VolumeMount{
{
Name: "sysfscgroup",
MountPath: "/tmp",
},
},
},
},
Volumes: []api.Volume{
{
Name: "sysfscgroup",
VolumeSource: api.VolumeSource{
HostPath: &api.HostPathVolumeSource{Path: "/sys/fs/cgroup"},
},
},
},
},
}
podClient := f.PodClient()
podClient.Create(pod)
err := framework.WaitForPodSuccessInNamespace(f.ClientSet, podName, f.Namespace.Name)
cgroupsToVerify := []cm.CgroupName{cm.CgroupName(qos.Burstable), cm.CgroupName(qos.BestEffort)}
pod := makePodToVerifyCgroups(cgroupsToVerify)
f.PodClient().Create(pod)
err := framework.WaitForPodSuccessInNamespace(f.ClientSet, pod.Name, f.Namespace.Name)
Expect(err).NotTo(HaveOccurred())
})
})
})
Describe("Pod containers", func() {
Context("On scheduling a Guaranteed Pod", func() {
It("Pod containers should have been created under the cgroup-root", func() {
if !framework.TestContext.CgroupsPerQOS {
return
}
var (
guaranteedPod *api.Pod
podUID string
)
By("Creating a Guaranteed pod in Namespace", func() {
guaranteedPod = f.PodClient().Create(&api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod" + string(uuid.NewUUID()),
Namespace: f.Namespace.Name,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: framework.GetPauseImageName(f.ClientSet),
Name: "container" + string(uuid.NewUUID()),
Resources: getResourceRequirements(getResourceList("100m", "100Mi"), getResourceList("100m", "100Mi")),
},
},
},
})
podUID = string(guaranteedPod.UID)
})
By("Checking if the pod cgroup was created", func() {
cgroupsToVerify := []cm.CgroupName{cm.CgroupName("pod" + podUID)}
pod := makePodToVerifyCgroups(cgroupsToVerify)
f.PodClient().Create(pod)
err := framework.WaitForPodSuccessInNamespace(f.ClientSet, pod.Name, f.Namespace.Name)
Expect(err).NotTo(HaveOccurred())
})
By("Checking if the pod cgroup was deleted", func() {
gp := int64(1)
Expect(f.PodClient().Delete(guaranteedPod.Name, &api.DeleteOptions{GracePeriodSeconds: &gp})).NotTo(HaveOccurred())
pod := makePodToVerifyCgroupRemoved(cm.CgroupName("pod" + podUID))
f.PodClient().Create(pod)
err := framework.WaitForPodSuccessInNamespace(f.ClientSet, pod.Name, f.Namespace.Name)
Expect(err).NotTo(HaveOccurred())
})
})
})
Context("On scheduling a BestEffort Pod", func() {
It("Pod containers should have been created under the BestEffort cgroup", func() {
if !framework.TestContext.CgroupsPerQOS {
return
}
var (
podUID string
bestEffortPod *api.Pod
)
By("Creating a BestEffort pod in Namespace", func() {
bestEffortPod = f.PodClient().Create(&api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod" + string(uuid.NewUUID()),
Namespace: f.Namespace.Name,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: framework.GetPauseImageName(f.ClientSet),
Name: "container" + string(uuid.NewUUID()),
Resources: getResourceRequirements(getResourceList("", ""), getResourceList("", "")),
},
},
},
})
podUID = string(bestEffortPod.UID)
})
By("Checking if the pod cgroup was created", func() {
cgroupsToVerify := []cm.CgroupName{cm.CgroupName("BestEffort/pod" + podUID)}
pod := makePodToVerifyCgroups(cgroupsToVerify)
f.PodClient().Create(pod)
err := framework.WaitForPodSuccessInNamespace(f.ClientSet, pod.Name, f.Namespace.Name)
Expect(err).NotTo(HaveOccurred())
})
By("Checking if the pod cgroup was deleted", func() {
gp := int64(1)
Expect(f.PodClient().Delete(bestEffortPod.Name, &api.DeleteOptions{GracePeriodSeconds: &gp})).NotTo(HaveOccurred())
pod := makePodToVerifyCgroupRemoved(cm.CgroupName("BestEffort/pod" + podUID))
f.PodClient().Create(pod)
err := framework.WaitForPodSuccessInNamespace(f.ClientSet, pod.Name, f.Namespace.Name)
Expect(err).NotTo(HaveOccurred())
})
})
})
Context("On scheduling a Burstable Pod", func() {
It("Pod containers should have been created under the Burstable cgroup", func() {
if !framework.TestContext.CgroupsPerQOS {
return
}
var (
podUID string
burstablePod *api.Pod
)
By("Creating a Burstable pod in Namespace", func() {
burstablePod = f.PodClient().Create(&api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod" + string(uuid.NewUUID()),
Namespace: f.Namespace.Name,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: framework.GetPauseImageName(f.ClientSet),
Name: "container" + string(uuid.NewUUID()),
Resources: getResourceRequirements(getResourceList("100m", "100Mi"), getResourceList("200m", "200Mi")),
},
},
},
})
podUID = string(burstablePod.UID)
})
By("Checking if the pod cgroup was created", func() {
cgroupsToVerify := []cm.CgroupName{cm.CgroupName("Burstable/pod" + podUID)}
pod := makePodToVerifyCgroups(cgroupsToVerify)
f.PodClient().Create(pod)
err := framework.WaitForPodSuccessInNamespace(f.ClientSet, pod.Name, f.Namespace.Name)
Expect(err).NotTo(HaveOccurred())
})
By("Checking if the pod cgroup was deleted", func() {
gp := int64(1)
Expect(f.PodClient().Delete(burstablePod.Name, &api.DeleteOptions{GracePeriodSeconds: &gp})).NotTo(HaveOccurred())
pod := makePodToVerifyCgroupRemoved(cm.CgroupName("Burstable/pod" + podUID))
f.PodClient().Create(pod)
err := framework.WaitForPodSuccessInNamespace(f.ClientSet, pod.Name, f.Namespace.Name)
Expect(err).NotTo(HaveOccurred())
})
})
})
})
})
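For a manual spot check on a node that was started with --cgroups-per-qos=true, --cgroup-root=/, and the cgroupfs driver, the layout these tests assert looks roughly like the listing below. The <uid> values are placeholders for real pod UIDs, and the exact cgroup mount point may differ per distribution.

```sh
# Illustrative layout only; <uid> is a placeholder for the pod UID.
ls /sys/fs/cgroup/cpu/pod<uid>               # Guaranteed pod, directly under the cgroup root
ls /sys/fs/cgroup/cpu/Burstable/pod<uid>     # Burstable pod
ls /sys/fs/cgroup/cpu/BestEffort/pod<uid>    # BestEffort pod
# the same directories exist under the memory hierarchy
ls /sys/fs/cgroup/memory/Burstable/pod<uid>
```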

View File

@@ -5,6 +5,5 @@ GCE_PROJECT=k8s-jkns-ci-node-e2e
CLEANUP=true
GINKGO_FLAGS='--skip="\[Flaky\]"'
SETUP_NODE=false
#TEST_ARGS=--cgroups-per-qos=false
TEST_ARGS='--feature-gates=DynamicKubeletConfig=true'
TEST_ARGS='--feature-gates=DynamicKubeletConfig=true --cgroups-per-qos=true'
PARALLELISM=1

View File

@@ -5,6 +5,4 @@ GCE_PROJECT=k8s-jkns-ci-node-e2e
CLEANUP=true
GINKGO_FLAGS='--skip="\[Flaky\]|\[Serial\]"'
SETUP_NODE=false
# DISABLED --cgroups-per-qos flag until feature stabilized.
#TEST_ARGS=--cgroups-per-qos=false
TEST_ARGS=
TEST_ARGS=--cgroups-per-qos=true

View File

@@ -5,6 +5,4 @@ GCE_PROJECT=k8s-jkns-ci-node-e2e
CLEANUP=true
GINKGO_FLAGS='--focus="\[Flaky\]"'
SETUP_NODE=false
# DISABLED --cgroups-per-qos flag until feature stabilized.
#TEST_ARGS=--cgroups-per-qos=false
TEST_ARGS=
TEST_ARGS=--cgroups-per-qos=true

View File

@@ -5,6 +5,5 @@ GCE_PROJECT=k8s-jkns-pr-node-e2e
CLEANUP=true
GINKGO_FLAGS='--skip="\[Flaky\]|\[Slow\]|\[Serial\]" --flakeAttempts=2'
SETUP_NODE=false
# DISABLED --cgroups-per-qos flag until feature stabilized.
#TEST_ARGS=--cgroups-per-qos=false
TEST_ARGS=
TEST_ARGS=--cgroups-per-qos=true

View File

@@ -5,8 +5,6 @@ GCE_PROJECT=k8s-jkns-ci-node-e2e
CLEANUP=true
GINKGO_FLAGS='--focus="\[Serial\]" --skip="\[Flaky\]|\[Benchmark\]"'
SETUP_NODE=false
# DISABLED --cgroups-per-qos flag until feature stabilized.
#TEST_ARGS=--cgroups-per-qos=false
TEST_ARGS='--feature-gates=DynamicKubeletConfig=true'
TEST_ARGS='--feature-gates=DynamicKubeletConfig=true --cgroups-per-qos=true'
PARALLELISM=1
TIMEOUT=3h

View File

@@ -18,6 +18,4 @@ CLEANUP=true
# If true, current user will be added to the docker group on test node
SETUP_NODE=false
# If true, the QoS cgroup hierarchy is created and tests specific to the cgroup hierarchy are run
# DISABLED --cgroups-per-qos flag until feature stabilized.
#TEST_ARGS=--cgroups-per-qos=false
TEST_ARGS=
TEST_ARGS=--cgroups-per-qos=true

View File

@@ -223,11 +223,17 @@ func (e *E2EServices) startKubelet() (*server, error) {
cmdArgs = append(cmdArgs, "--container-runtime-endpoint", framework.TestContext.ContainerRuntimeEndpoint)
}
if framework.TestContext.CgroupsPerQOS {
// TODO: enable this when the flag is stable and available in kubelet.
// cmdArgs = append(cmdArgs,
// "--cgroups-per-qos", "true",
// )
cmdArgs = append(cmdArgs,
"--cgroups-per-qos", "true",
"--cgroup-root", "/",
)
}
if framework.TestContext.CgroupDriver != "" {
cmdArgs = append(cmdArgs,
"--cgroup-driver", framework.TestContext.CgroupDriver,
)
}
if !framework.TestContext.DisableKubenet {
cwd, err := os.Getwd()
if err != nil {
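Taken together with the CgroupsPerQOS block above, the harness ends up launching the kubelet with cgroup flags along these lines. The listing is illustrative only and omits every other flag the harness adds.

```sh
# Illustrative: only the cgroup-related flags added by startKubelet are shown.
kubelet \
  --cgroups-per-qos=true \
  --cgroup-root=/ \
  --cgroup-driver=systemd   # only added when --cgroup-driver is passed to the test run
```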

View File

@@ -215,6 +215,9 @@ Kubectl client Simple pod should support port-forward,ncdc,0
Kubectl client Update Demo should create and stop a replication controller,sttts,0
Kubectl client Update Demo should do a rolling update of a replication controller,sttts,0
Kubectl client Update Demo should scale a replication controller,sttts,0
Kubelet Cgroup Manager Pod containers On scheduling a BestEffort Pod Pod containers should have been created under the BestEffort cgroup,derekwaynecarr,0
Kubelet Cgroup Manager Pod containers On scheduling a Burstable Pod Pod containers should have been created under the Burstable cgroup,derekwaynecarr,0
Kubelet Cgroup Manager Pod containers On scheduling a Guaranteed Pod Pod containers should have been created under the cgroup-root,derekwaynecarr,0
Kubelet Cgroup Manager QOS containers On enabling QOS cgroup hierarchy Top level QoS containers should have been created,davidopp,1
Kubelet Container Manager Validate OOM score adjustments once the node is setup Kubelet's oom-score-adj should be -999,kargakis,1
"Kubelet Container Manager Validate OOM score adjustments once the node is setup burstable container's oom-score-adj should be between [2, 1000)",derekwaynecarr,1
