Kubelet Refactoring

This refactor removes the legacy KubeletConfig object and adds a new
KubeletDeps object, which contains injected runtime objects and
separates them from static config. It also reduces NewMainKubelet to
three arguments: a KubeletConfiguration, a KubeletDeps, and a
standalone-mode flag.
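
As a hedged sketch of the resulting call shape (all names are taken from
the diff below; error handling condensed):

    kubeDeps, err := UnsecuredKubeletDeps(s) // injected runtime objects
    if err != nil {
        return err
    }
    standaloneMode := len(s.APIServerList) == 0
    // Static config and injected dependencies now travel separately.
    k, err := kubelet.NewMainKubelet(&s.KubeletConfiguration, kubeDeps, standaloneMode)
    if err != nil {
        return fmt.Errorf("failed to create kubelet: %v", err)
    }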

Some Mesos and kubemark code was affected by this change and has been
modified accordingly.

A few final notes:

KubeletDeps:
KubeletDeps will be a temporary bin for things we might consider
"injected dependencies", until we have a better dependency injection
story for the Kubelet. We will have to discuss this eventually.
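
As a hypothetical illustration (not code from this change; the fake
docker client is a stand-in), grouping the runtime objects lets a test
swap individual dependencies without touching the static config:

    kubeDeps := &kubelet.KubeletDeps{
        DockerClient: fakeDockerClient,         // hypothetical fake for tests
        OOMAdjuster:  oom.NewFakeOOMAdjuster(), // fake OOM adjuster
        OSInterface:  kubecontainer.RealOS{},
        Mounter:      mount.New(),
    }
    // The KubeletConfiguration stays a plain-data defaults object.
    k, err := kubelet.NewMainKubelet(kubeCfg, kubeDeps, true /* standaloneMode */)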

RunOnce:
We will likely not pull a new KubeletConfiguration from the API server
when in runonce mode, so it doesn't make sense to make this something
that can be configured centrally. We will leave it as a flag-only option
for now. Additionally, it is increasingly looking like nobody actually uses the
Kubelet's runonce mode anymore, so it may be a candidate for deprecation
and removal.
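
Concretely (condensed from the diff below), runonce now travels as an
explicit argument to RunKubelet instead of living on the old
KubeletConfig:

    if err := RunKubelet(&s.KubeletConfiguration, kubeDeps, s.RunOnce, standaloneMode); err != nil {
        return err
    }

    // ...and inside RunKubelet:
    if runOnce {
        if _, err := k.RunOnce(podCfg.Updates()); err != nil {
            return fmt.Errorf("runonce failed: %v", err)
        }
        glog.Infof("Started kubelet %s as runonce", version.Get().String())
    }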
Michael Taufen 2016-07-13 16:11:12 -07:00
parent b44b716965
commit f277205f4f
6 changed files with 630 additions and 889 deletions


@@ -49,12 +49,17 @@ type KubeletServer struct {
AuthPath util.StringFlag // Deprecated -- use KubeConfig instead
APIServerList []string // Deprecated -- use KubeConfig instead
RunOnce bool
// Insert a probability of random errors during calls to the master.
ChaosChance float64
// Crash immediately, rather than eating panics.
ReallyCrashForTesting bool
// TODO(mtaufen): It is increasingly looking like nobody actually uses the
// Kubelet's runonce mode anymore, so it may be a candidate
// for deprecation and removal.
// If runOnce is true, the Kubelet will check the API server once for pods,
// run those in addition to the pods specified by the local manifest, and exit.
RunOnce bool
}
// NewKubeletServer will create a new KubeletServer with default values.


@@ -38,7 +38,6 @@ import (
"k8s.io/kubernetes/cmd/kubelet/app/options"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/apis/componentconfig"
v1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
"k8s.io/kubernetes/pkg/capabilities"
@@ -59,9 +58,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/server"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/runtime"
@@ -69,7 +65,7 @@ import (
"k8s.io/kubernetes/pkg/util/configz"
"k8s.io/kubernetes/pkg/util/crypto"
"k8s.io/kubernetes/pkg/util/flock"
"k8s.io/kubernetes/pkg/util/io"
kubeio "k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/oom"
@@ -77,22 +73,8 @@ import (
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume"
)
// bootstrapping interface for kubelet, targets the initialization protocol
type KubeletBootstrap interface {
BirthCry()
StartGarbageCollection()
ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool)
ListenAndServeReadOnly(address net.IP, port uint)
Run(<-chan kubetypes.PodUpdate)
RunOnce(<-chan kubetypes.PodUpdate) ([]kubelet.RunPodResult, error)
}
// create and initialize a Kubelet instance
type KubeletBuilder func(kc *KubeletConfig) (KubeletBootstrap, *config.PodConfig, error)
// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand() *cobra.Command {
s := options.NewKubeletServer()
@@ -124,188 +106,38 @@ HTTP server: The kubelet can also listen for HTTP and respond to a simple API
return cmd
}
// UnsecuredKubeletConfig returns a KubeletConfig suitable for being run, or an error if the server setup
// UnsecuredKubeletDeps returns a KubeletDeps suitable for being run, or an error if the server setup
// is not valid. It will not start any background processes, and does not include authentication/authorization
func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) {
hostNetworkSources, err := kubetypes.GetValidatedSources(s.HostNetworkSources)
if err != nil {
return nil, err
}
func UnsecuredKubeletDeps(s *options.KubeletServer) (*kubelet.KubeletDeps, error) {
hostPIDSources, err := kubetypes.GetValidatedSources(s.HostPIDSources)
if err != nil {
return nil, err
}
hostIPCSources, err := kubetypes.GetValidatedSources(s.HostIPCSources)
// Initialize the TLS Options
tlsOptions, err := InitializeTLS(&s.KubeletConfiguration)
if err != nil {
return nil, err
}
mounter := mount.New()
var writer io.Writer = &io.StdWriter{}
var writer kubeio.Writer = &kubeio.StdWriter{}
if s.Containerized {
glog.V(2).Info("Running kubelet in containerized mode (experimental)")
mounter = mount.NewNsenterMounter()
writer = &io.NsenterWriter{}
writer = &kubeio.NsenterWriter{}
}
tlsOptions, err := InitializeTLS(s)
if err != nil {
return nil, err
}
var dockerExecHandler dockertools.ExecHandler
switch s.DockerExecHandlerName {
case "native":
dockerExecHandler = &dockertools.NativeExecHandler{}
case "nsenter":
dockerExecHandler = &dockertools.NsenterExecHandler{}
default:
glog.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName)
dockerExecHandler = &dockertools.NativeExecHandler{}
}
imageGCPolicy := images.ImageGCPolicy{
MinAge: s.ImageMinimumGCAge.Duration,
HighThresholdPercent: int(s.ImageGCHighThresholdPercent),
LowThresholdPercent: int(s.ImageGCLowThresholdPercent),
}
diskSpacePolicy := kubelet.DiskSpacePolicy{
DockerFreeDiskMB: int(s.LowDiskSpaceThresholdMB),
RootFreeDiskMB: int(s.LowDiskSpaceThresholdMB),
}
manifestURLHeader := make(http.Header)
if s.ManifestURLHeader != "" {
pieces := strings.Split(s.ManifestURLHeader, ":")
if len(pieces) != 2 {
return nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", s.ManifestURLHeader)
}
manifestURLHeader.Set(pieces[0], pieces[1])
}
reservation, err := parseReservation(s.KubeReserved, s.SystemReserved)
if err != nil {
return nil, err
}
thresholds, err := eviction.ParseThresholdConfig(s.EvictionHard, s.EvictionSoft, s.EvictionSoftGracePeriod, s.EvictionMinimumReclaim)
if err != nil {
return nil, err
}
evictionConfig := eviction.Config{
PressureTransitionPeriod: s.EvictionPressureTransitionPeriod.Duration,
MaxPodGracePeriodSeconds: int64(s.EvictionMaxPodGracePeriod),
Thresholds: thresholds,
}
if s.MakeIPTablesUtilChains {
if s.IPTablesMasqueradeBit > 31 || s.IPTablesMasqueradeBit < 0 {
return nil, fmt.Errorf("iptables-masquerade-bit is not valid. Must be within [0, 31]")
}
if s.IPTablesDropBit > 31 || s.IPTablesDropBit < 0 {
return nil, fmt.Errorf("iptables-drop-bit is not valid. Must be within [0, 31]")
}
if s.IPTablesDropBit == s.IPTablesMasqueradeBit {
return nil, fmt.Errorf("iptables-masquerade-bit and iptables-drop-bit must be different")
}
}
return &KubeletConfig{
Address: net.ParseIP(s.Address),
AllowPrivileged: s.AllowPrivileged,
Auth: nil, // default does not enforce auth[nz]
CAdvisorInterface: nil, // launches background processes, not set here
VolumeStatsAggPeriod: s.VolumeStatsAggPeriod.Duration,
CgroupRoot: s.CgroupRoot,
Cloud: nil, // cloud provider might start background processes
ClusterDNS: net.ParseIP(s.ClusterDNS),
ClusterDomain: s.ClusterDomain,
ConfigFile: s.PodManifestPath,
ConfigureCBR0: s.ConfigureCBR0,
ContainerManager: nil,
ContainerRuntime: s.ContainerRuntime,
RemoteRuntimeEndpoint: s.RemoteRuntimeEndpoint,
RemoteImageEndpoint: s.RemoteImageEndpoint,
RuntimeRequestTimeout: s.RuntimeRequestTimeout.Duration,
CPUCFSQuota: s.CPUCFSQuota,
DiskSpacePolicy: diskSpacePolicy,
DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration), // TODO(random-liu): Set RuntimeRequestTimeout for rkt.
RuntimeCgroups: s.RuntimeCgroups,
DockerExecHandler: dockerExecHandler,
EnableControllerAttachDetach: s.EnableControllerAttachDetach,
EnableCustomMetrics: s.EnableCustomMetrics,
EnableDebuggingHandlers: s.EnableDebuggingHandlers,
CgroupsPerQOS: s.CgroupsPerQOS,
EnableServer: s.EnableServer,
EventBurst: int(s.EventBurst),
EventRecordQPS: float32(s.EventRecordQPS),
FileCheckFrequency: s.FileCheckFrequency.Duration,
HostnameOverride: s.HostnameOverride,
HostNetworkSources: hostNetworkSources,
HostPIDSources: hostPIDSources,
HostIPCSources: hostIPCSources,
HTTPCheckFrequency: s.HTTPCheckFrequency.Duration,
ImageGCPolicy: imageGCPolicy,
KubeClient: nil,
ManifestURL: s.ManifestURL,
ManifestURLHeader: manifestURLHeader,
MasterServiceNamespace: s.MasterServiceNamespace,
MaxContainerCount: int(s.MaxContainerCount),
MaxOpenFiles: uint64(s.MaxOpenFiles),
MaxPerPodContainerCount: int(s.MaxPerPodContainerCount),
MaxPods: int(s.MaxPods),
NvidiaGPUs: int(s.NvidiaGPUs),
MinimumGCAge: s.MinimumGCAge.Duration,
Mounter: mounter,
NetworkPluginName: s.NetworkPluginName,
NetworkPluginMTU: int(s.NetworkPluginMTU),
NetworkPlugins: ProbeNetworkPlugins(s.NetworkPluginDir),
NodeLabels: s.NodeLabels,
NodeStatusUpdateFrequency: s.NodeStatusUpdateFrequency.Duration,
NonMasqueradeCIDR: s.NonMasqueradeCIDR,
OOMAdjuster: oom.NewOOMAdjuster(),
OSInterface: kubecontainer.RealOS{},
PodCIDR: s.PodCIDR,
ReconcileCIDR: s.ReconcileCIDR,
PodInfraContainerImage: s.PodInfraContainerImage,
Port: uint(s.Port),
ReadOnlyPort: uint(s.ReadOnlyPort),
RegisterNode: s.RegisterNode,
RegisterSchedulable: s.RegisterSchedulable,
RegistryBurst: int(s.RegistryBurst),
RegistryPullQPS: float64(s.RegistryPullQPS),
ResolverConfig: s.ResolverConfig,
Reservation: *reservation,
KubeletCgroups: s.KubeletCgroups,
RktPath: s.RktPath,
RktAPIEndpoint: s.RktAPIEndpoint,
RktStage1Image: s.RktStage1Image,
RootDirectory: s.RootDirectory,
SeccompProfileRoot: s.SeccompProfileRoot,
Runonce: s.RunOnce,
SerializeImagePulls: s.SerializeImagePulls,
StandaloneMode: (len(s.APIServerList) == 0),
StreamingConnectionIdleTimeout: s.StreamingConnectionIdleTimeout.Duration,
SyncFrequency: s.SyncFrequency.Duration,
AllowedUnsafeSysctls: s.AllowedUnsafeSysctls,
SystemCgroups: s.SystemCgroups,
TLSOptions: tlsOptions,
Writer: writer,
VolumePlugins: ProbeVolumePlugins(s.VolumePluginDir),
OutOfDiskTransitionFrequency: s.OutOfDiskTransitionFrequency.Duration,
HairpinMode: s.HairpinMode,
BabysitDaemons: s.BabysitDaemons,
ExperimentalFlannelOverlay: s.ExperimentalFlannelOverlay,
NodeIP: net.ParseIP(s.NodeIP),
EvictionConfig: evictionConfig,
PodsPerCore: int(s.PodsPerCore),
ProtectKernelDefaults: s.ProtectKernelDefaults,
MakeIPTablesUtilChains: s.MakeIPTablesUtilChains,
iptablesMasqueradeBit: int(s.IPTablesMasqueradeBit),
iptablesDropBit: int(s.IPTablesDropBit),
return &kubelet.KubeletDeps{
Auth: nil, // default does not enforce auth[nz]
CAdvisorInterface: nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
Cloud: nil, // cloud provider might start background processes
ContainerManager: nil,
DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration), // TODO(random-liu): Set RuntimeRequestTimeout for rkt.
KubeClient: nil,
Mounter: mounter,
NetworkPlugins: ProbeNetworkPlugins(s.NetworkPluginDir),
OOMAdjuster: oom.NewOOMAdjuster(),
OSInterface: kubecontainer.RealOS{},
Writer: writer,
VolumePlugins: ProbeVolumePlugins(s.VolumePluginDir),
TLSOptions: tlsOptions,
}, nil
}
@@ -322,7 +154,7 @@ func getKubeClient(s *options.KubeletServer) (*clientset.Clientset, error) {
}
// Tries to download the kubelet-<node-name> configmap from "kube-system" namespace via the API server and returns a JSON string or error
func getRemoteKubeletConfig(s *options.KubeletServer, kcfg *KubeletConfig) (string, error) {
func getRemoteKubeletConfig(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (string, error) {
// TODO(mtaufen): should probably cache clientset and pass into this function rather than regenerate on every request
kubeClient, err := getKubeClient(s)
if err != nil {
@@ -333,8 +165,8 @@ func getRemoteKubeletConfig(s *options.KubeletServer, kcfg *KubeletConfig) (stri
var nodename string
hostname := nodeutil.GetHostname(s.HostnameOverride)
if kcfg != nil && kcfg.Cloud != nil {
instances, ok := kcfg.Cloud.Instances()
if kubeDeps != nil && kubeDeps.Cloud != nil {
instances, ok := kubeDeps.Cloud.Instances()
if !ok {
err = fmt.Errorf("failed to get instances from cloud provider, can't determine nodename.")
return nil, err
@@ -420,13 +252,14 @@ func initKubeletConfigSync(s *options.KubeletServer) (*componentconfig.KubeletCo
}
}
// Run runs the specified KubeletServer for the given KubeletConfig. This should never exit.
// The kcfg argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the KubeletConfig object and all defaults
// will be ignored.
func Run(s *options.KubeletServer, kcfg *KubeletConfig) error {
if err := run(s, kcfg); err != nil {
// Run runs the specified KubeletServer with the given KubeletDeps. This should never exit.
// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the KubeletDeps object and a default one will
// not be generated.
func Run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) error {
if err := run(s, kubeDeps); err != nil {
return fmt.Errorf("failed to run Kubelet: %v", err)
}
return nil
}
@@ -456,7 +289,10 @@ func initConfigz(kc *componentconfig.KubeletConfiguration) (*configz.Config, err
return cz, err
}
func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) {
// TODO: this should be replaced by a --standalone flag
standaloneMode := (len(s.APIServerList) == 0)
if s.ExitOnLockContention && s.LockFilePath == "" {
return errors.New("cannot exit on lock file contention: no lock file specified")
}
@@ -477,37 +313,31 @@ func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
// Register current configuration with /configz endpoint
cfgz, cfgzErr := initConfigz(&s.KubeletConfiguration)
if utilconfig.DefaultFeatureGate.DynamicKubeletConfig() {
// Look for config on the API server. If it exists, replace s.KubeletConfiguration
// with it and continue. initKubeletConfigSync also starts the background thread that checks for new config.
if kcfg == nil {
if utilconfig.DefaultFeatureGate.DynamicKubeletConfig() {
// Look for config on the API server. If it exists, replace s.KubeletConfiguration
// with it and continue. initKubeletConfigSync also starts the background thread that checks for new config.
// Don't do dynamic Kubelet configuration in runonce mode
if s.RunOnce == false {
// For now we only do dynamic config when kcfg is passed as nil, because we don't want to disturb
// any special relationship between the values in s (KubeletServer) and kcfg (KubeletConfig).
remoteKC, err := initKubeletConfigSync(s)
if err == nil {
// Update s (KubeletServer) with new config from API server
s.KubeletConfiguration = *remoteKC
// Ensure that /configz is up to date with the new config
if cfgzErr != nil {
glog.Errorf("was unable to register configz before due to %s, will not be able to set now", cfgzErr)
} else {
setConfigz(cfgz, &s.KubeletConfiguration)
}
// Don't do dynamic Kubelet configuration in runonce mode
if s.RunOnce == false {
remoteKC, err := initKubeletConfigSync(s)
if err == nil {
// Update s (KubeletServer) with new config from API server
s.KubeletConfiguration = *remoteKC
// Ensure that /configz is up to date with the new config
if cfgzErr != nil {
glog.Errorf("was unable to register configz before due to %s, will not be able to set now", cfgzErr)
} else {
setConfigz(cfgz, &s.KubeletConfiguration)
}
}
}
}
if kubeDeps == nil {
var kubeClient, eventClient *clientset.Clientset
var autoDetectCloudProvider bool
var cloud cloudprovider.Interface
if s.CloudProvider == v1alpha1.AutoDetectCloudProvider {
autoDetectCloudProvider = true
} else {
if s.CloudProvider != v1alpha1.AutoDetectCloudProvider {
cloud, err = cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
if err != nil {
return err
@@ -532,7 +362,6 @@ func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
clientConfig, err := CreateAPIServerClientConfig(s)
if err == nil {
kubeClient, err = clientset.NewForConfig(clientConfig)
// make a separate client for events
eventClientConfig := *clientConfig
eventClientConfig.QPS = float32(s.EventRecordQPS)
@@ -543,43 +372,40 @@ func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
if s.RequireKubeConfig {
return fmt.Errorf("invalid kubeconfig: %v", err)
}
// TODO: this should be replaced by a --standalone flag
if len(s.APIServerList) > 0 {
if standaloneMode {
glog.Warningf("No API client: %v", err)
}
}
cfg, err := UnsecuredKubeletConfig(s)
kubeDeps, err = UnsecuredKubeletDeps(s)
if err != nil {
return err
}
kcfg = cfg
kcfg.AutoDetectCloudProvider = autoDetectCloudProvider
kcfg.Cloud = cloud
kcfg.KubeClient = kubeClient
kcfg.EventClient = eventClient
kubeDeps.Cloud = cloud
kubeDeps.KubeClient = kubeClient
kubeDeps.EventClient = eventClient
}
if kcfg.CAdvisorInterface == nil {
kcfg.CAdvisorInterface, err = cadvisor.New(uint(s.CAdvisorPort), kcfg.ContainerRuntime)
if kubeDeps.CAdvisorInterface == nil {
kubeDeps.CAdvisorInterface, err = cadvisor.New(uint(s.CAdvisorPort), s.ContainerRuntime)
if err != nil {
return err
}
}
if kcfg.ContainerManager == nil {
if kcfg.SystemCgroups != "" && kcfg.CgroupRoot == "" {
if kubeDeps.ContainerManager == nil {
if s.SystemCgroups != "" && s.CgroupRoot == "" {
return fmt.Errorf("invalid configuration: system container was specified and cgroup root was not specified")
}
kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, kcfg.CAdvisorInterface, cm.NodeConfig{
RuntimeCgroupsName: kcfg.RuntimeCgroups,
SystemCgroupsName: kcfg.SystemCgroups,
KubeletCgroupsName: kcfg.KubeletCgroups,
ContainerRuntime: kcfg.ContainerRuntime,
CgroupsPerQOS: kcfg.CgroupsPerQOS,
CgroupRoot: kcfg.CgroupRoot,
ProtectKernelDefaults: kcfg.ProtectKernelDefaults,
kubeDeps.ContainerManager, err = cm.NewContainerManager(kubeDeps.Mounter, kubeDeps.CAdvisorInterface, cm.NodeConfig{
RuntimeCgroupsName: s.RuntimeCgroups,
SystemCgroupsName: s.SystemCgroups,
KubeletCgroupsName: s.KubeletCgroups,
ContainerRuntime: s.ContainerRuntime,
CgroupsPerQOS: s.CgroupsPerQOS,
CgroupRoot: s.CgroupRoot,
ProtectKernelDefaults: s.ProtectKernelDefaults,
})
if err != nil {
return err
@@ -595,12 +421,12 @@ func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) {
rand.Seed(time.Now().UTC().UnixNano())
// TODO(vmarmol): Do this through container config.
oomAdjuster := kcfg.OOMAdjuster
oomAdjuster := kubeDeps.OOMAdjuster
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
glog.Warning(err)
}
if err := RunKubelet(kcfg); err != nil {
if err := RunKubelet(&s.KubeletConfiguration, kubeDeps, s.RunOnce, standaloneMode); err != nil {
return err
}
@@ -646,15 +472,15 @@ func getNodeName(cloud cloudprovider.Interface, hostname string) (string, error)
// InitializeTLS checks for a configured TLSCertFile and TLSPrivateKeyFile: if unspecified a new self-signed
// certificate and key file are generated. Returns a configured server.TLSOptions object.
func InitializeTLS(s *options.KubeletServer) (*server.TLSOptions, error) {
if s.TLSCertFile == "" && s.TLSPrivateKeyFile == "" {
s.TLSCertFile = path.Join(s.CertDirectory, "kubelet.crt")
s.TLSPrivateKeyFile = path.Join(s.CertDirectory, "kubelet.key")
if !crypto.FoundCertOrKey(s.TLSCertFile, s.TLSPrivateKeyFile) {
if err := crypto.GenerateSelfSignedCert(nodeutil.GetHostname(s.HostnameOverride), s.TLSCertFile, s.TLSPrivateKeyFile, nil, nil); err != nil {
func InitializeTLS(kc *componentconfig.KubeletConfiguration) (*server.TLSOptions, error) {
if kc.TLSCertFile == "" && kc.TLSPrivateKeyFile == "" {
kc.TLSCertFile = path.Join(kc.CertDirectory, "kubelet.crt")
kc.TLSPrivateKeyFile = path.Join(kc.CertDirectory, "kubelet.key")
if !crypto.FoundCertOrKey(kc.TLSCertFile, kc.TLSPrivateKeyFile) {
if err := crypto.GenerateSelfSignedCert(nodeutil.GetHostname(kc.HostnameOverride), kc.TLSCertFile, kc.TLSPrivateKeyFile, nil, nil); err != nil {
return nil, fmt.Errorf("unable to generate self signed cert: %v", err)
}
glog.V(4).Infof("Using self-signed cert (%s, %s)", s.TLSCertFile, s.TLSPrivateKeyFile)
glog.V(4).Infof("Using self-signed cert (%s, %s)", kc.TLSCertFile, kc.TLSPrivateKeyFile)
}
}
tlsOptions := &server.TLSOptions{
@@ -666,8 +492,8 @@ func InitializeTLS(s *options.KubeletServer) (*server.TLSOptions, error) {
// Populate PeerCertificates in requests, but don't yet reject connections without certificates.
ClientAuth: tls.RequestClientCert,
},
CertFile: s.TLSCertFile,
KeyFile: s.TLSPrivateKeyFile,
CertFile: kc.TLSCertFile,
KeyFile: kc.TLSPrivateKeyFile,
}
return tlsOptions, nil
}
@@ -778,151 +604,79 @@ func addChaosToClientConfig(s *options.KubeletServer, config *restclient.Config)
}
}
// SimpleRunKubelet is a simple way to start a Kubelet talking to dockerEndpoint, using an API Client.
// Under the hood it calls RunKubelet (below)
func SimpleKubelet(client *clientset.Clientset,
dockerClient dockertools.DockerInterface,
hostname, rootDir, manifestURL, address string,
port uint,
readOnlyPort uint,
masterServiceNamespace string,
volumePlugins []volume.VolumePlugin,
tlsOptions *server.TLSOptions,
cadvisorInterface cadvisor.Interface,
configFilePath string,
cloud cloudprovider.Interface,
osInterface kubecontainer.OSInterface,
fileCheckFrequency, httpCheckFrequency, minimumGCAge, nodeStatusUpdateFrequency, syncFrequency, outOfDiskTransitionFrequency, evictionPressureTransitionPeriod time.Duration,
maxPods int, podsPerCore int,
containerManager cm.ContainerManager, clusterDNS net.IP) *KubeletConfig {
imageGCPolicy := images.ImageGCPolicy{
HighThresholdPercent: 90,
LowThresholdPercent: 80,
}
diskSpacePolicy := kubelet.DiskSpacePolicy{
DockerFreeDiskMB: 256,
RootFreeDiskMB: 256,
}
evictionConfig := eviction.Config{
PressureTransitionPeriod: evictionPressureTransitionPeriod,
}
c := componentconfig.KubeletConfiguration{}
kcfg := KubeletConfig{
Address: net.ParseIP(address),
CAdvisorInterface: cadvisorInterface,
VolumeStatsAggPeriod: time.Minute,
CgroupRoot: "",
Cloud: cloud,
ClusterDNS: clusterDNS,
ConfigFile: configFilePath,
ContainerManager: containerManager,
ContainerRuntime: "docker",
CPUCFSQuota: true,
DiskSpacePolicy: diskSpacePolicy,
DockerClient: dockerClient,
RuntimeCgroups: "",
DockerExecHandler: &dockertools.NativeExecHandler{},
EnableControllerAttachDetach: false,
EnableCustomMetrics: false,
EnableDebuggingHandlers: true,
EnableServer: true,
CgroupsPerQOS: false,
FileCheckFrequency: fileCheckFrequency,
// Since this kubelet runs with --configure-cbr0=false, it needs to use
// hairpin-veth to allow hairpin packets. Note that this deviates from
// what the "real" kubelet currently does, because there's no way to
// set promiscuous mode on docker0.
HairpinMode: componentconfig.HairpinVeth,
HostnameOverride: hostname,
HTTPCheckFrequency: httpCheckFrequency,
ImageGCPolicy: imageGCPolicy,
KubeClient: client,
ManifestURL: manifestURL,
MasterServiceNamespace: masterServiceNamespace,
MaxContainerCount: 100,
MaxOpenFiles: 1024,
MaxPerPodContainerCount: 2,
MaxPods: maxPods,
NvidiaGPUs: 0,
MinimumGCAge: minimumGCAge,
Mounter: mount.New(),
NodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
OOMAdjuster: oom.NewFakeOOMAdjuster(),
OSInterface: osInterface,
PodInfraContainerImage: c.PodInfraContainerImage,
Port: port,
ReadOnlyPort: readOnlyPort,
RegisterNode: true,
RegisterSchedulable: true,
RegistryBurst: 10,
RegistryPullQPS: 5.0,
ResolverConfig: kubetypes.ResolvConfDefault,
KubeletCgroups: "/kubelet",
RootDirectory: rootDir,
SerializeImagePulls: true,
SyncFrequency: syncFrequency,
SystemCgroups: "",
TLSOptions: tlsOptions,
VolumePlugins: volumePlugins,
Writer: &io.StdWriter{},
OutOfDiskTransitionFrequency: outOfDiskTransitionFrequency,
EvictionConfig: evictionConfig,
PodsPerCore: podsPerCore,
ProtectKernelDefaults: false,
}
return &kcfg
}
// RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications:
// 1 Integration tests
// 2 Kubelet binary
// 3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kcfg *KubeletConfig) error {
kcfg.Hostname = nodeutil.GetHostname(kcfg.HostnameOverride)
if len(kcfg.NodeName) == 0 {
nodeName, err := getNodeName(kcfg.Cloud, kcfg.Hostname)
if err != nil {
return err
}
kcfg.NodeName = nodeName
func RunKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, runOnce bool, standaloneMode bool) error {
hostname := nodeutil.GetHostname(kubeCfg.HostnameOverride)
// Query the cloud provider for our node name, default to hostname if kcfg.Cloud == nil
nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
if err != nil {
return err
}
eventBroadcaster := record.NewBroadcaster()
kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: kcfg.NodeName})
kubeDeps.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: nodeName})
eventBroadcaster.StartLogging(glog.V(3).Infof)
if kcfg.EventClient != nil {
if kubeDeps.EventClient != nil {
glog.V(4).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kcfg.EventClient.Events("")})
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
} else {
glog.Warning("No api server defined - no events will be sent to API server.")
}
privilegedSources := capabilities.PrivilegedSources{
HostNetworkSources: kcfg.HostNetworkSources,
HostPIDSources: kcfg.HostPIDSources,
HostIPCSources: kcfg.HostIPCSources,
// TODO(mtaufen): I moved the validation of these fields here, from UnsecuredKubeletConfig,
// so that I could remove the associated fields from KubeletConfig. I would
// prefer this to be done as part of an independent validation step on the
// KubeletConfiguration. But as far as I can tell, we don't have an explicit
// place for validation of the KubeletConfiguration yet.
hostNetworkSources, err := kubetypes.GetValidatedSources(kubeCfg.HostNetworkSources)
if err != nil {
return err
}
capabilities.Setup(kcfg.AllowPrivileged, privilegedSources, 0)
credentialprovider.SetPreferredDockercfgPath(kcfg.RootDirectory)
glog.V(2).Infof("Using root directory: %v", kcfg.RootDirectory)
hostPIDSources, err := kubetypes.GetValidatedSources(kubeCfg.HostPIDSources)
if err != nil {
return err
}
builder := kcfg.Builder
hostIPCSources, err := kubetypes.GetValidatedSources(kubeCfg.HostIPCSources)
if err != nil {
return err
}
privilegedSources := capabilities.PrivilegedSources{
HostNetworkSources: hostNetworkSources,
HostPIDSources: hostPIDSources,
HostIPCSources: hostIPCSources,
}
capabilities.Setup(kubeCfg.AllowPrivileged, privilegedSources, 0)
credentialprovider.SetPreferredDockercfgPath(kubeCfg.RootDirectory)
glog.V(2).Infof("Using root directory: %v", kubeCfg.RootDirectory)
builder := kubeDeps.Builder
if builder == nil {
builder = CreateAndInitKubelet
}
if kcfg.OSInterface == nil {
kcfg.OSInterface = kubecontainer.RealOS{}
if kubeDeps.OSInterface == nil {
kubeDeps.OSInterface = kubecontainer.RealOS{}
}
k, podCfg, err := builder(kcfg)
k, err := builder(kubeCfg, kubeDeps, standaloneMode)
if err != nil {
return fmt.Errorf("failed to create kubelet: %v", err)
}
rlimit.RlimitNumFiles(kcfg.MaxOpenFiles)
// NewMainKubelet should have set up a pod source config if one didn't exist
// when the builder was run. This is just a precaution.
if kubeDeps.PodConfig == nil {
return fmt.Errorf("failed to create kubelet, pod source config was nil!")
}
podCfg := kubeDeps.PodConfig
rlimit.RlimitNumFiles(uint64(kubeCfg.MaxOpenFiles))
// TODO(dawnchen): remove this once we deprecated old debian containervm images.
// This is a workaround for issue: https://github.com/opencontainers/runc/issues/726
@@ -961,313 +715,52 @@ func RunKubelet(kcfg *KubeletConfig) error {
}
// process pods and exit.
if kcfg.Runonce {
if runOnce {
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
return fmt.Errorf("runonce failed: %v", err)
}
glog.Infof("Started kubelet %s as runonce", version.Get().String())
} else {
startKubelet(k, podCfg, kcfg)
err := startKubelet(k, podCfg, kubeCfg, kubeDeps)
if err != nil {
return err
}
glog.Infof("Started kubelet %s", version.Get().String())
}
return nil
}
func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig) {
func startKubelet(k kubelet.KubeletBootstrap, podCfg *config.PodConfig, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps) error {
// start the kubelet
go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)
// start the kubelet server
if kc.EnableServer {
if kubeCfg.EnableServer {
go wait.Until(func() {
k.ListenAndServe(kc.Address, kc.Port, kc.TLSOptions, kc.Auth, kc.EnableDebuggingHandlers)
k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers)
}, 0, wait.NeverStop)
}
if kc.ReadOnlyPort > 0 {
if kubeCfg.ReadOnlyPort > 0 {
go wait.Until(func() {
k.ListenAndServeReadOnly(kc.Address, kc.ReadOnlyPort)
k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
}, 0, wait.NeverStop)
}
return nil
}
func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kc.Recorder)
// define file config source
if kc.ConfigFile != "" {
glog.Infof("Adding manifest file: %v", kc.ConfigFile)
config.NewSourceFile(kc.ConfigFile, kc.NodeName, kc.FileCheckFrequency, cfg.Channel(kubetypes.FileSource))
}
// define url config source
if kc.ManifestURL != "" {
glog.Infof("Adding manifest url %q with HTTP header %v", kc.ManifestURL, kc.ManifestURLHeader)
config.NewSourceURL(kc.ManifestURL, kc.ManifestURLHeader, kc.NodeName, kc.HTTPCheckFrequency, cfg.Channel(kubetypes.HTTPSource))
}
if kc.KubeClient != nil {
glog.Infof("Watching apiserver")
config.NewSourceApiserver(kc.KubeClient, kc.NodeName, cfg.Channel(kubetypes.ApiserverSource))
}
return cfg
}
// KubeletConfig is all of the parameters necessary for running a kubelet.
// TODO: This should probably be merged with KubeletServer. The extra object is a consequence of refactoring.
type KubeletConfig struct {
Address net.IP
AllowPrivileged bool
Auth server.AuthInterface
AutoDetectCloudProvider bool
Builder KubeletBuilder
CAdvisorInterface cadvisor.Interface
VolumeStatsAggPeriod time.Duration
CgroupRoot string
Cloud cloudprovider.Interface
ClusterDNS net.IP
ClusterDomain string
ConfigFile string
ConfigureCBR0 bool
ContainerManager cm.ContainerManager
ContainerRuntime string
RemoteRuntimeEndpoint string
RemoteImageEndpoint string
RuntimeRequestTimeout time.Duration
CPUCFSQuota bool
DiskSpacePolicy kubelet.DiskSpacePolicy
DockerClient dockertools.DockerInterface
RuntimeCgroups string
DockerExecHandler dockertools.ExecHandler
EnableControllerAttachDetach bool
EnableCustomMetrics bool
EnableDebuggingHandlers bool
CgroupsPerQOS bool
EnableServer bool
EventClient *clientset.Clientset
EventBurst int
EventRecordQPS float32
FileCheckFrequency time.Duration
Hostname string
HostnameOverride string
HostNetworkSources []string
HostPIDSources []string
HostIPCSources []string
HTTPCheckFrequency time.Duration
ImageGCPolicy images.ImageGCPolicy
KubeClient *clientset.Clientset
ManifestURL string
ManifestURLHeader http.Header
MasterServiceNamespace string
MaxContainerCount int
MaxOpenFiles uint64
MaxPerPodContainerCount int
MaxPods int
MinimumGCAge time.Duration
Mounter mount.Interface
NetworkPluginName string
NetworkPluginMTU int
NetworkPlugins []network.NetworkPlugin
NodeName string
NodeLabels map[string]string
NodeStatusUpdateFrequency time.Duration
NonMasqueradeCIDR string
NvidiaGPUs int
OOMAdjuster *oom.OOMAdjuster
OSInterface kubecontainer.OSInterface
PodCIDR string
PodsPerCore int
ReconcileCIDR bool
PodConfig *config.PodConfig
PodInfraContainerImage string
Port uint
ReadOnlyPort uint
Recorder record.EventRecorder
RegisterNode bool
RegisterSchedulable bool
RegistryBurst int
RegistryPullQPS float64
Reservation kubetypes.Reservation
ResolverConfig string
KubeletCgroups string
RktPath string
RktAPIEndpoint string
RktStage1Image string
RootDirectory string
Runonce bool
SeccompProfileRoot string
SerializeImagePulls bool
StandaloneMode bool
StreamingConnectionIdleTimeout time.Duration
SyncFrequency time.Duration
AllowedUnsafeSysctls []string
SystemCgroups string
TLSOptions *server.TLSOptions
Writer io.Writer
VolumePlugins []volume.VolumePlugin
OutOfDiskTransitionFrequency time.Duration
EvictionConfig eviction.Config
ExperimentalFlannelOverlay bool
NodeIP net.IP
ContainerRuntimeOptions []kubecontainer.Option
HairpinMode string
BabysitDaemons bool
Options []kubelet.Option
ProtectKernelDefaults bool
MakeIPTablesUtilChains bool
iptablesMasqueradeBit int
iptablesDropBit int
}
func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) {
func CreateAndInitKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, standaloneMode bool) (k kubelet.KubeletBootstrap, err error) {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations
// TODO: KubeletConfig.KubeClient should be a client interface, but client interface misses certain methods
// used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing
// a nil pointer to it when what we really want is a nil interface.
var kubeClient clientset.Interface
if kc.KubeClient != nil {
kubeClient = kc.KubeClient
// TODO: remove this when we've refactored kubelet to only use clientset.
}
gcPolicy := kubecontainer.ContainerGCPolicy{
MinAge: kc.MinimumGCAge,
MaxPerPodContainer: kc.MaxPerPodContainerCount,
MaxContainers: kc.MaxContainerCount,
}
daemonEndpoints := &api.NodeDaemonEndpoints{
KubeletEndpoint: api.DaemonEndpoint{Port: int32(kc.Port)},
}
pc = kc.PodConfig
if pc == nil {
pc = makePodSourceConfig(kc)
}
k, err = kubelet.NewMainKubelet(
kc.Hostname,
kc.NodeName,
kc.DockerClient,
kubeClient,
kc.RootDirectory,
kc.SeccompProfileRoot,
kc.PodInfraContainerImage,
kc.SyncFrequency,
float32(kc.RegistryPullQPS),
kc.RegistryBurst,
kc.EventRecordQPS,
kc.EventBurst,
gcPolicy,
pc.SeenAllSources,
kc.RegisterNode,
kc.RegisterSchedulable,
kc.StandaloneMode,
kc.ClusterDomain,
kc.ClusterDNS,
kc.MasterServiceNamespace,
kc.VolumePlugins,
kc.NetworkPlugins,
kc.NetworkPluginName,
kc.NetworkPluginMTU,
kc.StreamingConnectionIdleTimeout,
kc.Recorder,
kc.CAdvisorInterface,
kc.ImageGCPolicy,
kc.DiskSpacePolicy,
kc.Cloud,
kc.AutoDetectCloudProvider,
kc.NodeLabels,
kc.NodeStatusUpdateFrequency,
kc.OSInterface,
kc.CgroupsPerQOS,
kc.CgroupRoot,
kc.ContainerRuntime,
kc.RemoteRuntimeEndpoint,
kc.RemoteImageEndpoint,
kc.RuntimeRequestTimeout,
kc.RktPath,
kc.RktAPIEndpoint,
kc.RktStage1Image,
kc.Mounter,
kc.Writer,
kc.ConfigureCBR0,
kc.NonMasqueradeCIDR,
kc.PodCIDR,
kc.ReconcileCIDR,
kc.MaxPods,
kc.PodsPerCore,
kc.NvidiaGPUs,
kc.DockerExecHandler,
kc.ResolverConfig,
kc.CPUCFSQuota,
daemonEndpoints,
kc.OOMAdjuster,
kc.SerializeImagePulls,
kc.ContainerManager,
kc.OutOfDiskTransitionFrequency,
kc.ExperimentalFlannelOverlay,
kc.NodeIP,
kc.Reservation,
kc.EnableCustomMetrics,
kc.VolumeStatsAggPeriod,
kc.ContainerRuntimeOptions,
kc.HairpinMode,
kc.BabysitDaemons,
kc.EvictionConfig,
kc.Options,
kc.EnableControllerAttachDetach,
kc.MakeIPTablesUtilChains,
kc.iptablesMasqueradeBit,
kc.iptablesDropBit,
kc.AllowedUnsafeSysctls,
)
k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, standaloneMode)
if err != nil {
return nil, nil, err
return nil, err
}
k.BirthCry()
k.StartGarbageCollection()
return k, pc, nil
}
func parseReservation(kubeReserved, systemReserved utilconfig.ConfigurationMap) (*kubetypes.Reservation, error) {
reservation := new(kubetypes.Reservation)
rl, err := parseResourceList(kubeReserved)
if err != nil {
return nil, err
}
reservation.Kubernetes = rl
rl, err = parseResourceList(systemReserved)
if err != nil {
return nil, err
}
reservation.System = rl
return reservation, nil
}
func parseResourceList(m utilconfig.ConfigurationMap) (api.ResourceList, error) {
rl := make(api.ResourceList)
for k, v := range m {
switch api.ResourceName(k) {
// Only CPU and memory resources are supported.
case api.ResourceCPU, api.ResourceMemory:
q, err := resource.ParseQuantity(v)
if err != nil {
return nil, err
}
if q.Sign() == -1 {
return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
}
rl[api.ResourceName(k)] = q
default:
return nil, fmt.Errorf("cannot reserve %q resource", k)
}
}
return rl, nil
return k, nil
}


@@ -19,6 +19,7 @@ package app
import (
"testing"
"k8s.io/kubernetes/pkg/kubelet"
"k8s.io/kubernetes/pkg/util/config"
)
@@ -56,7 +57,7 @@ func TestValueOfAllocatableResources(t *testing.T) {
kubeReservedCM.Set(test.kubeReserved)
systemReservedCM.Set(test.systemReserved)
_, err := parseReservation(kubeReservedCM, systemReservedCM)
_, err := kubelet.ParseReservation(kubeReservedCM, systemReservedCM)
if err != nil {
t.Logf("%s: error returned: %v", test.name, err)
}


@@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/podutil"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/componentconfig"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/fields"
@@ -158,16 +159,16 @@ func (s *KubeletExecutorServer) runKubelet(
}
}()
kcfg, err := kubeletapp.UnsecuredKubeletConfig(s.KubeletServer)
kubeDeps, err := kubeletapp.UnsecuredKubeletDeps(s.KubeletServer)
if err != nil {
return err
}
// apply Mesos specific settings
kcfg.Builder = func(kc *kubeletapp.KubeletConfig) (kubeletapp.KubeletBootstrap, *kconfig.PodConfig, error) {
k, pc, err := kubeletapp.CreateAndInitKubelet(kc)
kubeDeps.Builder = func(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, standaloneMode bool) (kubelet.KubeletBootstrap, error) {
k, err := kubeletapp.CreateAndInitKubelet(kubeCfg, kubeDeps, standaloneMode)
if err != nil {
return k, pc, err
return k, err
}
// decorate kubelet such that it shuts down when the executor is
@@ -177,11 +178,10 @@ func (s *KubeletExecutorServer) runKubelet(
executorDone: executorDone,
}
return decorated, pc, nil
return decorated, nil
}
kcfg.RuntimeCgroups = "" // don't move the docker daemon into a cgroup
kcfg.Hostname = kcfg.HostnameOverride
kcfg.KubeClient = apiclient
s.RuntimeCgroups = "" // don't move the docker daemon into a cgroup
kubeDeps.KubeClient = apiclient
// taken from KubeletServer#Run(*KubeletConfig)
eventClientConfig, err := kubeletapp.CreateAPIServerClientConfig(s.KubeletServer)
@@ -192,16 +192,16 @@ func (s *KubeletExecutorServer) runKubelet(
// make a separate client for events
eventClientConfig.QPS = float32(s.EventRecordQPS)
eventClientConfig.Burst = int(s.EventBurst)
kcfg.EventClient, err = clientset.NewForConfig(eventClientConfig)
kubeDeps.EventClient, err = clientset.NewForConfig(eventClientConfig)
if err != nil {
return err
}
kcfg.NodeName = kcfg.HostnameOverride
kcfg.PodConfig = kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kcfg.Recorder) // override the default pod source
kcfg.StandaloneMode = false
kcfg.SystemCgroups = "" // don't take control over other system processes.
if kcfg.Cloud != nil {
kubeDeps.PodConfig = kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kubeDeps.Recorder) // override the default pod source
s.SystemCgroups = "" // don't take control over other system processes.
if kubeDeps.Cloud != nil {
// fail early and hard because having the cloud provider loaded would go unnoticed,
// but break bigger cluster because accessing the state.json from every slave kills the master.
panic("cloud provider must not be set")
@@ -209,17 +209,17 @@ func (s *KubeletExecutorServer) runKubelet(
// create custom cAdvisor interface which return the resource values that Mesos reports
ni := <-nodeInfos
cAdvisorInterface, err := NewMesosCadvisor(ni.Cores, ni.Mem, uint(s.CAdvisorPort), kcfg.ContainerRuntime)
cAdvisorInterface, err := NewMesosCadvisor(ni.Cores, ni.Mem, uint(s.CAdvisorPort), s.ContainerRuntime)
if err != nil {
return err
}
kcfg.CAdvisorInterface = cAdvisorInterface
kcfg.ContainerManager, err = cm.NewContainerManager(kcfg.Mounter, cAdvisorInterface, cm.NodeConfig{
RuntimeCgroupsName: kcfg.RuntimeCgroups,
SystemCgroupsName: kcfg.SystemCgroups,
KubeletCgroupsName: kcfg.KubeletCgroups,
ContainerRuntime: kcfg.ContainerRuntime,
kubeDeps.CAdvisorInterface = cAdvisorInterface
kubeDeps.ContainerManager, err = cm.NewContainerManager(kubeDeps.Mounter, cAdvisorInterface, cm.NodeConfig{
RuntimeCgroupsName: s.RuntimeCgroups,
SystemCgroupsName: s.SystemCgroups,
KubeletCgroupsName: s.KubeletCgroups,
ContainerRuntime: s.ContainerRuntime,
})
if err != nil {
return err
@@ -239,26 +239,26 @@ func (s *KubeletExecutorServer) runKubelet(
containerOptions = append(containerOptions, podsource.ContainerEnvOverlay([]api.EnvVar{
{Name: envContainerID, Value: s.containerID},
}))
kcfg.ContainerRuntimeOptions = append(kcfg.ContainerRuntimeOptions,
kubeDeps.ContainerRuntimeOptions = append(kubeDeps.ContainerRuntimeOptions,
dockertools.PodInfraContainerEnv(map[string]string{
envContainerID: s.containerID,
}))
}
podsource.Mesos(executorDone, kcfg.PodConfig.Channel(podsource.MesosSource), podLW, registry, containerOptions...)
podsource.Mesos(executorDone, kubeDeps.PodConfig.Channel(podsource.MesosSource), podLW, registry, containerOptions...)
// create static-pods directory file source
log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath)
fileSourceUpdates := kcfg.PodConfig.Channel(kubetypes.FileSource)
kconfig.NewSourceFile(staticPodsConfigPath, kcfg.HostnameOverride, kcfg.FileCheckFrequency, fileSourceUpdates)
fileSourceUpdates := kubeDeps.PodConfig.Channel(kubetypes.FileSource)
kconfig.NewSourceFile(staticPodsConfigPath, s.HostnameOverride, s.FileCheckFrequency.Duration, fileSourceUpdates)
// run the kubelet
// NOTE: because kcfg != nil holds, the upstream Run function will not
// NOTE: because kubeDeps != nil holds, the upstream Run function will not
// initialize the cloud provider. We explicitly wouldn't want
// that because then every kubelet instance would query the master
// state.json which does not scale.
s.KubeletServer.LockFilePath = "" // disable lock file
err = kubeletapp.Run(s.KubeletServer, kcfg)
err = kubeletapp.Run(s.KubeletServer, kubeDeps)
return
}


@@ -36,9 +36,11 @@ import (
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/pkg/api"
utilpod "k8s.io/kubernetes/pkg/api/pod"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/apis/componentconfig"
kubeExternal "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
@@ -78,6 +80,7 @@ import (
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/bandwidth"
"k8s.io/kubernetes/pkg/util/clock"
utilconfig "k8s.io/kubernetes/pkg/util/config"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
utilexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/flowcontrol"
@@ -85,6 +88,7 @@ import (
kubeio "k8s.io/kubernetes/pkg/util/io"
utilipt "k8s.io/kubernetes/pkg/util/iptables"
"k8s.io/kubernetes/pkg/util/mount"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/procfs"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@@ -173,90 +177,198 @@ type SyncHandler interface {
// Option is a functional option type for Kubelet
type Option func(*Kubelet)
// bootstrapping interface for kubelet, targets the initialization protocol
type KubeletBootstrap interface {
GetConfiguration() *componentconfig.KubeletConfiguration
BirthCry()
StartGarbageCollection()
ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool)
ListenAndServeReadOnly(address net.IP, port uint)
Run(<-chan kubetypes.PodUpdate)
RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
}
// create and initialize a Kubelet instance
type KubeletBuilder func(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (KubeletBootstrap, error)
// KubeletDeps is a bin for things we might consider "injected dependencies" -- objects constructed
// at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping
// these objects while we figure out a more comprehensive dependency injection story for the Kubelet.
type KubeletDeps struct {
// TODO(mtaufen): KubeletBuilder:
// Mesos currently uses this as a hook to let them make their own call to
// let them wrap the KubeletBootstrap that CreateAndInitKubelet returns with
// their own KubeletBootstrap. It's a useful hook. I need to think about what
// a nice home for it would be. There seems to be a trend, between this and
// the Options fields below, of providing hooks where you can add extra functionality
// to the Kubelet for your solution. Maybe we should centralize these sorts of things?
Builder KubeletBuilder
// TODO(mtaufen): ContainerRuntimeOptions and Options:
// Arrays of functions that can do arbitrary things to the Kubelet and the Runtime
// seem like a difficult path to trace when it's time to debug something.
// I'm leaving these fields here for now, but there is likely an easier-to-follow
// way to support their intended use cases. E.g. ContainerRuntimeOptions
// is used by Mesos to set an environment variable in containers which has
// some connection to their container GC. It seems that Mesos intends to use
// Options to add additional node conditions that are updated as part of the
// Kubelet lifecycle (see https://github.com/kubernetes/kubernetes/pull/21521).
// We should think about providing more explicit ways of doing these things.
ContainerRuntimeOptions []kubecontainer.Option
Options []Option
// Injected Dependencies
Auth server.AuthInterface
CAdvisorInterface cadvisor.Interface
Cloud cloudprovider.Interface
ContainerManager cm.ContainerManager
DockerClient dockertools.DockerInterface
EventClient *clientset.Clientset
KubeClient *clientset.Clientset
Mounter mount.Interface
NetworkPlugins []network.NetworkPlugin
OOMAdjuster *oom.OOMAdjuster
OSInterface kubecontainer.OSInterface
PodConfig *config.PodConfig
Recorder record.EventRecorder
Writer kubeio.Writer
VolumePlugins []volume.VolumePlugin
TLSOptions *server.TLSOptions
}
func makePodSourceConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, nodeName string) (*config.PodConfig, error) {
manifestURLHeader := make(http.Header)
if kubeCfg.ManifestURLHeader != "" {
pieces := strings.Split(kubeCfg.ManifestURLHeader, ":")
if len(pieces) != 2 {
return nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", kubeCfg.ManifestURLHeader)
}
manifestURLHeader.Set(pieces[0], pieces[1])
}
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
// define file config source
if kubeCfg.PodManifestPath != "" {
glog.Infof("Adding manifest file: %v", kubeCfg.PodManifestPath)
config.NewSourceFile(kubeCfg.PodManifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
}
// define url config source
if kubeCfg.ManifestURL != "" {
glog.Infof("Adding manifest url %q with HTTP header %v", kubeCfg.ManifestURL, manifestURLHeader)
config.NewSourceURL(kubeCfg.ManifestURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
}
if kubeDeps.KubeClient != nil {
glog.Infof("Watching apiserver")
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, cfg.Channel(kubetypes.ApiserverSource))
}
return cfg, nil
}
// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(
hostname string,
nodeName string,
dockerClient dockertools.DockerInterface,
kubeClient clientset.Interface,
rootDirectory string,
seccompProfileRoot string,
podInfraContainerImage string,
resyncInterval time.Duration,
pullQPS float32,
pullBurst int,
eventQPS float32,
eventBurst int,
containerGCPolicy kubecontainer.ContainerGCPolicy,
sourcesReadyFn config.SourcesReadyFn,
registerNode bool,
registerSchedulable bool,
standaloneMode bool,
clusterDomain string,
clusterDNS net.IP,
masterServiceNamespace string,
volumePlugins []volume.VolumePlugin,
networkPlugins []network.NetworkPlugin,
networkPluginName string,
networkPluginMTU int,
streamingConnectionIdleTimeout time.Duration,
recorder record.EventRecorder,
cadvisorInterface cadvisor.Interface,
imageGCPolicy images.ImageGCPolicy,
diskSpacePolicy DiskSpacePolicy,
cloud cloudprovider.Interface,
autoDetectCloudProvider bool,
nodeLabels map[string]string,
nodeStatusUpdateFrequency time.Duration,
osInterface kubecontainer.OSInterface,
CgroupsPerQOS bool,
cgroupRoot string,
containerRuntime string,
remoteRuntimeEndpoint string,
remoteImageEndpoint string,
runtimeRequestTimeout time.Duration,
rktPath string,
rktAPIEndpoint string,
rktStage1Image string,
mounter mount.Interface,
writer kubeio.Writer,
configureCBR0 bool,
nonMasqueradeCIDR string,
podCIDR string,
reconcileCIDR bool,
maxPods int,
podsPerCore int,
nvidiaGPUs int,
dockerExecHandler dockertools.ExecHandler,
resolverConfig string,
cpuCFSQuota bool,
daemonEndpoints *api.NodeDaemonEndpoints,
oomAdjuster *oom.OOMAdjuster,
serializeImagePulls bool,
containerManager cm.ContainerManager,
outOfDiskTransitionFrequency time.Duration,
flannelExperimentalOverlay bool,
nodeIP net.IP,
reservation kubetypes.Reservation,
enableCustomMetrics bool,
volumeStatsAggPeriod time.Duration,
containerRuntimeOptions []kubecontainer.Option,
hairpinMode string,
babysitDaemons bool,
evictionConfig eviction.Config,
kubeOptions []Option,
enableControllerAttachDetach bool,
makeIPTablesUtilChains bool,
iptablesMasqueradeBit int,
iptablesDropBit int,
allowedUnsafeSysctls []string,
) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (*Kubelet, error) {
if kubeCfg.RootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", kubeCfg.RootDirectory)
}
if resyncInterval <= 0 {
return nil, fmt.Errorf("invalid sync frequency %d", resyncInterval)
if kubeCfg.SyncFrequency.Duration <= 0 {
return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration)
}
if kubeCfg.MakeIPTablesUtilChains {
if kubeCfg.IPTablesMasqueradeBit > 31 || kubeCfg.IPTablesMasqueradeBit < 0 {
return nil, fmt.Errorf("iptables-masquerade-bit is not valid. Must be within [0, 31]")
}
if kubeCfg.IPTablesDropBit > 31 || kubeCfg.IPTablesDropBit < 0 {
return nil, fmt.Errorf("iptables-drop-bit is not valid. Must be within [0, 31]")
}
if kubeCfg.IPTablesDropBit == kubeCfg.IPTablesMasqueradeBit {
return nil, fmt.Errorf("iptables-masquerade-bit and iptables-drop-bit must be different")
}
}
hostname := nodeutil.GetHostname(kubeCfg.HostnameOverride)
// Query the cloud provider for our node name, default to hostname
nodeName := hostname
if kubeDeps.Cloud != nil {
var err error
instances, ok := kubeDeps.Cloud.Instances()
if !ok {
return nil, fmt.Errorf("failed to get instances from cloud provider")
}
nodeName, err = instances.CurrentNodeName(hostname)
if err != nil {
return nil, fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
}
glog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
}
// TODO: KubeletDeps.KubeClient should be a client interface, but client interface misses certain methods
// used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing
// a nil pointer to it when what we really want is a nil interface.
var kubeClient clientset.Interface
if kubeDeps.KubeClient != nil {
kubeClient = kubeDeps.KubeClient
// TODO: remove this when we've refactored kubelet to only use clientset.
}
if kubeDeps.PodConfig == nil {
var err error
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
if err != nil {
return nil, err
}
}
containerGCPolicy := kubecontainer.ContainerGCPolicy{
MinAge: kubeCfg.MinimumGCAge.Duration,
MaxPerPodContainer: int(kubeCfg.MaxPerPodContainerCount),
MaxContainers: int(kubeCfg.MaxContainerCount),
}
daemonEndpoints := &api.NodeDaemonEndpoints{
KubeletEndpoint: api.DaemonEndpoint{Port: kubeCfg.Port},
}
imageGCPolicy := images.ImageGCPolicy{
MinAge: kubeCfg.ImageMinimumGCAge.Duration,
HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
LowThresholdPercent: int(kubeCfg.ImageGCLowThresholdPercent),
}
diskSpacePolicy := DiskSpacePolicy{
DockerFreeDiskMB: int(kubeCfg.LowDiskSpaceThresholdMB),
RootFreeDiskMB: int(kubeCfg.LowDiskSpaceThresholdMB),
}
thresholds, err := eviction.ParseThresholdConfig(kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
if err != nil {
return nil, err
}
evictionConfig := eviction.Config{
PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
Thresholds: thresholds,
}
reservation, err := ParseReservation(kubeCfg.KubeReserved, kubeCfg.SystemReserved)
if err != nil {
return nil, err
}
var dockerExecHandler dockertools.ExecHandler
switch kubeCfg.DockerExecHandlerName {
case "native":
dockerExecHandler = &dockertools.NativeExecHandler{}
case "nsenter":
dockerExecHandler = &dockertools.NsenterExecHandler{}
default:
glog.Warningf("Unknown Docker exec handler %q; defaulting to native", kubeCfg.DockerExecHandlerName)
dockerExecHandler = &dockertools.NativeExecHandler{}
}
serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
@@ -305,79 +417,81 @@ func NewMainKubelet(
Namespace: "",
}
diskSpaceManager, err := newDiskSpaceManager(cadvisorInterface, diskSpacePolicy)
diskSpaceManager, err := newDiskSpaceManager(kubeDeps.CAdvisorInterface, diskSpacePolicy)
if err != nil {
return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
}
containerRefManager := kubecontainer.NewRefManager()
oomWatcher := NewOOMWatcher(cadvisorInterface, recorder)
oomWatcher := NewOOMWatcher(kubeDeps.CAdvisorInterface, kubeDeps.Recorder)
// TODO: remove when internal cbr0 implementation gets removed in favor
// of the kubenet network plugin
if networkPluginName == "kubenet" {
configureCBR0 = false
flannelExperimentalOverlay = false
// TODO(mtaufen): remove when internal cbr0 implementation gets removed in favor
// of the kubenet network plugin
var myConfigureCBR0 bool = kubeCfg.ConfigureCBR0
var myFlannelExperimentalOverlay bool = kubeCfg.ExperimentalFlannelOverlay
if kubeCfg.NetworkPluginName == "kubenet" {
myConfigureCBR0 = false
myFlannelExperimentalOverlay = false
}
klet := &Kubelet{
hostname: hostname,
nodeName: nodeName,
dockerClient: dockerClient,
dockerClient: kubeDeps.DockerClient,
kubeClient: kubeClient,
rootDirectory: rootDirectory,
resyncInterval: resyncInterval,
rootDirectory: kubeCfg.RootDirectory,
resyncInterval: kubeCfg.SyncFrequency.Duration,
containerRefManager: containerRefManager,
httpClient: &http.Client{},
sourcesReady: config.NewSourcesReady(sourcesReadyFn),
registerNode: registerNode,
registerSchedulable: registerSchedulable,
sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
registerNode: kubeCfg.RegisterNode,
registerSchedulable: kubeCfg.RegisterSchedulable,
standaloneMode: standaloneMode,
clusterDomain: clusterDomain,
clusterDNS: clusterDNS,
clusterDomain: kubeCfg.ClusterDomain,
clusterDNS: net.ParseIP(kubeCfg.ClusterDNS),
serviceLister: serviceLister,
nodeLister: nodeLister,
nodeInfo: nodeInfo,
masterServiceNamespace: masterServiceNamespace,
streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
recorder: recorder,
cadvisor: cadvisorInterface,
masterServiceNamespace: kubeCfg.MasterServiceNamespace,
streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
recorder: kubeDeps.Recorder,
cadvisor: kubeDeps.CAdvisorInterface,
diskSpaceManager: diskSpaceManager,
cloud: cloud,
autoDetectCloudProvider: autoDetectCloudProvider,
cloud: kubeDeps.Cloud,
autoDetectCloudProvider: (kubeExternal.AutoDetectCloudProvider == kubeCfg.CloudProvider),
nodeRef: nodeRef,
nodeLabels: nodeLabels,
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
os: osInterface,
nodeLabels: kubeCfg.NodeLabels,
nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration,
os: kubeDeps.OSInterface,
oomWatcher: oomWatcher,
CgroupsPerQOS: CgroupsPerQOS,
cgroupRoot: cgroupRoot,
mounter: mounter,
writer: writer,
configureCBR0: configureCBR0,
nonMasqueradeCIDR: nonMasqueradeCIDR,
reconcileCIDR: reconcileCIDR,
maxPods: maxPods,
podsPerCore: podsPerCore,
nvidiaGPUs: nvidiaGPUs,
cgroupsPerQOS: kubeCfg.CgroupsPerQOS,
cgroupRoot: kubeCfg.CgroupRoot,
mounter: kubeDeps.Mounter,
writer: kubeDeps.Writer,
configureCBR0: myConfigureCBR0,
nonMasqueradeCIDR: kubeCfg.NonMasqueradeCIDR,
reconcileCIDR: kubeCfg.ReconcileCIDR,
maxPods: int(kubeCfg.MaxPods),
podsPerCore: int(kubeCfg.PodsPerCore),
nvidiaGPUs: int(kubeCfg.NvidiaGPUs),
syncLoopMonitor: atomic.Value{},
resolverConfig: resolverConfig,
cpuCFSQuota: cpuCFSQuota,
resolverConfig: kubeCfg.ResolverConfig,
cpuCFSQuota: kubeCfg.CPUCFSQuota,
daemonEndpoints: daemonEndpoints,
containerManager: containerManager,
flannelExperimentalOverlay: flannelExperimentalOverlay,
containerManager: kubeDeps.ContainerManager,
flannelExperimentalOverlay: myFlannelExperimentalOverlay,
flannelHelper: nil,
nodeIP: nodeIP,
nodeIP: net.ParseIP(kubeCfg.NodeIP),
clock: clock.RealClock{},
outOfDiskTransitionFrequency: outOfDiskTransitionFrequency,
reservation: reservation,
enableCustomMetrics: enableCustomMetrics,
babysitDaemons: babysitDaemons,
enableControllerAttachDetach: enableControllerAttachDetach,
outOfDiskTransitionFrequency: kubeCfg.OutOfDiskTransitionFrequency.Duration,
reservation: *reservation,
enableCustomMetrics: kubeCfg.EnableCustomMetrics,
babysitDaemons: kubeCfg.BabysitDaemons,
enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach,
iptClient: utilipt.New(utilexec.New(), utildbus.New(), utilipt.ProtocolIpv4),
makeIPTablesUtilChains: makeIPTablesUtilChains,
iptablesMasqueradeBit: iptablesMasqueradeBit,
iptablesDropBit: iptablesDropBit,
makeIPTablesUtilChains: kubeCfg.MakeIPTablesUtilChains,
iptablesMasqueradeBit: int(kubeCfg.IPTablesMasqueradeBit),
iptablesDropBit: int(kubeCfg.IPTablesDropBit),
}
if klet.flannelExperimentalOverlay {
@ -391,7 +505,7 @@ func NewMainKubelet(
glog.Infof("Using node IP: %q", klet.nodeIP.String())
}
if mode, err := effectiveHairpinMode(componentconfig.HairpinMode(hairpinMode), containerRuntime, configureCBR0, networkPluginName); err != nil {
if mode, err := effectiveHairpinMode(componentconfig.HairpinMode(kubeCfg.HairpinMode), kubeCfg.ContainerRuntime, kubeCfg.ConfigureCBR0, kubeCfg.NetworkPluginName); err != nil {
// This is a non-recoverable error. Returning it up the callstack will just
// lead to retries of the same failure, so just fail hard.
glog.Fatalf("Invalid hairpin mode: %v", err)
@ -400,7 +514,7 @@ func NewMainKubelet(
}
glog.Infof("Hairpin mode set to %q", klet.hairpinMode)
if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}, klet.hairpinMode, klet.nonMasqueradeCIDR, networkPluginMTU); err != nil {
if plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, kubeCfg.NetworkPluginName, &networkHost{klet}, klet.hairpinMode, klet.nonMasqueradeCIDR, int(kubeCfg.NetworkPluginMTU)); err != nil {
return nil, err
} else {
klet.networkPlugin = plug
@ -419,63 +533,63 @@ func NewMainKubelet(
klet.podCache = kubecontainer.NewCache()
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))
if remoteRuntimeEndpoint != "" {
containerRuntime = "remote"
if kubeCfg.RemoteRuntimeEndpoint != "" {
kubeCfg.ContainerRuntime = "remote"
// remoteImageEndpoint is the same as remoteRuntimeEndpoint if not explicitly specified
if remoteImageEndpoint == "" {
remoteImageEndpoint = remoteRuntimeEndpoint
// kubeCfg.RemoteImageEndpoint is the same as kubeCfg.RemoteRuntimeEndpoint if not explicitly specified
if kubeCfg.RemoteImageEndpoint == "" {
kubeCfg.RemoteImageEndpoint = kubeCfg.RemoteRuntimeEndpoint
}
}
// Initialize the runtime.
switch containerRuntime {
switch kubeCfg.ContainerRuntime {
case "docker":
// Docker is the only supported runtime for now; continue.
klet.containerRuntime = dockertools.NewDockerManager(
dockerClient,
kubecontainer.FilterEventRecorder(recorder),
kubeDeps.DockerClient,
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
containerRefManager,
klet.podManager,
machineInfo,
podInfraContainerImage,
pullQPS,
pullBurst,
kubeCfg.PodInfraContainerImage,
float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst),
containerLogsDir,
osInterface,
kubeDeps.OSInterface,
klet.networkPlugin,
klet,
klet.httpClient,
dockerExecHandler,
oomAdjuster,
kubeDeps.OOMAdjuster,
procFs,
klet.cpuCFSQuota,
imageBackOff,
serializeImagePulls,
enableCustomMetrics,
kubeCfg.SerializeImagePulls,
kubeCfg.EnableCustomMetrics,
// If using "kubenet", the Kubernetes network plugin that wraps
// CNI's bridge plugin, it knows how to set the hairpin veth flag
// so we tell the container runtime to back away from setting it.
// If the kubelet is started with any other plugin we can't be
// sure it handles the hairpin case so we instruct the docker
// runtime to set the flag instead.
klet.hairpinMode == componentconfig.HairpinVeth && networkPluginName != "kubenet",
seccompProfileRoot,
containerRuntimeOptions...,
klet.hairpinMode == componentconfig.HairpinVeth && kubeCfg.NetworkPluginName != "kubenet",
kubeCfg.SeccompProfileRoot,
kubeDeps.ContainerRuntimeOptions...,
)
case "rkt":
// TODO: Include hairpin mode settings in rkt?
conf := &rkt.Config{
Path: rktPath,
Stage1Image: rktStage1Image,
Path: kubeCfg.RktPath,
Stage1Image: kubeCfg.RktStage1Image,
InsecureOptions: "image,ondisk",
}
rktRuntime, err := rkt.New(
rktAPIEndpoint,
kubeCfg.RktAPIEndpoint,
conf,
klet,
recorder,
kubeDeps.Recorder,
containerRefManager,
klet.podManager,
klet.livenessManager,
@ -485,32 +599,32 @@ func NewMainKubelet(
utilexec.New(),
kubecontainer.RealOS{},
imageBackOff,
serializeImagePulls,
runtimeRequestTimeout,
kubeCfg.SerializeImagePulls,
kubeCfg.RuntimeRequestTimeout.Duration,
)
if err != nil {
return nil, err
}
klet.containerRuntime = rktRuntime
case "remote":
remoteRuntimeService, err := remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, runtimeRequestTimeout)
remoteRuntimeService, err := remote.NewRemoteRuntimeService(kubeCfg.RemoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration)
if err != nil {
return nil, err
}
remoteImageService, err := remote.NewRemoteImageService(remoteImageEndpoint, runtimeRequestTimeout)
remoteImageService, err := remote.NewRemoteImageService(kubeCfg.RemoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration)
if err != nil {
return nil, err
}
klet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(recorder),
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
containerRefManager,
osInterface,
kubeDeps.OSInterface,
klet.networkPlugin,
klet,
klet.httpClient,
imageBackOff,
serializeImagePulls,
kubeCfg.SerializeImagePulls,
klet.cpuCFSQuota,
remoteRuntimeService,
remoteImageService,
@ -519,15 +633,15 @@ func NewMainKubelet(
return nil, err
}
default:
return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
return nil, fmt.Errorf("unsupported container runtime %q specified", kubeCfg.ContainerRuntime)
}
// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency
klet.resourceAnalyzer = stats.NewResourceAnalyzer(klet, volumeStatsAggPeriod, klet.containerRuntime)
klet.resourceAnalyzer = stats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, klet.containerRuntime)
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
klet.updatePodCIDR(podCIDR)
klet.updatePodCIDR(kubeCfg.PodCIDR)
// setup containerGC
containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy)
@ -538,7 +652,7 @@ func NewMainKubelet(
klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))
// setup imageManager
imageManager, err := images.NewImageGCManager(klet.containerRuntime, cadvisorInterface, recorder, nodeRef, imageGCPolicy)
imageManager, err := images.NewImageGCManager(klet.containerRuntime, kubeDeps.CAdvisorInterface, kubeDeps.Recorder, nodeRef, imageGCPolicy)
if err != nil {
return nil, fmt.Errorf("failed to initialize image manager: %v", err)
}
@ -552,25 +666,25 @@ func NewMainKubelet(
klet.livenessManager,
klet.runner,
containerRefManager,
recorder)
kubeDeps.Recorder)
klet.volumePluginMgr, err =
NewInitializedVolumePluginMgr(klet, volumePlugins)
NewInitializedVolumePluginMgr(klet, kubeDeps.VolumePlugins)
if err != nil {
return nil, err
}
// setup volumeManager
klet.volumeManager, err = volumemanager.NewVolumeManager(
enableControllerAttachDetach,
kubeCfg.EnableControllerAttachDetach,
nodeName,
klet.podManager,
klet.kubeClient,
klet.volumePluginMgr,
klet.containerRuntime,
mounter,
kubeDeps.Mounter,
klet.getPodsDir(),
recorder)
kubeDeps.Recorder)
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
if err != nil {
@ -579,14 +693,15 @@ func NewMainKubelet(
klet.runtimeCache = runtimeCache
klet.reasonCache = NewReasonCache()
klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
klet.podWorkers = newPodWorkers(klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
// setup eviction manager
evictionManager, evictionAdmitHandler, err := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers), klet.imageManager, recorder, nodeRef, klet.clock)
evictionManager, evictionAdmitHandler, err := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers), klet.imageManager, kubeDeps.Recorder, nodeRef, klet.clock)
if err != nil {
return nil, fmt.Errorf("failed to initialize eviction manager: %v", err)
}
@ -604,7 +719,7 @@ func NewMainKubelet(
}
// Safe, whitelisted sysctls can always be used as unsafe sysctls in the spec.
// Hence, we concatenate those two lists.
safeAndUnsafeSysctls := append(sysctl.SafeSysctlWhitelist(), allowedUnsafeSysctls...)
safeAndUnsafeSysctls := append(sysctl.SafeSysctlWhitelist(), kubeCfg.AllowedUnsafeSysctls...)
unsafeWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls, api.UnsafeSysctlsPodAnnotationKey)
if err != nil {
return nil, err
@ -614,19 +729,23 @@ func NewMainKubelet(
klet.AddPodAdmitHandler(unsafeWhitelist)
// enable active deadline handler
activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, klet.recorder, klet.clock)
activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
if err != nil {
return nil, err
}
klet.AddPodSyncLoopHandler(activeDeadlineHandler)
klet.AddPodSyncHandler(activeDeadlineHandler)
klet.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(containerRuntime))
klet.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(kubeCfg.ContainerRuntime))
// apply functional options (a hypothetical example follows this constructor)
for _, opt := range kubeOptions {
for _, opt := range kubeDeps.Options {
opt(klet)
}
// Finally, put the most recent version of the config on the Kubelet, so
// people can see how it was configured.
klet.kubeletConfiguration = kubeCfg
return klet, nil
}
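For illustration, a sketch of such an option, assuming the element type of
kubeDeps.Options is func(*Kubelet), consistent with the opt(klet) call above:

// withFakeClock is a hypothetical option; a test harness could append it to
// KubeletDeps.Options to replace the Kubelet's clock after construction.
func withFakeClock(c clock.Clock) func(*Kubelet) {
return func(k *Kubelet) {
k.clock = c
}
}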
@ -640,6 +759,8 @@ type nodeLister interface {
// Kubelet is the main kubelet implementation.
type Kubelet struct {
kubeletConfiguration *componentconfig.KubeletConfiguration
hostname string
nodeName string
dockerClient dockertools.DockerInterface
@ -792,7 +913,7 @@ type Kubelet struct {
resourceAnalyzer stats.ResourceAnalyzer
// Whether or not we should have the QOS cgroup hierarchy for resource management
CgroupsPerQOS bool
cgroupsPerQOS bool
// If non-empty, pass this to the container runtime as the root cgroup.
cgroupRoot string
@ -2913,6 +3034,11 @@ func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port uint16
return kl.runner.PortForward(&pod, port, stream)
}
// GetConfiguration returns the KubeletConfiguration used to configure the kubelet.
func (kl *Kubelet) GetConfiguration() *componentconfig.KubeletConfiguration {
return kl.kubeletConfiguration
}
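As a usage sketch, a hypothetical consumer of the new accessor, e.g. for a
configz-style dump (the helper and the fields it logs are illustrative only):

// logConfiguration is a hypothetical helper, not part of this change.
func logConfiguration(kl *Kubelet) {
if cfg := kl.GetConfiguration(); cfg != nil {
glog.Infof("kubelet configured with MaxPods=%d, SyncFrequency=%v",
cfg.MaxPods, cfg.SyncFrequency.Duration)
}
}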
// BirthCry sends an event that the kubelet has started up.
func (kl *Kubelet) BirthCry() {
// Make an event that kubelet restarted.
@ -2956,3 +3082,39 @@ func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
// ContainerRemoved doesn't affect pod state
return event.Type != pleg.ContainerRemoved
}
// parseResourceList converts a ConfigurationMap into an api.ResourceList,
// accepting only the cpu and memory resources.
func parseResourceList(m utilconfig.ConfigurationMap) (api.ResourceList, error) {
rl := make(api.ResourceList)
for k, v := range m {
switch api.ResourceName(k) {
// Only CPU and memory resources are supported.
case api.ResourceCPU, api.ResourceMemory:
q, err := resource.ParseQuantity(v)
if err != nil {
return nil, err
}
if q.Sign() == -1 {
return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
}
rl[api.ResourceName(k)] = q
default:
return nil, fmt.Errorf("cannot reserve %q resource", k)
}
}
return rl, nil
}
// ParseReservation parses the kube-reserved and system-reserved
// ConfigurationMaps into a kubetypes.Reservation.
func ParseReservation(kubeReserved, systemReserved utilconfig.ConfigurationMap) (*kubetypes.Reservation, error) {
reservation := new(kubetypes.Reservation)
if rl, err := parseResourceList(kubeReserved); err != nil {
return nil, err
} else {
reservation.Kubernetes = rl
}
if rl, err := parseResourceList(systemReserved); err != nil {
return nil, err
} else {
reservation.System = rl
}
return reservation, nil
}
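For illustration, a minimal sketch of calling ParseReservation; the literal
values are hypothetical, and utilconfig.ConfigurationMap is assumed to be a
map[string]string as in pkg/util/config:

// Hypothetical inputs, e.g. from --kube-reserved and --system-reserved.
kubeReserved := utilconfig.ConfigurationMap{"cpu": "100m", "memory": "256Mi"}
systemReserved := utilconfig.ConfigurationMap{"memory": "512Mi"}
reservation, err := ParseReservation(kubeReserved, systemReserved)
if err != nil {
glog.Fatalf("invalid reservation: %v", err)
}
glog.Infof("kube-reserved: %v, system-reserved: %v",
reservation.Kubernetes, reservation.System)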

View File

@ -21,11 +21,18 @@ import (
kubeletapp "k8s.io/kubernetes/cmd/kubelet/app"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/componentconfig"
"k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/kubelet"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeio "k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/volume/empty_dir"
"k8s.io/kubernetes/test/utils"
@ -33,7 +40,8 @@ import (
)
type HollowKubelet struct {
KubeletConfig *kubeletapp.KubeletConfig
KubeletConfiguration *componentconfig.KubeletConfiguration
KubeletDeps *kubelet.KubeletDeps
}
func NewHollowKubelet(
@ -45,44 +53,116 @@ func NewHollowKubelet(
containerManager cm.ContainerManager,
maxPods int, podsPerCore int,
) *HollowKubelet {
testRootDir := utils.MakeTempDirOrDie("hollow-kubelet.", "")
manifestFilePath := utils.MakeTempDirOrDie("manifest", testRootDir)
glog.Infof("Using %s as root dir for hollow-kubelet", testRootDir)
// -----------------
// Static config
// -----------------
c := GetHollowKubeletConfig(nodeName, kubeletPort, kubeletReadOnlyPort, maxPods, podsPerCore)
// -----------------
// Injected objects
// -----------------
d := &kubelet.KubeletDeps{
KubeClient: client,
DockerClient: dockerClient,
CAdvisorInterface: cadvisorInterface,
Cloud: nil,
OSInterface: &containertest.FakeOS{},
ContainerManager: containerManager,
VolumePlugins: empty_dir.ProbeVolumePlugins(),
TLSOptions: nil,
OOMAdjuster: oom.NewFakeOOMAdjuster(),
Writer: &kubeio.StdWriter{},
Mounter: mount.New(),
}
return &HollowKubelet{
KubeletConfig: kubeletapp.SimpleKubelet(
client,
dockerClient,
nodeName,
testRootDir,
"", /* manifest-url */
"0.0.0.0", /* bind address */
uint(kubeletPort),
uint(kubeletReadOnlyPort),
api.NamespaceDefault,
empty_dir.ProbeVolumePlugins(),
nil, /* tls-options */
cadvisorInterface,
manifestFilePath,
nil, /* cloud-provider */
&containertest.FakeOS{}, /* os-interface */
20*time.Second, /* FileCheckFrequency */
20*time.Second, /* HTTPCheckFrequency */
1*time.Minute, /* MinimumGCAge */
10*time.Second, /* NodeStatusUpdateFrequency */
10*time.Second, /* SyncFrequency */
5*time.Minute, /* OutOfDiskTransitionFrequency */
5*time.Minute, /* EvictionPressureTransitionPeriod */
maxPods,
podsPerCore,
containerManager,
nil,
),
KubeletConfiguration: c,
KubeletDeps: d,
}
}
// Run starts this HollowKubelet and blocks.
func (hk *HollowKubelet) Run() {
kubeletapp.RunKubelet(hk.KubeletConfig)
kubeletapp.RunKubelet(hk.KubeletConfiguration, hk.KubeletDeps, false, false)
select {}
}
// GetHollowKubeletConfig builds a KubeletConfiguration for the HollowKubelet,
// ensuring that the usual defaults are applied for fields we do not override.
func GetHollowKubeletConfig(
nodeName string,
kubeletPort int,
kubeletReadOnlyPort int,
maxPods int,
podsPerCore int) *componentconfig.KubeletConfiguration {
testRootDir := utils.MakeTempDirOrDie("hollow-kubelet.", "")
manifestFilePath := utils.MakeTempDirOrDie("manifest", testRootDir)
glog.Infof("Using %s as root dir for hollow-kubelet", testRootDir)
// Do the external -> internal conversion to make sure that defaults
// are set for any fields we do not override below.
tmp := &v1alpha1.KubeletConfiguration{}
c := &componentconfig.KubeletConfiguration{}
if err := api.Scheme.Convert(tmp, c, nil); err != nil {
glog.Fatalf("failed to apply defaults to KubeletConfiguration: %v", err)
}
c.HostnameOverride = nodeName
c.RootDirectory = testRootDir
c.ManifestURL = ""
c.Address = "0.0.0.0" /* bind address */
c.Port = int32(kubeletPort)
c.ReadOnlyPort = int32(kubeletReadOnlyPort)
c.MasterServiceNamespace = api.NamespaceDefault
c.PodManifestPath = manifestFilePath
c.FileCheckFrequency.Duration = 20 * time.Second
c.HTTPCheckFrequency.Duration = 20 * time.Second
c.MinimumGCAge.Duration = 1 * time.Minute
c.NodeStatusUpdateFrequency.Duration = 10 * time.Second
c.SyncFrequency.Duration = 10 * time.Second
c.OutOfDiskTransitionFrequency.Duration = 5 * time.Minute
c.EvictionPressureTransitionPeriod.Duration = 5 * time.Minute
c.MaxPods = int32(maxPods)
c.PodsPerCore = int32(podsPerCore)
c.ClusterDNS = ""
c.DockerExecHandlerName = "native"
c.ImageGCHighThresholdPercent = 90
c.ImageGCLowThresholdPercent = 80
c.LowDiskSpaceThresholdMB = 256
c.VolumeStatsAggPeriod.Duration = time.Minute
c.CgroupRoot = ""
c.ContainerRuntime = "docker"
c.CPUCFSQuota = true
c.RuntimeCgroups = ""
c.EnableControllerAttachDetach = false
c.EnableCustomMetrics = false
c.EnableDebuggingHandlers = true
c.EnableServer = true
c.CgroupsPerQOS = false
// Since this kubelet runs with --configure-cbr0=false, it needs to use
// hairpin-veth to allow hairpin packets. Note that this deviates from
// what the "real" kubelet currently does, because there's no way to
// set promiscuous mode on docker0.
c.HairpinMode = componentconfig.HairpinVeth
c.MaxContainerCount = 100
c.MaxOpenFiles = 1024
c.MaxPerPodContainerCount = 2
c.NvidiaGPUs = 0
c.RegisterNode = true
c.RegisterSchedulable = true
c.RegistryBurst = 10
c.RegistryPullQPS = 5.0
c.ResolverConfig = kubetypes.ResolvConfDefault
c.KubeletCgroups = "/kubelet"
c.SerializeImagePulls = true
c.SystemCgroups = ""
c.ProtectKernelDefaults = false
// TODO(mtaufen): Note that PodInfraContainerImage was previously being set
// to the empty value, which may not have been intentional. (The previous
// code, SimpleKubelet, was peeling it off of an empty
// componentconfig.KubeletConfiguration{}, but may have actually wanted the
// default.) The default will be present in the KubeletConfiguration
// constructed above.
return c
}
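And a usage sketch of the helper above, for a harness that only needs the
config half (the node name and port values are hypothetical):

cfg := GetHollowKubeletConfig("hollow-node-0", 10250, 10255, 110, 10)
glog.Infof("hollow kubelet %q will sync every %v and cap pods at %d",
cfg.HostnameOverride, cfg.SyncFrequency.Duration, cfg.MaxPods)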