Move container manager into a separate package.

Inject container manager into Kubelet. This lets us stub out container
manager during integration testing.
pull/6/head
Vishnu kannan 2015-10-09 17:09:53 -07:00
parent 129dbc734c
commit 4ad3d6f5fe
16 changed files with 159 additions and 69 deletions

View File

@ -49,6 +49,7 @@ import (
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -216,7 +217,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
configFilePath := integration.MakeTempDirOrDie("config", testRootDir)
glog.Infof("Using %s as root dir for kubelet #1", testRootDir)
fakeDocker1.VersionInfo = docker.Env{"ApiVersion=1.20"}
cm := cm.NewStubContainerManager()
kcfg := kubeletapp.SimpleKubelet(
cl,
&fakeDocker1,
@ -238,7 +239,8 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
10*time.Second, /* MinimumGCAge */
3*time.Second, /* NodeStatusUpdateFrequency */
10*time.Second, /* SyncFrequency */
40 /* MaxPods */)
40, /* MaxPods */
cm)
kubeletapp.RunKubelet(kcfg)
// Kubelet (machine)
@ -270,7 +272,8 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
3*time.Second, /* NodeStatusUpdateFrequency */
10*time.Second, /* SyncFrequency */
40 /* MaxPods */)
40, /* MaxPods */
cm)
kubeletapp.RunKubelet(kcfg)
return apiServer.URL, configFilePath

View File

@ -44,6 +44,7 @@ import (
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/kubelet"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
@ -409,6 +410,7 @@ func (s *KubeletServer) UnsecuredKubeletConfig() (*KubeletConfig, error) {
ClusterDomain: s.ClusterDomain,
ConfigFile: s.Config,
ConfigureCBR0: s.ConfigureCBR0,
ContainerManager: nil,
ContainerRuntime: s.ContainerRuntime,
CPUCFSQuota: s.CPUCFSQuota,
DiskSpacePolicy: diskSpacePolicy,
@ -474,6 +476,7 @@ func (s *KubeletServer) UnsecuredKubeletConfig() (*KubeletConfig, error) {
// Otherwise, the caller is assumed to have set up the KubeletConfig object and all defaults
// will be ignored.
func (s *KubeletServer) Run(kcfg *KubeletConfig) error {
var err error
if kcfg == nil {
cfg, err := s.UnsecuredKubeletConfig()
if err != nil {
@ -498,11 +501,17 @@ func (s *KubeletServer) Run(kcfg *KubeletConfig) error {
}
if kcfg.CAdvisorInterface == nil {
ca, err := cadvisor.New(s.CAdvisorPort)
kcfg.CAdvisorInterface, err = cadvisor.New(s.CAdvisorPort)
if err != nil {
return err
}
}
if kcfg.ContainerManager == nil {
kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, kcfg.CAdvisorInterface)
if err != nil {
return err
}
kcfg.CAdvisorInterface = ca
}
util.ReallyCrash = s.ReallyCrashForTesting
@ -670,7 +679,7 @@ func SimpleKubelet(client *client.Client,
osInterface kubecontainer.OSInterface,
fileCheckFrequency, httpCheckFrequency, minimumGCAge, nodeStatusUpdateFrequency, syncFrequency time.Duration,
maxPods int,
) *KubeletConfig {
containerManager cm.ContainerManager) *KubeletConfig {
imageGCPolicy := kubelet.ImageGCPolicy{
HighThresholdPercent: 90,
LowThresholdPercent: 80,
@ -686,6 +695,7 @@ func SimpleKubelet(client *client.Client,
CgroupRoot: "",
Cloud: cloud,
ConfigFile: configFilePath,
ContainerManager: containerManager,
ContainerRuntime: "docker",
CPUCFSQuota: false,
DiskSpacePolicy: diskSpacePolicy,
@ -724,8 +734,8 @@ func SimpleKubelet(client *client.Client,
SyncFrequency: syncFrequency,
SystemContainer: "",
TLSOptions: tlsOptions,
Writer: &io.StdWriter{},
VolumePlugins: volumePlugins,
Writer: &io.StdWriter{},
}
return &kcfg
}
@ -864,6 +874,7 @@ type KubeletConfig struct {
ClusterDomain string
ConfigFile string
ConfigureCBR0 bool
ContainerManager cm.ContainerManager
ContainerRuntime string
CPUCFSQuota bool
DiskSpacePolicy kubelet.DiskSpacePolicy
@ -1004,6 +1015,7 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
daemonEndpoints,
kc.OOMAdjuster,
kc.SerializeImagePulls,
kc.ContainerManager,
)
if err != nil {

View File

@ -27,6 +27,7 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubemark"
proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
@ -93,6 +94,7 @@ func main() {
if config.Morph == "kubelet" {
cadvisorInterface := new(cadvisor.Fake)
containerManager := cm.NewStubContainerManager()
fakeDockerClient := &dockertools.FakeDockerClient{}
fakeDockerClient.VersionInfo = docker.Env{"ApiVersion=1.18"}
@ -106,6 +108,7 @@ func main() {
fakeDockerClient,
config.KubeletPort,
config.KubeletReadOnlyPort,
containerManager,
)
hollowKubelet.Run()
}

View File

@ -36,6 +36,7 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet"
"k8s.io/kubernetes/pkg/kubelet/cm"
kconfig "k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -151,7 +152,6 @@ func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdat
if err != nil {
return k, pc, err
}
klet := k.(*kubelet.Kubelet)
s.kletLock.Lock()
@ -187,6 +187,11 @@ func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdat
return err
}
kcfg.CAdvisorInterface = cAdvisorInterface
kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, cAdvisorInterface)
if err != nil {
return err
}
go func() {
for ni := range nodeInfos {
// TODO(sttts): implement with MachineAllocable mechanism when https://github.com/kubernetes/kubernetes/issues/13984 is finished

View File

@ -0,0 +1,35 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cadvisor
import (
cadvisorApi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
)
func CapacityFromMachineInfo(info *cadvisorApi.MachineInfo) api.ResourceList {
c := api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(
int64(info.NumCores*1000),
resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(
info.MemoryCapacity,
resource.BinarySI),
}
return c
}

View File

@ -14,20 +14,26 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
package cm
import (
"k8s.io/kubernetes/pkg/api"
)
// Manages the containers running on a machine.
type containerManager interface {
type ContainerManager interface {
// Runs the container manager's housekeeping.
// - Ensures that the Docker daemon is in a container.
// - Creates the system container where all non-containerized processes run.
Start() error
Start(NodeConfig) error
// Returns resources allocated to system containers in the machine.
// These containers include the system and Kubernetes services.
SystemContainersLimit() api.ResourceList
}
type NodeConfig struct {
DockerDaemonContainerName string
SystemContainerName string
KubeletContainerName string
}

View File

@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
package cm
import (
"fmt"
@ -73,21 +73,15 @@ func newSystemContainer(containerName string) *systemContainer {
}
}
type nodeConfig struct {
dockerDaemonContainerName string
systemContainerName string
kubeletContainerName string
}
type containerManagerImpl struct {
cadvisorInterface cadvisor.Interface
mountUtil mount.Interface
nodeConfig
NodeConfig
// External containers being managed.
systemContainers []*systemContainer
}
var _ containerManager = &containerManagerImpl{}
var _ ContainerManager = &containerManagerImpl{}
// checks if the required cgroups subsystems are mounted.
// As of now, only 'cpu' and 'memory' are required.
@ -120,15 +114,11 @@ func validateSystemRequirements(mountUtil mount.Interface) error {
// TODO(vmarmol): Add limits to the system containers.
// Takes the absolute name of the specified containers.
// Empty container name disables use of the specified container.
func newContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, dockerDaemonContainerName, systemContainerName, kubeletContainerName string) (containerManager, error) {
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface) (ContainerManager, error) {
return &containerManagerImpl{
cadvisorInterface: cadvisorInterface,
mountUtil: mountUtil,
nodeConfig: nodeConfig{
dockerDaemonContainerName: dockerDaemonContainerName,
systemContainerName: systemContainerName,
kubeletContainerName: kubeletContainerName,
},
NodeConfig: NodeConfig{},
}, nil
}
@ -197,26 +187,26 @@ func (cm *containerManagerImpl) setupNode() error {
}
systemContainers := []*systemContainer{}
if cm.dockerDaemonContainerName != "" {
cont := newSystemContainer(cm.dockerDaemonContainerName)
if cm.DockerDaemonContainerName != "" {
cont := newSystemContainer(cm.DockerDaemonContainerName)
info, err := cm.cadvisorInterface.MachineInfo()
var capacity = api.ResourceList{}
if err != nil {
} else {
capacity = CapacityFromMachineInfo(info)
capacity = cadvisor.CapacityFromMachineInfo(info)
}
memoryLimit := (int64(capacity.Memory().Value() * DockerMemoryLimitThresholdPercent / 100))
if memoryLimit < MinDockerMemoryLimit {
glog.Warningf("Memory limit %d for container %s is too small, reset it to %d", memoryLimit, cm.dockerDaemonContainerName, MinDockerMemoryLimit)
glog.Warningf("Memory limit %d for container %s is too small, reset it to %d", memoryLimit, cm.DockerDaemonContainerName, MinDockerMemoryLimit)
memoryLimit = MinDockerMemoryLimit
}
glog.V(2).Infof("Configure resource-only container %s with memory limit: %d", cm.dockerDaemonContainerName, memoryLimit)
glog.V(2).Infof("Configure resource-only container %s with memory limit: %d", cm.DockerDaemonContainerName, memoryLimit)
dockerContainer := &fs.Manager{
Cgroups: &configs.Cgroup{
Name: cm.dockerDaemonContainerName,
Name: cm.DockerDaemonContainerName,
Memory: memoryLimit,
MemorySwap: -1,
AllowAllDevices: true,
@ -228,8 +218,8 @@ func (cm *containerManagerImpl) setupNode() error {
systemContainers = append(systemContainers, cont)
}
if cm.systemContainerName != "" {
if cm.systemContainerName == "/" {
if cm.SystemContainerName != "" {
if cm.SystemContainerName == "/" {
return fmt.Errorf("system container cannot be root (\"/\")")
}
@ -238,23 +228,25 @@ func (cm *containerManagerImpl) setupNode() error {
Name: "/",
},
}
manager := createManager(cm.systemContainerName)
manager := createManager(cm.SystemContainerName)
err := ensureSystemContainer(rootContainer, manager)
if err != nil {
return err
}
systemContainers = append(systemContainers, newSystemContainer(cm.systemContainerName))
systemContainers = append(systemContainers, newSystemContainer(cm.SystemContainerName))
}
if cm.kubeletContainerName != "" {
systemContainers = append(systemContainers, newSystemContainer(cm.kubeletContainerName))
if cm.KubeletContainerName != "" {
systemContainers = append(systemContainers, newSystemContainer(cm.KubeletContainerName))
}
cm.systemContainers = systemContainers
return nil
}
func (cm *containerManagerImpl) Start() error {
func (cm *containerManagerImpl) Start(nodeConfig NodeConfig) error {
cm.NodeConfig = nodeConfig
// Setup the node
if err := cm.setupNode(); err != nil {
return err

View File

@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
package cm
import (
"fmt"

View File

@ -0,0 +1,39 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cm
import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
)
type containerManagerStub struct{}
var _ ContainerManager = &containerManagerStub{}
func (cm *containerManagerStub) Start(_ NodeConfig) error {
glog.V(2).Infof("Starting stub container manager")
return nil
}
func (cm *containerManagerStub) SystemContainersLimit() api.ResourceList {
return api.ResourceList{}
}
func NewStubContainerManager() ContainerManager {
return &containerManagerStub{}
}

View File

@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
package cm
import (
"fmt"
@ -29,9 +29,9 @@ import (
type unsupportedContainerManager struct {
}
var _ containerManager = &unsupportedContainerManager{}
var _ ContainerManager = &unsupportedContainerManager{}
func (unsupportedContainerManager) Start() error {
func (unsupportedContainerManager) Start(_ NodeConfig) error {
return fmt.Errorf("Container Manager is unsupported in this build")
}
@ -39,6 +39,6 @@ func (unsupportedContainerManager) SystemContainersLimit() api.ResourceList {
return api.ResourceList{}
}
func newContainerManager(mounter mount.Interface, cadvisorInterface cadvisor.Interface, dockerDaemonContainer, systemContainer, kubeletContainer string) (containerManager, error) {
func NewContainerManager(mounter mount.Interface, cadvisorInterface cadvisor.Interface) (ContainerManager, error) {
return &unsupportedContainerManager{}, nil
}

View File

@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
package cm
import (
"fmt"

View File

@ -48,6 +48,7 @@ import (
"k8s.io/kubernetes/pkg/fieldpath"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/envvars"
@ -195,7 +196,9 @@ func NewMainKubelet(
daemonEndpoints *api.NodeDaemonEndpoints,
oomAdjuster *oom.OOMAdjuster,
serializeImagePulls bool,
containerManager cm.ContainerManager,
) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
@ -302,6 +305,7 @@ func NewMainKubelet(
resolverConfig: resolverConfig,
cpuCFSQuota: cpuCFSQuota,
daemonEndpoints: daemonEndpoints,
containerManager: containerManager,
}
if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
@ -390,12 +394,11 @@ func NewMainKubelet(
// Setup container manager, can fail if the devices hierarchy is not mounted
// (it is required by Docker however).
containerManager, err := newContainerManager(mounter, cadvisorInterface, dockerDaemonContainer, systemContainer, resourceContainer)
if err != nil {
return nil, fmt.Errorf("failed to create the Container Manager: %v", err)
klet.nodeConfig = cm.NodeConfig{
DockerDaemonContainerName: dockerDaemonContainer,
SystemContainerName: systemContainer,
KubeletContainerName: resourceContainer,
}
klet.containerManager = containerManager
klet.runtimeState.setRuntimeSync(time.Now())
klet.runner = klet.containerRuntime
@ -575,7 +578,8 @@ type Kubelet struct {
writer kubeio.Writer
// Manager of non-Runtime containers.
containerManager containerManager
containerManager cm.ContainerManager
nodeConfig cm.NodeConfig
// Whether or not kubelet should take responsibility for keeping cbr0 in
// the correct state.
@ -813,7 +817,7 @@ func (kl *Kubelet) preRun() error {
return fmt.Errorf("Failed to start CAdvisor %v", err)
}
if err := kl.containerManager.Start(); err != nil {
if err := kl.containerManager.Start(kl.nodeConfig); err != nil {
return fmt.Errorf("Failed to start ContainerManager %v", err)
}
@ -1971,7 +1975,7 @@ func (kl *Kubelet) hasInsufficientfFreeResources(pods []*api.Pod) (bool, bool) {
// TODO: Should we admit the pod when machine info is unavailable?
return false, false
}
capacity := CapacityFromMachineInfo(info)
capacity := cadvisor.CapacityFromMachineInfo(info)
_, notFittingCPU, notFittingMemory := predicates.CheckPodsExceedingFreeResources(pods, capacity)
return len(notFittingCPU) > 0, len(notFittingMemory) > 0
}
@ -2506,7 +2510,7 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
} else {
node.Status.NodeInfo.MachineID = info.MachineID
node.Status.NodeInfo.SystemUUID = info.SystemUUID
node.Status.Capacity = CapacityFromMachineInfo(info)
node.Status.Capacity = cadvisor.CapacityFromMachineInfo(info)
node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity(
int64(kl.pods), resource.DecimalSI)
if node.Status.NodeInfo.BootID != "" &&

View File

@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/container"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"

View File

@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
@ -56,7 +57,7 @@ func TestRunOnce(t *testing.T) {
diskSpaceManager: diskSpaceManager,
containerRuntime: fakeRuntime,
}
kb.containerManager, _ = newContainerManager(fakeContainerMgrMountInt(), cadvisor, "", "", "")
kb.containerManager = cm.NewStubContainerManager()
kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
if err := kb.setupDataDirs(); err != nil {

View File

@ -19,26 +19,12 @@ package kubelet
import (
"fmt"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/capabilities"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/securitycontext"
)
func CapacityFromMachineInfo(info *cadvisorapi.MachineInfo) api.ResourceList {
c := api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(
int64(info.NumCores*1000),
resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(
info.MemoryCapacity,
resource.BinarySI),
}
return c
}
// Check whether we have the capabilities to run the specified pod.
func canRunPod(pod *api.Pod) error {
if pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.HostNetwork {

View File

@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/volume/empty_dir"
@ -41,6 +42,7 @@ func NewHollowKubelet(
cadvisorInterface cadvisor.Interface,
dockerClient dockertools.DockerInterface,
kubeletPort, kubeletReadOnlyPort int,
containerManager cm.ContainerManager,
) *HollowKubelet {
testRootDir := integration.MakeTempDirOrDie("hollow-kubelet.", "")
manifestFilePath := integration.MakeTempDirOrDie("manifest", testRootDir)
@ -69,6 +71,7 @@ func NewHollowKubelet(
10*time.Second, /* NodeStatusUpdateFrequency */
10*time.Second, /* SyncFrequency */
40, /* MaxPods */
containerManager,
),
}
}