Re-use KubeletServer.KubeletConfig and KubeletServer.Run

pull/6/head
Dr. Stefan Schimanski 2015-09-23 09:38:00 +02:00
parent dd5bafdba5
commit 0feb1bceb5
1 changed files with 39 additions and 222 deletions

View File

@ -19,11 +19,8 @@ package service
import (
"fmt"
"net"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
@ -37,20 +34,13 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/credentialprovider"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/kubelet"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
kconfig "k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubeletContainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/util"
utilio "k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/rand"
)
const (
@ -90,18 +80,8 @@ func (s *KubeletExecutorServer) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&s.LaunchGracePeriod, "mesos-launch-grace-period", s.LaunchGracePeriod, "Launch grace period after which launching tasks will be cancelled. Zero disables launch cancellation.")
}
func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpdate, kubeletFinished <-chan struct{}, staticPodsConfigPath string) error {
// create apiserver client
var apiclient *client.Client
clientConfig, err := s.CreateAPIServerClientConfig()
if err == nil {
apiclient, err = client.New(clientConfig)
}
if err != nil {
// required for k8sm since we need to send api.Binding information
// back to the apiserver
log.Fatalf("No API client: %v", err)
}
func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpdate, kubeletFinished <-chan struct{},
staticPodsConfigPath string, apiclient *client.Client) error {
exec := executor.New(executor.Config{
Updates: execUpdates,
APIClient: apiclient,
@ -161,19 +141,40 @@ func (s *KubeletExecutorServer) runExecutor(execUpdates chan<- kubetypes.PodUpda
return nil
}
// Run runs the specified KubeletExecutorServer.
func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletFinished chan<- struct{}, staticPodsConfigPath string) error {
// empty string for the docker and system containers (= cgroup paths). This
// stops the kubelet taking any control over other system processes.
s.SystemContainer = ""
s.DockerDaemonContainer = ""
oomAdjuster := oom.NewOOMAdjuster()
if err := oomAdjuster.ApplyOOMScoreAdj(0, s.OOMScoreAdj); err != nil {
log.Info(err)
func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdate, kubeletFinished chan<- struct{},
staticPodsConfigPath string, apiclient *client.Client) error {
kcfg, err := s.UnsecuredKubeletConfig()
if err == nil {
kcfg.Builder = func(kc *app.KubeletConfig) (app.KubeletBootstrap, *kconfig.PodConfig, error) {
return s.createAndInitKubelet(kc, staticPodsConfigPath, execUpdates, kubeletFinished)
}
kcfg.DockerDaemonContainer = "" // don't move the docker daemon into a cgroup
kcfg.Hostname = kcfg.HostnameOverride
kcfg.KubeClient = apiclient
kcfg.NodeName = kcfg.HostnameOverride
kcfg.StandaloneMode = false
kcfg.SystemContainer = "" // don't take control over other system processes.
err = s.KubeletServer.Run(kcfg)
}
dockerClient := dockertools.ConnectToDockerOrDie(s.DockerEndpoint)
if err != nil {
close(kubeletFinished)
}
return err
}
// Run runs the specified KubeletExecutorServer.
func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
// create shared channels
kubeletFinished := make(chan struct{})
execUpdates := make(chan kubetypes.PodUpdate, 1)
// create static pods directory
staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods")
err := os.Mkdir(staticPodsConfigPath, 0755)
if err != nil {
return err
}
// create apiserver client
var apiclient *client.Client
@ -186,194 +187,14 @@ func (s *KubeletExecutorServer) runKubelet(execUpdates <-chan kubetypes.PodUpdat
log.Fatalf("No API client: %v", err)
}
log.Infof("Using root directory: %v", s.RootDirectory)
credentialprovider.SetPreferredDockercfgPath(s.RootDirectory)
cAdvisorInterface, err := cadvisor.New(s.CAdvisorPort)
if err != nil {
return err
}
imageGCPolicy := kubelet.ImageGCPolicy{
HighThresholdPercent: s.ImageGCHighThresholdPercent,
LowThresholdPercent: s.ImageGCLowThresholdPercent,
}
diskSpacePolicy := kubelet.DiskSpacePolicy{
DockerFreeDiskMB: s.LowDiskSpaceThresholdMB,
RootFreeDiskMB: s.LowDiskSpaceThresholdMB,
}
manifestURLHeader := make(http.Header)
if s.ManifestURLHeader != "" {
pieces := strings.Split(s.ManifestURLHeader, ":")
if len(pieces) != 2 {
return fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", s.ManifestURLHeader)
}
manifestURLHeader.Set(pieces[0], pieces[1])
}
//TODO(jdef) intentionally NOT initializing a cloud provider here since:
//(a) the kubelet doesn't actually use it
//(b) we don't need to create N-kubelet connections to zookeeper for no good reason
//cloud := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
//log.Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
hostNetworkSources, err := kubetypes.GetValidatedSources(strings.Split(s.HostNetworkSources, ","))
if err != nil {
return err
}
hostPIDSources, err := kubetypes.GetValidatedSources(strings.Split(s.HostPIDSources, ","))
if err != nil {
return err
}
hostIPCSources, err := kubetypes.GetValidatedSources(strings.Split(s.HostIPCSources, ","))
if err != nil {
return err
}
tlsOptions, err := s.InitializeTLS()
if err != nil {
return err
}
mounter := mount.New()
if s.Containerized {
log.V(2).Info("Running kubelet in containerized mode (experimental)")
mounter = &mount.NsenterMounter{}
}
var writer utilio.Writer = &utilio.StdWriter{}
var dockerExecHandler dockertools.ExecHandler
switch s.DockerExecHandlerName {
case "native":
dockerExecHandler = &dockertools.NativeExecHandler{}
case "nsenter":
writer = &utilio.NsenterWriter{}
dockerExecHandler = &dockertools.NsenterExecHandler{}
default:
log.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName)
dockerExecHandler = &dockertools.NativeExecHandler{}
}
// prepare kubelet
kcfg := app.KubeletConfig{
Address: s.Address,
AllowPrivileged: s.AllowPrivileged,
CAdvisorInterface: cAdvisorInterface,
CgroupRoot: s.CgroupRoot,
Cloud: nil, // TODO(jdef) Cloud, specifying null here because we don't want all kubelets polling mesos-master; need to account for this in the cloudprovider impl
ClusterDNS: s.ClusterDNS,
ClusterDomain: s.ClusterDomain,
// ConfigFile: ""
ConfigureCBR0: s.ConfigureCBR0,
ContainerRuntime: s.ContainerRuntime,
CPUCFSQuota: s.CPUCFSQuota,
DiskSpacePolicy: diskSpacePolicy,
DockerClient: dockerClient,
DockerDaemonContainer: s.DockerDaemonContainer,
DockerExecHandler: dockerExecHandler,
EnableDebuggingHandlers: s.EnableDebuggingHandlers,
EnableServer: s.EnableServer,
EventBurst: s.EventBurst,
EventRecordQPS: s.EventRecordQPS,
FileCheckFrequency: s.FileCheckFrequency,
HostnameOverride: s.HostnameOverride,
HostNetworkSources: hostNetworkSources,
HostPIDSources: hostPIDSources,
HostIPCSources: hostIPCSources,
// HTTPCheckFrequency
ImageGCPolicy: imageGCPolicy,
KubeClient: apiclient,
// ManifestURL: ""
ManifestURLHeader: manifestURLHeader,
MasterServiceNamespace: s.MasterServiceNamespace,
MaxContainerCount: s.MaxContainerCount,
MaxOpenFiles: s.MaxOpenFiles,
MaxPerPodContainerCount: s.MaxPerPodContainerCount,
MaxPods: s.MaxPods,
MinimumGCAge: s.MinimumGCAge,
Mounter: mounter,
NetworkPluginName: s.NetworkPluginName,
NetworkPlugins: app.ProbeNetworkPlugins(s.NetworkPluginDir),
NodeStatusUpdateFrequency: s.NodeStatusUpdateFrequency,
OOMAdjuster: oomAdjuster,
OSInterface: kubecontainer.RealOS{},
PodCIDR: s.PodCIDR,
PodInfraContainerImage: s.PodInfraContainerImage,
Port: s.Port,
ReadOnlyPort: s.ReadOnlyPort,
RegisterNode: s.RegisterNode,
RegistryBurst: s.RegistryBurst,
RegistryPullQPS: s.RegistryPullQPS,
ResolverConfig: s.ResolverConfig,
ResourceContainer: s.ResourceContainer,
RootDirectory: s.RootDirectory,
Runonce: s.RunOnce,
// StandaloneMode: false
StreamingConnectionIdleTimeout: s.StreamingConnectionIdleTimeout,
SyncFrequency: s.SyncFrequency,
SystemContainer: s.SystemContainer,
TLSOptions: tlsOptions,
VolumePlugins: app.ProbeVolumePlugins(),
Writer: writer,
}
kcfg.NodeName = kcfg.Hostname
kcfg.Builder = app.KubeletBuilder(func(kc *app.KubeletConfig) (app.KubeletBootstrap, *kconfig.PodConfig, error) {
return s.createAndInitKubelet(kc, clientConfig, staticPodsConfigPath, execUpdates, kubeletFinished)
})
err = app.RunKubelet(&kcfg)
if err != nil {
return err
}
// start health check server
if s.HealthzPort > 0 {
healthz.DefaultHealthz()
go util.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress.String(), strconv.Itoa(s.HealthzPort)), nil)
if err != nil {
log.Errorf("Starting health server failed: %v", err)
}
}, 5*time.Second, util.NeverStop)
}
return nil
}
// Run runs the specified KubeletExecutorServer.
func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
rand.Seed(time.Now().UTC().UnixNano())
// create shared channels
kubeletFinished := make(chan struct{})
execUpdates := make(chan kubetypes.PodUpdate, 1)
// create static pods directory
staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods")
err := os.Mkdir(staticPodsConfigPath, 0755)
if err != nil {
return err
}
// start executor
err = s.runExecutor(execUpdates, kubeletFinished, staticPodsConfigPath)
err = s.runExecutor(execUpdates, kubeletFinished, staticPodsConfigPath, apiclient)
if err != nil {
return err
}
// start kubelet
err = s.runKubelet(execUpdates, kubeletFinished, staticPodsConfigPath)
if err != nil {
close(kubeletFinished) // tell executor
return err
}
// block until executor is shut down or commits shutdown
select {}
// start kubelet, blocking
return s.runKubelet(execUpdates, kubeletFinished, staticPodsConfigPath, apiclient)
}
func defaultBindingAddress() string {
@ -387,12 +208,10 @@ func defaultBindingAddress() string {
func (ks *KubeletExecutorServer) createAndInitKubelet(
kc *app.KubeletConfig,
clientConfig *client.Config,
staticPodsConfigPath string,
execUpdates <-chan kubetypes.PodUpdate,
kubeletDone chan<- struct{},
) (app.KubeletBootstrap, *kconfig.PodConfig, error) {
// TODO(k8s): block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations
// TODO(k8s): KubeletConfig.KubeClient should be a client interface, but client interface misses certain methods
@ -405,7 +224,7 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
kubeClient = kc.KubeClient
}
gcPolicy := kubecontainer.ContainerGCPolicy{
gcPolicy := kubeletContainer.ContainerGCPolicy{
MinAge: kc.MinimumGCAge,
MaxPerPodContainer: kc.MaxPerPodContainerCount,
MaxContainers: kc.MaxContainerCount,
@ -495,7 +314,6 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
address: ks.Address,
dockerClient: kc.DockerClient,
kubeletDone: kubeletDone,
clientConfig: clientConfig,
executorDone: executorDone,
}
@ -512,7 +330,6 @@ type kubeletExecutor struct {
dockerClient dockertools.DockerInterface
kubeletDone chan<- struct{} // closed once kubelet.Run() returns
executorDone <-chan struct{} // closed when executor terminates
clientConfig *client.Config
}
func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions *kubelet.TLSOptions, auth kubelet.AuthInterface, enableDebuggingHandlers bool) {