Decouple executor initialization from kubelet

This patch reduces the executor's dependencies on the kubelet. This
makes it possible to launch the kubelet after the executor.

This considerably reduces the complexity of the startup code.

Moreover, this work is a prerequisite for running a standalone kubelet some day.
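For context, the resulting startup ordering can be reduced to a small, self-contained Go sketch (toy types and names; the real patch wires up a Mesos executor driver and an app.KubeletConfig instead): the executor owns the update channel and closes a registered channel once the initial registration callback fires, and only then does the kubelet side start.

package main

import (
    "fmt"
    "time"
)

// executor stands in for the Mesos executor: it owns the update channel and
// closes registered once the initial registration callback has fired.
type executor struct {
    updates    chan string   // stand-in for chan kubetypes.PodUpdate
    registered chan struct{} // closed on initial registration
}

func (e *executor) run() {
    time.Sleep(50 * time.Millisecond) // pretend registration takes a moment
    e.updates <- "empty SET"          // emit an empty update, as Registered() does
    close(e.registered)
}

func main() {
    exec := &executor{
        updates:    make(chan string, 1),
        registered: make(chan struct{}),
    }
    go exec.run()

    // the kubelet is launched only after the executor has registered
    <-exec.registered
    fmt.Println("executor registered; starting kubelet; first update:", <-exec.updates)
}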
Dr. Stefan Schimanski 2015-08-21 18:30:29 +02:00
parent 93ae257af4
commit a60df400fd
4 changed files with 176 additions and 154 deletions

View File

@@ -95,14 +95,14 @@ type podStatusFunc func() (*api.PodStatus, error)
 // KubernetesExecutor is an mesos executor that runs pods
 // in a minion machine.
 type KubernetesExecutor struct {
-    updateChan   chan<- interface{} // to send pod config updates to the kubelet
+    updateChan   chan<- kubetypes.PodUpdate // sent to the kubelet, closed on shutdown
     state        stateType
     tasks        map[string]*kuberTask
     pods         map[string]*api.Pod
     lock         sync.Mutex
-    sourcename   string
     client       *client.Client
-    done         chan struct{} // signals shutdown
+    terminate    chan struct{} // signals that the executor should shutdown
+    registered   chan struct{} // closed when registered
     outgoing     chan func() (mesos.Status, error) // outgoing queue to the mesos driver
     dockerClient dockertools.DockerInterface
     suicideWatch suicideWatcher
@@ -113,14 +113,12 @@ type KubernetesExecutor struct {
     exitFunc             func(int)
     podStatusFunc        func(*api.Pod) (*api.PodStatus, error)
     staticPodsConfigPath string
-    initialRegComplete   chan struct{}
     podController        *framework.Controller
     launchGracePeriod    time.Duration
 }

 type Config struct {
-    Updates    chan<- interface{} // to send pod config updates to the kubelet
-    SourceName string
+    Updates    chan<- kubetypes.PodUpdate // to send pod config updates to the kubelet
     APIClient  *client.Client
     Docker     dockertools.DockerInterface
     ShutdownAlert func()
@@ -144,9 +142,8 @@ func New(config Config) *KubernetesExecutor {
         state:          disconnectedState,
         tasks:          make(map[string]*kuberTask),
         pods:           make(map[string]*api.Pod),
-        sourcename:     config.SourceName,
         client:         config.APIClient,
-        done:           make(chan struct{}),
+        terminate:      make(chan struct{}),
         outgoing:       make(chan func() (mesos.Status, error), 1024),
         dockerClient:   config.Docker,
         suicideTimeout: config.SuicideTimeout,
@@ -155,7 +152,7 @@ func New(config Config) *KubernetesExecutor {
         shutdownAlert: config.ShutdownAlert,
         exitFunc:      config.ExitFunc,
         podStatusFunc: config.PodStatusFunc,
-        initialRegComplete:   make(chan struct{}),
+        registered:           make(chan struct{}),
         staticPodsConfigPath: config.StaticPodsConfigPath,
         launchGracePeriod:    config.LaunchGracePeriod,
     }
@@ -185,26 +182,24 @@ func New(config Config) *KubernetesExecutor {
     return k
 }

-func (k *KubernetesExecutor) InitialRegComplete() <-chan struct{} {
-    return k.initialRegComplete
+// InitiallyRegistered returns a channel which is closed when the executor is
+// registered with the Mesos master.
+func (k *KubernetesExecutor) InitiallyRegistered() <-chan struct{} {
+    return k.registered
 }

 func (k *KubernetesExecutor) Init(driver bindings.ExecutorDriver) {
     k.killKubeletContainers()
     k.resetSuicideWatch(driver)

-    go k.podController.Run(k.done)
+    go k.podController.Run(k.terminate)
     go k.sendLoop()
     //TODO(jdef) monitor kubeletFinished and shutdown if it happens
 }

-func (k *KubernetesExecutor) Done() <-chan struct{} {
-    return k.done
-}
-
 func (k *KubernetesExecutor) isDone() bool {
     select {
-    case <-k.done:
+    case <-k.terminate:
         return true
     default:
         return false
@@ -234,7 +229,12 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver,
         }
     }

-    k.initialRegistration.Do(k.onInitialRegistration)
+    k.updateChan <- kubetypes.PodUpdate{
+        Pods: []*api.Pod{},
+        Op:   kubetypes.SET,
+    }
+    close(k.registered)
 }

 // Reregistered is called when the executor is successfully re-registered with the slave.
@@ -255,12 +255,6 @@ func (k *KubernetesExecutor) Reregistered(driver bindings.ExecutorDriver, slaveI
         }
     }

-    k.initialRegistration.Do(k.onInitialRegistration)
-}
-
-func (k *KubernetesExecutor) onInitialRegistration() {
-    defer close(k.initialRegComplete)
-
     // emit an empty update to allow the mesos "source" to be marked as seen
     k.lock.Lock()
     defer k.lock.Unlock()
@@ -270,7 +264,6 @@ func (k *KubernetesExecutor) onInitialRegistration() {
     k.updateChan <- kubetypes.PodUpdate{
         Pods: []*api.Pod{},
         Op:   kubetypes.SET,
-        Source: k.sourcename,
     }
 }
@@ -818,7 +811,8 @@ func (k *KubernetesExecutor) doShutdown(driver bindings.ExecutorDriver) {
     (&k.state).transitionTo(terminalState)

     // signal to all listeners that this KubeletExecutor is done!
-    close(k.done)
+    close(k.terminate)
+    close(k.updateChan)

     if k.shutdownAlert != nil {
         func() {
@@ -892,7 +886,7 @@ func newStatus(taskId *mesos.TaskID, state mesos.TaskState, message string) *mes
 func (k *KubernetesExecutor) sendStatus(driver bindings.ExecutorDriver, status *mesos.TaskStatus) {
     select {
-    case <-k.done:
+    case <-k.terminate:
     default:
         k.outgoing <- func() (mesos.Status, error) { return driver.SendStatusUpdate(status) }
     }
@@ -900,7 +894,7 @@ func (k *KubernetesExecutor) sendStatus(driver bindings.ExecutorDriver, status *
 func (k *KubernetesExecutor) sendFrameworkMessage(driver bindings.ExecutorDriver, msg string) {
     select {
-    case <-k.done:
+    case <-k.terminate:
     default:
         k.outgoing <- func() (mesos.Status, error) { return driver.SendFrameworkMessage(msg) }
     }
@@ -910,12 +904,12 @@ func (k *KubernetesExecutor) sendLoop() {
     defer log.V(1).Info("sender loop exiting")
     for {
         select {
-        case <-k.done:
+        case <-k.terminate:
             return
         default:
             if !k.isConnected() {
                 select {
-                case <-k.done:
+                case <-k.terminate:
                 case <-time.After(1 * time.Second):
                 }
                 continue
@@ -935,7 +929,7 @@ func (k *KubernetesExecutor) sendLoop() {
             }
             // attempt to re-queue the sender
             select {
-            case <-k.done:
+            case <-k.terminate:
             case k.outgoing <- sender:
             }
         }
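A note on the done → terminate rename above: the channel is only ever closed, never sent on, so every sender guards its queueing with a non-blocking select, as in sendStatus and sendFrameworkMessage. A minimal, self-contained sketch of that guard (toy types, hypothetical helper name enqueue):

package main

import "fmt"

// enqueue queues work only while terminate is still open; once terminate is
// closed, the ready receive case wins over default and the message is dropped.
func enqueue(terminate <-chan struct{}, outgoing chan<- func() error, work func() error) {
    select {
    case <-terminate:
    default:
        outgoing <- work
    }
}

func main() {
    terminate := make(chan struct{})
    outgoing := make(chan func() error, 8)

    enqueue(terminate, outgoing, func() error { fmt.Println("sent"); return nil })
    (<-outgoing)() // prints "sent"

    close(terminate)
    enqueue(terminate, outgoing, func() error { return nil })
    fmt.Println("queued after shutdown:", len(outgoing)) // 0
}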

View File

@@ -65,17 +65,13 @@ func TestExecutorRegister(t *testing.T) {
     initialPodUpdate := kubetypes.PodUpdate{
         Pods: []*api.Pod{},
         Op:   kubetypes.SET,
-        Source: executor.sourcename,
     }
     receivedInitialPodUpdate := false
     select {
-    case m := <-updates:
-        update, ok := m.(kubetypes.PodUpdate)
-        if ok {
-            if reflect.DeepEqual(initialPodUpdate, update) {
-                receivedInitialPodUpdate = true
-            }
-        }
+    case update := <-updates:
+        if reflect.DeepEqual(initialPodUpdate, update) {
+            receivedInitialPodUpdate = true
+        }
     case <-time.After(time.Second):
     }
     assert.Equal(t, true, receivedInitialPodUpdate,
@@ -136,7 +132,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
     defer testApiServer.server.Close()

     mockDriver := &MockExecutorDriver{}
-    updates := make(chan interface{}, 1024)
+    updates := make(chan kubetypes.PodUpdate, 1024)
     config := Config{
         Docker:  dockertools.ConnectToDockerOrDie("fake://"),
         Updates: updates,
@@ -205,9 +201,8 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
     gotPodUpdate := false
     select {
-    case m := <-updates:
-        update, ok := m.(kubetypes.PodUpdate)
-        if ok && len(update.Pods) == 1 {
+    case update := <-updates:
+        if len(update.Pods) == 1 {
             gotPodUpdate = true
         }
     case <-time.After(time.Second):
@@ -302,7 +297,7 @@ func TestExecutorStaticPods(t *testing.T) {
     mockDriver := &MockExecutorDriver{}
     config := Config{
         Docker:  dockertools.ConnectToDockerOrDie("fake://"),
-        Updates: make(chan interface{}, 1), // allow kube-executor source to proceed past init
+        Updates: make(chan kubetypes.PodUpdate, 1), // allow kube-executor source to proceed past init
         APIClient: client.NewOrDie(&client.Config{
             Host:    testApiServer.server.URL,
             Version: testapi.Default.Version(),
@@ -384,7 +379,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
     kubeletFinished := make(chan struct{})
     config := Config{
         Docker:  dockertools.ConnectToDockerOrDie("fake://"),
-        Updates: make(chan interface{}, 1024),
+        Updates: make(chan kubetypes.PodUpdate, 1024),
         APIClient: client.NewOrDie(&client.Config{
             Host:    testApiServer.server.URL,
             Version: testapi.Default.Version(),
@@ -560,9 +555,10 @@ func TestExecutorShutdown(t *testing.T) {
     mockDriver := &MockExecutorDriver{}
     kubeletFinished := make(chan struct{})
     var exitCalled int32 = 0
+    updates := make(chan kubetypes.PodUpdate, 1024)
     config := Config{
         Docker:  dockertools.ConnectToDockerOrDie("fake://"),
-        Updates: make(chan interface{}, 1024),
+        Updates: updates,
         ShutdownAlert: func() {
             close(kubeletFinished)
         },
@@ -586,11 +582,21 @@ func TestExecutorShutdown(t *testing.T) {
     assert.Equal(t, true, executor.isDone(),
         "executor should be in Done state after Shutdown")

-    select {
-    case <-executor.Done():
-    default:
-        t.Fatal("done channel should be closed after shutdown")
-    }
+    // channel should be closed now, only a constant number of updates left
+    num := len(updates)
+drainLoop:
+    for {
+        select {
+        case _, ok := <-updates:
+            if !ok {
+                break drainLoop
+            }
+            num -= 1
+        default:
+            t.Fatal("Updates chan should be closed after Shutdown")
+        }
+    }
+    assert.Equal(t, num, 0, "Updates chan should get no new updates after Shutdown")

     assert.Equal(t, true, atomic.LoadInt32(&exitCalled) > 0,
         "the executor should call its ExitFunc when it is ready to close down")

View File

@@ -23,6 +23,7 @@ import (
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/mock"
     "k8s.io/kubernetes/pkg/api"
+    kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
     "k8s.io/kubernetes/pkg/kubelet/dockertools"
 )
@@ -65,13 +66,12 @@ func (m *MockExecutorDriver) SendFrameworkMessage(msg string) (mesosproto.Status
     return args.Get(0).(mesosproto.Status), args.Error(1)
 }

-func NewTestKubernetesExecutor() (*KubernetesExecutor, chan interface{}) {
-    updates := make(chan interface{}, 1024)
+func NewTestKubernetesExecutor() (*KubernetesExecutor, chan kubetypes.PodUpdate) {
+    updates := make(chan kubetypes.PodUpdate, 1024)
     return New(Config{
         Docker:  dockertools.ConnectToDockerOrDie("fake://"),
         Updates: updates,
         PodLW:   &NewMockPodsListWatch(api.PodList{}).ListWatch,
-        SourceName: "executor_test",
     }), updates
 }

View File

@@ -18,13 +18,13 @@ package service

 import (
     "fmt"
-    "math/rand"
     "net"
     "net/http"
     "os"
     "path/filepath"
     "strconv"
     "strings"
+    "sync"
     "time"

     log "github.com/golang/glog"
@@ -35,10 +35,8 @@ import (
     "k8s.io/kubernetes/contrib/mesos/pkg/executor/config"
     "k8s.io/kubernetes/contrib/mesos/pkg/hyperkube"
     "k8s.io/kubernetes/pkg/api"
-    "k8s.io/kubernetes/pkg/client/cache"
     client "k8s.io/kubernetes/pkg/client/unversioned"
     "k8s.io/kubernetes/pkg/credentialprovider"
-    "k8s.io/kubernetes/pkg/fields"
     "k8s.io/kubernetes/pkg/healthz"
     "k8s.io/kubernetes/pkg/kubelet"
     "k8s.io/kubernetes/pkg/kubelet/cadvisor"
@@ -50,6 +48,7 @@ import (
     utilio "k8s.io/kubernetes/pkg/util/io"
     "k8s.io/kubernetes/pkg/util/mount"
     "k8s.io/kubernetes/pkg/util/oom"
+    "k8s.io/kubernetes/pkg/util/rand"
 )

 const (
@@ -62,6 +61,8 @@ type KubeletExecutorServer struct {
     *app.KubeletServer
     SuicideTimeout    time.Duration
     LaunchGracePeriod time.Duration
+    kletLock          sync.Mutex
+    klet              *kubelet.Kubelet
 }

 func NewKubeletExecutorServer() *KubeletExecutorServer {
@@ -100,6 +101,13 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
     s.SystemContainer = ""
     s.DockerDaemonContainer = ""

+    // create static pods directory
+    staticPodsConfigPath := filepath.Join(s.RootDirectory, "static-pods")
+    err := os.Mkdir(staticPodsConfigPath, 0755)
+    if err != nil {
+        return err
+    }
+
     // create apiserver client
     var apiclient *client.Client
     clientConfig, err := s.CreateAPIServerClientConfig()
@@ -173,6 +181,43 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
         log.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName)
         dockerExecHandler = &dockertools.NativeExecHandler{}
     }

+    dockerClient := dockertools.ConnectToDockerOrDie(s.DockerEndpoint)
+
+    //TODO(jdef) either configure Watch here with something useful, or else
+    // get rid of it from executor.Config
+    kubeletFinished := make(chan struct{})
+    execUpdates := make(chan kubetypes.PodUpdate, 1)
+    exec := executor.New(executor.Config{
+        Updates:         execUpdates,
+        APIClient:       apiclient,
+        Docker:          dockerClient,
+        SuicideTimeout:  s.SuicideTimeout,
+        KubeletFinished: kubeletFinished,
+        ExitFunc:        os.Exit,
+        PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) {
+            s.kletLock.Lock()
+            defer s.kletLock.Unlock()
+            if s.klet == nil {
+                return nil, fmt.Errorf("PodStatusFunc called before kubelet is initialized")
+            }
+            status, err := s.klet.GetRuntime().GetPodStatus(pod)
+            if err != nil {
+                return nil, err
+            }
+            status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses)
+            hostIP, err := s.klet.GetHostIP()
+            if err != nil {
+                log.Errorf("Cannot get host IP: %v", err)
+            } else {
+                status.HostIP = hostIP.String()
+            }
+            return status, nil
+        },
+        StaticPodsConfigPath: staticPodsConfigPath,
+    })
+
     manifestURLHeader := make(http.Header)
     if s.ManifestURLHeader != "" {
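The PodStatusFunc above is handed to the executor before the kubelet exists, hence the nil check under kletLock. The pattern, reduced to a runnable sketch with hypothetical names:

package main

import (
    "errors"
    "fmt"
    "sync"
)

type kubeletStub struct{ hostIP string }

// server mirrors the kletLock/klet pair: klet is nil until the kubelet
// builder has run, so the status callback must fail gracefully before that.
type server struct {
    kletLock sync.Mutex
    klet     *kubeletStub
}

func (s *server) podStatus() (string, error) {
    s.kletLock.Lock()
    defer s.kletLock.Unlock()
    if s.klet == nil {
        return "", errors.New("PodStatusFunc called before kubelet is initialized")
    }
    return "Running on " + s.klet.hostIP, nil
}

func main() {
    s := &server{}
    if _, err := s.podStatus(); err != nil {
        fmt.Println(err) // called before kubelet is initialized
    }

    s.kletLock.Lock()
    s.klet = &kubeletStub{hostIP: "10.0.0.1"}
    s.kletLock.Unlock()

    status, _ := s.podStatus()
    fmt.Println(status) // Running on 10.0.0.1
}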
@@ -183,6 +228,30 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
         manifestURLHeader.Set(pieces[0], pieces[1])
     }

+    // initialize driver and initialize the executor with it
+    dconfig := bindings.DriverConfig{
+        Executor:         exec,
+        HostnameOverride: s.HostnameOverride,
+        BindingAddress:   s.Address,
+    }
+    driver, err := bindings.NewMesosExecutorDriver(dconfig)
+    if err != nil {
+        log.Fatalf("failed to create executor driver: %v", err)
+    }
+    log.V(2).Infof("Initialize executor driver...")
+    exec.Init(driver)
+
+    // start the driver
+    go func() {
+        if _, err := driver.Run(); err != nil {
+            log.Fatalf("executor driver failed: %v", err)
+        }
+        log.Info("executor Run completed")
+    }()
+
+    <-exec.InitiallyRegistered()
+
+    // prepare kubelet
     kcfg := app.KubeletConfig{
         Address:         s.Address,
         AllowPrivileged: s.AllowPrivileged,
@@ -196,7 +265,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
         ContainerRuntime: s.ContainerRuntime,
         CPUCFSQuota:      s.CPUCFSQuota,
         DiskSpacePolicy:  diskSpacePolicy,
-        DockerClient:     dockertools.ConnectToDockerOrDie(s.DockerEndpoint),
+        DockerClient:     dockerClient,
         DockerDaemonContainer:   s.DockerDaemonContainer,
         DockerExecHandler:       dockerExecHandler,
         EnableDebuggingHandlers: s.EnableDebuggingHandlers,
@@ -248,9 +317,8 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
     kcfg.NodeName = kcfg.Hostname
     kcfg.Builder = app.KubeletBuilder(func(kc *app.KubeletConfig) (app.KubeletBootstrap, *kconfig.PodConfig, error) {
-        return s.createAndInitKubelet(kc, hks, clientConfig)
+        return s.createAndInitKubelet(kc, clientConfig, staticPodsConfigPath, execUpdates, kubeletFinished)
     })
-
     err = app.RunKubelet(&kcfg)
     if err != nil {
         return err
@@ -282,8 +350,10 @@ func defaultBindingAddress() string {
 func (ks *KubeletExecutorServer) createAndInitKubelet(
     kc *app.KubeletConfig,
-    hks hyperkube.Interface,
     clientConfig *client.Config,
+    staticPodsConfigPath string,
+    execUpdates <-chan kubetypes.PodUpdate,
+    kubeletDone chan<- struct{},
 ) (app.KubeletBootstrap, *kconfig.PodConfig, error) {

     // TODO(k8s): block until all sources have delivered at least one update to the channel, or break the sync loop
@@ -304,8 +374,24 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
         MaxContainers: kc.MaxContainerCount,
     }

+    // create main pod source
     pc := kconfig.NewPodConfig(kconfig.PodConfigNotificationIncremental, kc.Recorder)
     updates := pc.Channel(MESOS_CFG_SOURCE)
+    executorDone := make(chan struct{})
+    go func() {
+        // execUpdates will be closed by the executor on shutdown
+        defer close(executorDone)
+        for u := range execUpdates {
+            u.Source = MESOS_CFG_SOURCE
+            updates <- u
+        }
+    }()
+
+    // create static-pods directory file source
+    log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath)
+    fileSourceUpdates := pc.Channel(kubetypes.FileSource)
+    kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates)
+
     klet, err := kubelet.NewMainKubelet(
         kc.Hostname,
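The forwarding goroutine just introduced does two jobs: it stamps each executor update with MESOS_CFG_SOURCE (the executor no longer needs a SourceName), and it translates closure of execUpdates into closure of executorDone. Reduced to a self-contained sketch (toy podUpdate type; "mesos" stands in for MESOS_CFG_SOURCE):

package main

import "fmt"

type podUpdate struct {
    Source string
    Op     string
}

func main() {
    execUpdates := make(chan podUpdate, 1)
    merged := make(chan podUpdate, 1)
    executorDone := make(chan struct{})

    // forwarder: stamp each executor update with the mesos source name and
    // convert closure of execUpdates into closure of executorDone.
    go func() {
        defer close(executorDone)
        for u := range execUpdates {
            u.Source = "mesos"
            merged <- u
        }
    }()

    execUpdates <- podUpdate{Op: "SET"}
    fmt.Println(<-merged) // {mesos SET}

    close(execUpdates)
    <-executorDone
    fmt.Println("executor updates drained")
}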
@@ -363,85 +449,22 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
         return nil, nil, err
     }

-    // create static pods directory
-    staticPodsConfigPath := filepath.Join(kc.RootDirectory, "static-pods")
-    err = os.Mkdir(staticPodsConfigPath, 0755)
-    if err != nil {
-        return nil, nil, err
-    }
-
-    //TODO(jdef) either configure Watch here with something useful, or else
-    // get rid of it from executor.Config
-    kubeletFinished := make(chan struct{})
-    exec := executor.New(executor.Config{
-        Updates:           updates,
-        SourceName:        MESOS_CFG_SOURCE,
-        APIClient:         kc.KubeClient,
-        Docker:            kc.DockerClient,
-        SuicideTimeout:    ks.SuicideTimeout,
-        LaunchGracePeriod: ks.LaunchGracePeriod,
-        KubeletFinished:   kubeletFinished,
-        ExitFunc:          os.Exit,
-        PodStatusFunc: func(pod *api.Pod) (*api.PodStatus, error) {
-            status, err := klet.GetRuntime().GetPodStatus(pod)
-            if err != nil {
-                return nil, err
-            }
-            status.Phase = kubelet.GetPhase(&pod.Spec, status.ContainerStatuses)
-            hostIP, err := klet.GetHostIP()
-            if err != nil {
-                log.Errorf("Cannot get host IP: %v", err)
-            } else {
-                status.HostIP = hostIP.String()
-            }
-            return status, nil
-        },
-        StaticPodsConfigPath: staticPodsConfigPath,
-        PodLW: cache.NewListWatchFromClient(kc.KubeClient, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, kc.NodeName)),
-    })
-
-    // initialize driver and initialize the executor with it
-    dconfig := bindings.DriverConfig{
-        Executor:         exec,
-        HostnameOverride: ks.HostnameOverride,
-        BindingAddress:   ks.Address,
-    }
-    driver, err := bindings.NewMesosExecutorDriver(dconfig)
-    if err != nil {
-        log.Fatalf("failed to create executor driver: %v", err)
-    }
-    log.V(2).Infof("Initialize executor driver...")
-    exec.Init(driver)
-
-    // start the driver
-    go func() {
-        if _, err := driver.Run(); err != nil {
-            log.Fatalf("executor driver failed: %v", err)
-        }
-        log.Info("executor Run completed")
-    }()
-
-    <- exec.InitialRegComplete()
+    ks.kletLock.Lock()
+    ks.klet = klet
+    ks.kletLock.Unlock()

     k := &kubeletExecutor{
-        Kubelet:         klet,
+        Kubelet:         ks.klet,
         address:         ks.Address,
         dockerClient:    kc.DockerClient,
-        hks:             hks,
-        kubeletFinished: kubeletFinished,
-        executorDone:    exec.Done(),
+        kubeletDone:     kubeletDone,
         clientConfig:    clientConfig,
+        executorDone:    executorDone,
     }

     k.BirthCry()
     k.StartGarbageCollection()

-    // create static-pods directory file source
-    log.V(2).Infof("initializing static pods source factory, configured at path %q", staticPodsConfigPath)
-    fileSourceUpdates := pc.Channel(kubetypes.FileSource)
-    kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates)
-
     return k, pc, nil
 }
@@ -450,9 +473,8 @@ type kubeletExecutor struct {
     *kubelet.Kubelet
     address      net.IP
     dockerClient dockertools.DockerInterface
-    hks             hyperkube.Interface
-    kubeletFinished chan struct{}   // closed once kubelet.Run() returns
-    executorDone    <-chan struct{} // from KubeletExecutor.Done()
+    kubeletDone  chan<- struct{}  // closed once kubelet.Run() returns
+    executorDone <-chan struct{} // closed when executor terminates
    clientConfig *client.Config
 }
@@ -463,21 +485,21 @@ func (kl *kubeletExecutor) ListenAndServe(address net.IP, port uint, tlsOptions
 // runs the main kubelet loop, closing the kubeletFinished chan when the loop exits.
 // never returns.
-func (kl *kubeletExecutor) Run(updates <-chan kubetypes.PodUpdate) {
+func (kl *kubeletExecutor) Run(mergedUpdates <-chan kubetypes.PodUpdate) {
     defer func() {
-        close(kl.kubeletFinished)
+        close(kl.kubeletDone)
         util.HandleCrash()
         log.Infoln("kubelet run terminated") //TODO(jdef) turn down verbosity
         // important: never return! this is in our contract
         select {}
     }()

-    // push updates through a closable pipe. when the executor indicates shutdown
-    // via Done() we want to stop the Kubelet from processing updates.
-    pipe := make(chan kubetypes.PodUpdate)
+    // push merged updates into another, closable update channel which is closed
+    // when the executor shuts down.
+    closableUpdates := make(chan kubetypes.PodUpdate)
     go func() {
-        // closing pipe will cause our patched kubelet's syncLoop() to exit
-        defer close(pipe)
+        // closing closableUpdates will cause our patched kubelet's syncLoop() to exit
+        defer close(closableUpdates)
     pipeLoop:
         for {
             select {
@@ -485,9 +507,9 @@ func (kl *kubeletExecutor) Run(updates <-chan kubetypes.PodUpdate) {
                 break pipeLoop
             default:
                 select {
-                case u := <-updates:
+                case u := <-mergedUpdates:
                     select {
-                    case pipe <- u: // noop
+                    case closableUpdates <- u: // noop
                     case <-kl.executorDone:
                         break pipeLoop
                     }
@@ -498,12 +520,12 @@ func (kl *kubeletExecutor) Run(updates <-chan kubetypes.PodUpdate) {
         }
     }()

-    // we expect that Run() will complete after the pipe is closed and the
+    // we expect that Run() will complete after closableUpdates is closed and the
     // kubelet's syncLoop() has finished processing its backlog, which hopefully
     // will not take very long. Peeking into the future (current k8s master) it
     // seems that the backlog has grown from 1 to 50 -- this may negatively impact
     // us going forward, time will tell.
-    util.Until(func() { kl.Kubelet.Run(pipe) }, 0, kl.executorDone)
+    util.Until(func() { kl.Kubelet.Run(closableUpdates) }, 0, kl.executorDone)

     //TODO(jdef) revisit this if/when executor failover lands

     // Force kubelet to delete all pods.
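The closable-pipe idea above can be illustrated with a simplified, self-contained sketch (hypothetical helper name pipeUntil; the real loop also uses a labeled pipeLoop with a default case, omitted here for brevity): updates are forwarded until the done channel closes, and closing the output channel is what lets the consumer's loop exit.

package main

import "fmt"

// pipeUntil forwards values from in until done is closed, then closes its
// output; in the patch, closing the output lets the kubelet's syncLoop() exit.
func pipeUntil(in <-chan int, done <-chan struct{}) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case <-done:
                return
            case u := <-in:
                select {
                case out <- u:
                case <-done:
                    return
                }
            }
        }
    }()
    return out
}

func main() {
    in := make(chan int, 1)
    done := make(chan struct{})
    out := pipeUntil(in, done)

    in <- 42
    fmt.Println(<-out) // 42

    close(done)
    for range out { // drains until pipeUntil closes out
    }
    fmt.Println("pipe closed")
}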