Merge pull request #14395 from vishh/kubelet-init

Require cpu and memory cgroups to be mounted
pull/6/head
Brian Grant 2015-09-24 22:43:25 -07:00
commit 7e42781d40
6 changed files with 244 additions and 62 deletions

View File

@ -35,7 +35,9 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/sets"
)
const (
@ -70,75 +72,62 @@ func newSystemContainer(containerName string) *systemContainer {
}
}
type nodeConfig struct {
dockerDaemonContainerName string
systemContainerName string
kubeletContainerName string
}
type containerManagerImpl struct {
cadvisorInterface cadvisor.Interface
mountUtil mount.Interface
nodeConfig
// External containers being managed.
systemContainers []*systemContainer
}
var _ containerManager = &containerManagerImpl{}
// checks if the required cgroups subsystems are mounted.
// As of now, only 'cpu' and 'memory' are required.
func validateSystemRequirements(mountUtil mount.Interface) error {
const (
cgroupMountType = "cgroup"
localErr = "system validation failed"
)
mountPoints, err := mountUtil.List()
if err != nil {
return fmt.Errorf("%s - %v", localErr, err)
}
expectedCgroups := sets.NewString("cpu", "cpuacct", "cpuset", "memory")
for _, mountPoint := range mountPoints {
if mountPoint.Type == cgroupMountType {
for _, opt := range mountPoint.Opts {
if expectedCgroups.Has(opt) {
expectedCgroups.Delete(opt)
}
}
}
}
if expectedCgroups.Len() > 0 {
return fmt.Errorf("%s - Following Cgroup subsystem not mounted: %v", localErr, expectedCgroups.List())
}
return nil
}
// TODO(vmarmol): Add limits to the system containers.
// Takes the absolute name of the specified containers.
// Empty container name disables use of the specified container.
func newContainerManager(cadvisorInterface cadvisor.Interface, dockerDaemonContainerName, systemContainerName, kubeletContainerName string) (containerManager, error) {
systemContainers := []*systemContainer{}
if dockerDaemonContainerName != "" {
cont := newSystemContainer(dockerDaemonContainerName)
info, err := cadvisorInterface.MachineInfo()
var capacity = api.ResourceList{}
if err != nil {
} else {
capacity = CapacityFromMachineInfo(info)
}
memoryLimit := (int64(capacity.Memory().Value() * DockerMemoryLimitThresholdPercent / 100))
if memoryLimit < MinDockerMemoryLimit {
glog.Warningf("Memory limit %d for container %s is too small, reset it to %d", memoryLimit, dockerDaemonContainerName, MinDockerMemoryLimit)
memoryLimit = MinDockerMemoryLimit
}
glog.V(2).Infof("Configure resource-only container %s with memory limit: %d", dockerDaemonContainerName, memoryLimit)
dockerContainer := &fs.Manager{
Cgroups: &configs.Cgroup{
Name: dockerDaemonContainerName,
Memory: memoryLimit,
MemorySwap: -1,
AllowAllDevices: true,
},
}
cont.ensureStateFunc = func(manager *fs.Manager) error {
return ensureDockerInContainer(cadvisorInterface, -900, dockerContainer)
}
systemContainers = append(systemContainers, cont)
}
if systemContainerName != "" {
if systemContainerName == "/" {
return nil, fmt.Errorf("system container cannot be root (\"/\")")
}
rootContainer := &fs.Manager{
Cgroups: &configs.Cgroup{
Name: "/",
},
}
manager := createManager(systemContainerName)
err := ensureSystemContainer(rootContainer, manager)
if err != nil {
return nil, err
}
systemContainers = append(systemContainers, newSystemContainer(systemContainerName))
}
if kubeletContainerName != "" {
systemContainers = append(systemContainers, newSystemContainer(kubeletContainerName))
}
func newContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, dockerDaemonContainerName, systemContainerName, kubeletContainerName string) (containerManager, error) {
return &containerManagerImpl{
systemContainers: systemContainers,
cadvisorInterface: cadvisorInterface,
mountUtil: mountUtil,
nodeConfig: nodeConfig{
dockerDaemonContainerName: dockerDaemonContainerName,
systemContainerName: systemContainerName,
kubeletContainerName: kubeletContainerName,
},
}, nil
}
@ -152,7 +141,73 @@ func createManager(containerName string) *fs.Manager {
}
}
func (cm *containerManagerImpl) setupNode() error {
if err := validateSystemRequirements(cm.mountUtil); err != nil {
return err
}
systemContainers := []*systemContainer{}
if cm.dockerDaemonContainerName != "" {
cont := newSystemContainer(cm.dockerDaemonContainerName)
info, err := cm.cadvisorInterface.MachineInfo()
var capacity = api.ResourceList{}
if err != nil {
} else {
capacity = CapacityFromMachineInfo(info)
}
memoryLimit := (int64(capacity.Memory().Value() * DockerMemoryLimitThresholdPercent / 100))
if memoryLimit < MinDockerMemoryLimit {
glog.Warningf("Memory limit %d for container %s is too small, reset it to %d", memoryLimit, cm.dockerDaemonContainerName, MinDockerMemoryLimit)
memoryLimit = MinDockerMemoryLimit
}
glog.V(2).Infof("Configure resource-only container %s with memory limit: %d", cm.dockerDaemonContainerName, memoryLimit)
dockerContainer := &fs.Manager{
Cgroups: &configs.Cgroup{
Name: cm.dockerDaemonContainerName,
Memory: memoryLimit,
MemorySwap: -1,
AllowAllDevices: true,
},
}
cont.ensureStateFunc = func(manager *fs.Manager) error {
return ensureDockerInContainer(cm.cadvisorInterface, -900, dockerContainer)
}
systemContainers = append(systemContainers, cont)
}
if cm.systemContainerName != "" {
if cm.systemContainerName == "/" {
return fmt.Errorf("system container cannot be root (\"/\")")
}
rootContainer := &fs.Manager{
Cgroups: &configs.Cgroup{
Name: "/",
},
}
manager := createManager(cm.systemContainerName)
err := ensureSystemContainer(rootContainer, manager)
if err != nil {
return err
}
systemContainers = append(systemContainers, newSystemContainer(cm.systemContainerName))
}
if cm.kubeletContainerName != "" {
systemContainers = append(systemContainers, newSystemContainer(cm.kubeletContainerName))
}
cm.systemContainers = systemContainers
return nil
}
func (cm *containerManagerImpl) Start() error {
// Setup the node
if err := cm.setupNode(); err != nil {
return err
}
// Don't run a background thread if there are no ensureStateFuncs.
numEnsureStateFuncs := 0
for _, cont := range cm.systemContainers {

View File

@ -0,0 +1,125 @@
// +build linux
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/util/mount"
)
type fakeMountInterface struct {
mountPoints []mount.MountPoint
}
func (mi *fakeMountInterface) Mount(source string, target string, fstype string, options []string) error {
return fmt.Errorf("unsupported")
}
func (mi *fakeMountInterface) Unmount(target string) error {
return fmt.Errorf("unsupported")
}
func (mi *fakeMountInterface) List() ([]mount.MountPoint, error) {
return mi.mountPoints, nil
}
func (mi *fakeMountInterface) IsLikelyNotMountPoint(file string) (bool, error) {
return false, fmt.Errorf("unsupported")
}
func fakeContainerMgrMountInt() mount.Interface {
return &fakeMountInterface{
[]mount.MountPoint{
{
Device: "cgroup",
Type: "cgroup",
Opts: []string{"rw", "relatime", "cpuset"},
},
{
Device: "cgroup",
Type: "cgroup",
Opts: []string{"rw", "relatime", "cpu"},
},
{
Device: "cgroup",
Type: "cgroup",
Opts: []string{"rw", "relatime", "cpuacct"},
},
{
Device: "cgroup",
Type: "cgroup",
Opts: []string{"rw", "relatime", "memory"},
},
},
}
}
func TestCgroupMountValidationSuccess(t *testing.T) {
assert.Nil(t, validateSystemRequirements(fakeContainerMgrMountInt()))
}
func TestCgroupMountValidationMemoryMissing(t *testing.T) {
mountInt := &fakeMountInterface{
[]mount.MountPoint{
{
Device: "cgroup",
Type: "cgroup",
Opts: []string{"rw", "relatime", "cpuset"},
},
{
Device: "cgroup",
Type: "cgroup",
Opts: []string{"rw", "relatime", "cpu"},
},
{
Device: "cgroup",
Type: "cgroup",
Opts: []string{"rw", "relatime", "cpuacct"},
},
},
}
assert.Error(t, validateSystemRequirements(mountInt))
}
func TestCgroupMountValidationMultipleSubsytem(t *testing.T) {
mountInt := &fakeMountInterface{
[]mount.MountPoint{
{
Device: "cgroup",
Type: "cgroup",
Opts: []string{"rw", "relatime", "cpuset", "memory"},
},
{
Device: "cgroup",
Type: "cgroup",
Opts: []string{"rw", "relatime", "cpu"},
},
{
Device: "cgroup",
Type: "cgroup",
Opts: []string{"rw", "relatime", "cpuacct"},
},
},
}
assert.Nil(t, validateSystemRequirements(mountInt))
}

View File

@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/util/mount"
)
type unsupportedContainerManager struct {
@ -38,6 +39,6 @@ func (unsupportedContainerManager) SystemContainersLimit() api.ResourceList {
return api.ResourceList{}
}
func newContainerManager(cadvisorInterface cadvisor.Interface, dockerDaemonContainer, systemContainer, kubeletContainer string) (containerManager, error) {
func newContainerManager(mounter mount.Interface, cadvisorInterface cadvisor.Interface, dockerDaemonContainer, systemContainer, kubeletContainer string) (containerManager, error) {
return &unsupportedContainerManager{}, nil
}

View File

@ -362,7 +362,7 @@ func NewMainKubelet(
// Setup container manager, can fail if the devices hierarchy is not mounted
// (it is required by Docker however).
containerManager, err := newContainerManager(cadvisorInterface, dockerDaemonContainer, systemContainer, resourceContainer)
containerManager, err := newContainerManager(mounter, cadvisorInterface, dockerDaemonContainer, systemContainer, resourceContainer)
if err != nil {
return nil, fmt.Errorf("failed to create the Container Manager: %v", err)
}
@ -758,6 +758,7 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
// Move Kubelet to a container.
if kl.resourceContainer != "" {
// Fixme: I need to reside inside ContainerManager interface.
err := util.RunInResourceContainer(kl.resourceContainer)
if err != nil {
glog.Warningf("Failed to move Kubelet to container %q: %v", kl.resourceContainer, err)

View File

@ -131,7 +131,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
t: t,
}
kubelet.volumeManager = newVolumeManager()
kubelet.containerManager, _ = newContainerManager(mockCadvisor, "", "", "")
kubelet.containerManager, _ = newContainerManager(fakeContainerMgrMountInt(), mockCadvisor, "", "", "")
kubelet.networkConfigured = true
fakeClock := &util.FakeClock{Time: time.Now()}
kubelet.backOff = util.NewBackOff(time.Second, time.Minute)

View File

@ -51,7 +51,7 @@ func TestRunOnce(t *testing.T) {
diskSpaceManager: diskSpaceManager,
containerRuntime: fakeRuntime,
}
kb.containerManager, _ = newContainerManager(cadvisor, "", "", "")
kb.containerManager, _ = newContainerManager(fakeContainerMgrMountInt(), cadvisor, "", "", "")
kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
if err := kb.setupDataDirs(); err != nil {