Merge pull request #551 from erictune/private_kubelet

Private kubelet
Daniel Smith 2014-07-22 18:58:43 -07:00
commit ec0f639a21
4 changed files with 79 additions and 68 deletions
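
This change makes every field of the kubelet.Kubelet struct unexported and routes all construction through two constructors: NewMainKubelet for the standalone kubelet binary and NewIntegrationTestKubelet for the integration test. Call sites can no longer reach into the struct, so optional collaborators are either passed to a constructor or defaulted inside Run. A minimal sketch of the pattern, under hypothetical names (Server, NewServer, NewTestServer are illustrations, not code from this PR):

	package sketch

	import (
		"io"
		"log"
		"os"
	)

	// Server stands in for Kubelet: required collaborators are set by a
	// constructor; optional ones may stay nil and are defaulted in Run.
	type Server struct {
		hostname string      // required
		logger   *log.Logger // optional; defaulted lazily in Run
	}

	// NewServer mirrors NewMainKubelet: production wiring in one place.
	func NewServer(hostname string, logger *log.Logger) *Server {
		return &Server{hostname: hostname, logger: logger}
	}

	// NewTestServer mirrors NewIntegrationTestKubelet: it pre-wires a
	// harmless double so test call sites stay one line long.
	func NewTestServer(hostname string) *Server {
		return &Server{hostname: hostname, logger: log.New(io.Discard, "", 0)}
	}

	// Run fills in defaults for anything a constructor left nil, the
	// same way Kubelet.Run defaults logServer, dockerPuller and
	// healthChecker in the diff below.
	func (s *Server) Run() {
		if s.logger == nil {
			s.logger = log.New(os.Stderr, "server ", log.LstdFlags)
		}
		s.logger.Printf("running on %s", s.hostname)
	}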

cmd/integration/integration.go

@@ -104,11 +104,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
 	cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
 	config.NewSourceEtcd(config.EtcdKeyForHost(machineList[0]), etcdClient, 30*time.Second, cfg1.Channel("etcd"))
 	config.NewSourceURL(manifestURL, 5*time.Second, cfg1.Channel("url"))
-	myKubelet := &kubelet.Kubelet{
-		Hostname:     machineList[0],
-		DockerClient: &fakeDocker1,
-		DockerPuller: &kubelet.FakeDockerPuller{},
-	}
+	myKubelet := kubelet.NewIntegrationTestKubelet(machineList[0], &fakeDocker1)
 	go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0)
 	go util.Forever(cfg1.Sync, 3*time.Second)
 	go util.Forever(func() {
@@ -120,11 +116,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
 	// have a place they can schedule.
 	cfg2 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
 	config.NewSourceEtcd(config.EtcdKeyForHost(machineList[1]), etcdClient, 30*time.Second, cfg2.Channel("etcd"))
-	otherKubelet := &kubelet.Kubelet{
-		Hostname:     machineList[1],
-		DockerClient: &fakeDocker2,
-		DockerPuller: &kubelet.FakeDockerPuller{},
-	}
+	otherKubelet := kubelet.NewIntegrationTestKubelet(machineList[1], &fakeDocker2)
 	go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0)
 	go util.Forever(cfg2.Sync, 3*time.Second)
 	go util.Forever(func() {
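
Both call sites above shrink to one line because NewIntegrationTestKubelet bakes in the FakeDockerPuller that the old struct literals set explicitly. A hedged sketch of that trade, again with hypothetical names:

	package sketch

	// Puller stands in for the package's DockerPuller interface.
	type Puller interface {
		Pull(image string) error
	}

	// fakePuller never touches a registry, like FakeDockerPuller.
	type fakePuller struct{}

	func (fakePuller) Pull(string) error { return nil }

	type node struct {
		hostname string
		puller   Puller
	}

	// newTestNode pre-wires the fake so tests cannot forget it and no
	// call site has to repeat it.
	func newTestNode(hostname string) *node {
		return &node{hostname: hostname, puller: fakePuller{}}
	}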

cmd/kubelet/kubelet.go

@@ -32,6 +32,7 @@ import (
 	_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
 	kconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 	"github.com/coreos/go-etcd/etcd"
 	"github.com/fsouza/go-dockerclient"
@@ -104,12 +105,6 @@ func main() {
 	hostname := getHostname()
-	k := &kubelet.Kubelet{
-		Hostname:       hostname,
-		DockerClient:   dockerClient,
-		CadvisorClient: cadvisorClient,
-	}
 	// source of all configuration
 	cfg := kconfig.NewPodConfig(kconfig.PodConfigNotificationSnapshotAndUpdates)
@@ -124,15 +119,22 @@
 	}
 	// define etcd config source and initialize etcd client
+	var etcdClient tools.EtcdClient
 	if len(etcdServerList) > 0 {
 		glog.Infof("Watching for etcd configs at %v", etcdServerList)
-		k.EtcdClient = etcd.NewClient(etcdServerList)
-		kconfig.NewSourceEtcd(kconfig.EtcdKeyForHost(hostname), k.EtcdClient, 30*time.Second, cfg.Channel("etcd"))
+		etcdClient = etcd.NewClient(etcdServerList)
+		kconfig.NewSourceEtcd(kconfig.EtcdKeyForHost(hostname), etcdClient, 30*time.Second, cfg.Channel("etcd"))
 	}
 	// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
 	// up into "per source" synchronizations
+	k := kubelet.NewMainKubelet(
+		getHostname(),
+		dockerClient,
+		cadvisorClient,
+		etcdClient)
 	// start the kubelet
 	go util.Forever(func() { k.Run(cfg.Updates()) }, 0)
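
Note the shape of the wiring: etcdClient is declared as the tools.EtcdClient interface and assigned only when etcdServerList is non-empty, so NewMainKubelet may receive a nil interface and the kubelet simply sends no events. A small sketch of that optional-dependency contract (EventSink and agent are hypothetical):

	package sketch

	import "fmt"

	// EventSink stands in for tools.EtcdClient as an optional dependency.
	type EventSink interface {
		Append(event string) error
	}

	type agent struct {
		sink EventSink // may be nil when no servers are configured
	}

	func newAgent(sink EventSink) *agent { return &agent{sink: sink} }

	// logEvent mirrors the nil check in Kubelet.LogEvent.
	func (a *agent) logEvent(event string) error {
		if a.sink == nil {
			return fmt.Errorf("no etcd client connection")
		}
		return a.sink.Append(event)
	}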

pkg/kubelet/kubelet.go

@@ -57,47 +57,64 @@ type SyncHandler interface {
 type volumeMap map[string]volume.Interface
-// New creates a new Kubelet.
-// TODO: currently it is only called by test code.
-// Need cleanup.
-func New() *Kubelet {
-	return &Kubelet{}
+// New creates a new Kubelet for use in main
+func NewMainKubelet(
+	hn string,
+	dc DockerInterface,
+	cc CadvisorInterface,
+	ec tools.EtcdClient) *Kubelet {
+	return &Kubelet{
+		hostname:       hn,
+		dockerClient:   dc,
+		cadvisorClient: cc,
+		etcdClient:     ec,
+	}
 }
+
+// NewIntegrationTestKubelet creates a new Kubelet for use in integration tests.
+// TODO: add more integration tests, and expand parameter list as needed.
+func NewIntegrationTestKubelet(hn string, dc DockerInterface) *Kubelet {
+	return &Kubelet{
+		hostname:     hn,
+		dockerClient: dc,
+		dockerPuller: &FakeDockerPuller{},
+	}
+}
 // Kubelet is the main kubelet implementation.
 type Kubelet struct {
-	Hostname     string
-	DockerClient DockerInterface
+	hostname     string
+	dockerClient DockerInterface
 	// Optional, no events will be sent without it
-	EtcdClient tools.EtcdClient
+	etcdClient tools.EtcdClient
 	// Optional, no statistics will be available if omitted
-	CadvisorClient CadvisorInterface
+	cadvisorClient CadvisorInterface
 	// Optional, defaults to simple implementaiton
-	HealthChecker health.HealthChecker
+	healthChecker health.HealthChecker
 	// Optional, defaults to simple Docker implementation
-	DockerPuller DockerPuller
+	dockerPuller DockerPuller
 	// Optional, defaults to /logs/ from /var/log
-	LogServer http.Handler
+	logServer http.Handler
 }
 // Run starts the kubelet reacting to config updates
 func (kl *Kubelet) Run(updates <-chan PodUpdate) {
-	if kl.LogServer == nil {
-		kl.LogServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
+	if kl.logServer == nil {
+		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
 	}
-	if kl.DockerPuller == nil {
-		kl.DockerPuller = NewDockerPuller(kl.DockerClient)
+	if kl.dockerPuller == nil {
+		kl.dockerPuller = NewDockerPuller(kl.dockerClient)
 	}
-	if kl.HealthChecker == nil {
-		kl.HealthChecker = health.NewHealthChecker()
+	if kl.healthChecker == nil {
+		kl.healthChecker = health.NewHealthChecker()
 	}
 	kl.syncLoop(updates, kl)
 }
 // LogEvent logs an event to the etcd backend.
 func (kl *Kubelet) LogEvent(event *api.Event) error {
-	if kl.EtcdClient == nil {
+	if kl.etcdClient == nil {
 		return fmt.Errorf("no etcd client connection")
 	}
 	event.Timestamp = time.Now().Unix()
@@ -107,7 +124,7 @@ func (kl *Kubelet) LogEvent(event *api.Event) error {
 	}
 	var response *etcd.Response
-	response, err = kl.EtcdClient.AddChild(fmt.Sprintf("/events/%s", event.Container.Name), string(data), 60*60*48 /* 2 days */)
+	response, err = kl.etcdClient.AddChild(fmt.Sprintf("/events/%s", event.Container.Name), string(data), 60*60*48 /* 2 days */)
 	// TODO(bburns) : examine response here.
 	if err != nil {
 		glog.Errorf("Error writing event: %s\n", err)
@@ -228,11 +245,11 @@ func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes v
 			WorkingDir: container.WorkingDir,
 		},
 	}
-	dockerContainer, err := kl.DockerClient.CreateContainer(opts)
+	dockerContainer, err := kl.dockerClient.CreateContainer(opts)
 	if err != nil {
 		return "", err
 	}
-	err = kl.DockerClient.StartContainer(dockerContainer.ID, &docker.HostConfig{
+	err = kl.dockerClient.StartContainer(dockerContainer.ID, &docker.HostConfig{
 		PortBindings: portBindings,
 		Binds:        binds,
 		NetworkMode:  netMode,
@@ -242,7 +259,7 @@ func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes v
 // Kill a docker container
 func (kl *Kubelet) killContainer(dockerContainer docker.APIContainers) error {
-	err := kl.DockerClient.StopContainer(dockerContainer.ID, 10)
+	err := kl.dockerClient.StopContainer(dockerContainer.ID, 10)
 	podFullName, containerName := parseDockerName(dockerContainer.Names[0])
 	kl.LogEvent(&api.Event{
 		Event: "STOP",
@@ -276,7 +293,7 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (DockerID, error) {
 		Image: networkContainerImage,
 		Ports: ports,
 	}
-	kl.DockerPuller.Pull(networkContainerImage)
+	kl.dockerPuller.Pull(networkContainerImage)
 	return kl.runContainer(pod, container, nil, "")
 }
@@ -327,7 +344,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChan
 		}
 		glog.Infof("Container doesn't exist, creating %#v", container)
-		if err := kl.DockerPuller.Pull(container.Image); err != nil {
+		if err := kl.dockerPuller.Pull(container.Image); err != nil {
 			glog.Errorf("Failed to pull image: %v skipping pod %s container %s.", err, podFullName, container.Name)
 			continue
 		}
@@ -346,13 +363,13 @@ type empty struct{}
 // SyncPods synchronizes the configured list of pods (desired state) with the host current state.
 func (kl *Kubelet) SyncPods(pods []Pod) error {
-	glog.Infof("Desired [%s]: %+v", kl.Hostname, pods)
+	glog.Infof("Desired [%s]: %+v", kl.hostname, pods)
 	var err error
 	dockerIdsToKeep := map[DockerID]empty{}
 	keepChannel := make(chan DockerID, defaultChanSize)
 	waitGroup := sync.WaitGroup{}
-	dockerContainers, err := getKubeletDockerContainers(kl.DockerClient)
+	dockerContainers, err := getKubeletDockerContainers(kl.dockerClient)
 	if err != nil {
 		glog.Errorf("Error listing containers %#v", dockerContainers)
 		return err
@@ -386,7 +403,7 @@ func (kl *Kubelet) SyncPods(pods []Pod) error {
 	<-ch
 	// Kill any containers we don't need
-	existingContainers, err := getKubeletDockerContainers(kl.DockerClient)
+	existingContainers, err := getKubeletDockerContainers(kl.dockerClient)
 	if err != nil {
 		glog.Errorf("Error listing containers: %v", err)
 		return err
@@ -432,12 +449,12 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
 		case u := <-updates:
 			switch u.Op {
 			case SET:
-				glog.Infof("Containers changed [%s]", kl.Hostname)
+				glog.Infof("Containers changed [%s]", kl.hostname)
 				pods = u.Pods
 			case UPDATE:
 				//TODO: implement updates of containers
-				glog.Infof("Containers updated, not implemented [%s]", kl.Hostname)
+				glog.Infof("Containers updated, not implemented [%s]", kl.hostname)
 				continue
 			default:
@@ -468,7 +485,7 @@ func getCadvisorContainerInfoRequest(req *info.ContainerInfoRequest) *info.Conta
 // cgroup file system. e.g. The root container, which represents the whole
 // machine, has path "/"; all docker containers have path "/docker/<docker id>"
 func (kl *Kubelet) statsFromContainerPath(containerPath string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
-	cinfo, err := kl.CadvisorClient.ContainerInfo(containerPath, getCadvisorContainerInfoRequest(req))
+	cinfo, err := kl.cadvisorClient.ContainerInfo(containerPath, getCadvisorContainerInfoRequest(req))
 	if err != nil {
 		return nil, err
 	}
@@ -477,15 +494,15 @@ func (kl *Kubelet) statsFromContainerPath(containerPath string, req *info.Contai
 // GetPodInfo returns information from Docker about the containers in a pod
 func (kl *Kubelet) GetPodInfo(podFullName string) (api.PodInfo, error) {
-	return getDockerPodInfo(kl.DockerClient, podFullName)
+	return getDockerPodInfo(kl.dockerClient, podFullName)
 }
 // GetContainerInfo returns stats (from Cadvisor) for a container.
 func (kl *Kubelet) GetContainerInfo(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
-	if kl.CadvisorClient == nil {
+	if kl.cadvisorClient == nil {
 		return nil, nil
 	}
-	dockerContainers, err := getKubeletDockerContainers(kl.DockerClient)
+	dockerContainers, err := getKubeletDockerContainers(kl.dockerClient)
 	if err != nil {
 		return nil, err
 	}
@@ -502,7 +519,7 @@ func (kl *Kubelet) GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerI
 }
 func (kl *Kubelet) GetMachineInfo() (*info.MachineInfo, error) {
-	return kl.CadvisorClient.MachineInfo()
+	return kl.cadvisorClient.MachineInfo()
 }
 func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIContainers) (health.Status, error) {
@@ -513,14 +530,14 @@ func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIC
 	if time.Now().Unix()-dockerContainer.Created < container.LivenessProbe.InitialDelaySeconds {
 		return health.Healthy, nil
 	}
-	if kl.HealthChecker == nil {
+	if kl.healthChecker == nil {
 		return health.Healthy, nil
 	}
-	return kl.HealthChecker.HealthCheck(container)
+	return kl.healthChecker.HealthCheck(container)
 }
 // Returns logs of current machine.
 func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
 	// TODO: whitelist logs we are willing to serve
-	kl.LogServer.ServeHTTP(w, req)
+	kl.logServer.ServeHTTP(w, req)
 }
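
The SyncPods hunks above preserve the kubelet's reconciliation shape: record the IDs of containers that belong to desired pods, then kill every running container missing from that set. A compact sketch of the core bookkeeping (the real code gathers IDs through keepChannel and a WaitGroup; this is a simplified, hypothetical reduction):

	package sketch

	// reconcile returns the running container IDs that are no longer
	// desired, mirroring SyncPods' dockerIdsToKeep set.
	func reconcile(desired, running []string) (toStop []string) {
		keep := make(map[string]struct{}, len(desired))
		for _, id := range desired {
			keep[id] = struct{}{}
		}
		for _, id := range running {
			if _, ok := keep[id]; !ok {
				toStop = append(toStop, id)
			}
		}
		return toStop
	}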

pkg/kubelet/kubelet_test.go

@@ -57,10 +57,10 @@ func makeTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *FakeDocker
 		err: nil,
 	}
-	kubelet := New()
-	kubelet.DockerClient = fakeDocker
-	kubelet.DockerPuller = &FakeDockerPuller{}
-	kubelet.EtcdClient = fakeEtcdClient
+	kubelet := &Kubelet{}
+	kubelet.dockerClient = fakeDocker
+	kubelet.dockerPuller = &FakeDockerPuller{}
+	kubelet.etcdClient = fakeEtcdClient
 	return kubelet, fakeEtcdClient, fakeDocker
 }
@@ -160,7 +160,7 @@ func TestKillContainerWithError(t *testing.T) {
 		},
 	}
 	kubelet, _, _ := makeTestKubelet(t)
-	kubelet.DockerClient = fakeDocker
+	kubelet.dockerClient = fakeDocker
 	err := kubelet.killContainer(fakeDocker.containerList[0])
 	verifyError(t, err)
 	verifyCalls(t, fakeDocker, []string{"stop"})
@@ -289,7 +289,7 @@ func (f *FalseHealthChecker) HealthCheck(container api.Container) (health.Status
 func TestSyncPodsUnhealthy(t *testing.T) {
 	kubelet, _, fakeDocker := makeTestKubelet(t)
-	kubelet.HealthChecker = &FalseHealthChecker{}
+	kubelet.healthChecker = &FalseHealthChecker{}
 	fakeDocker.containerList = []docker.APIContainers{
 		{
 			// the k8s prefix is required for the kubelet to manage the container
@@ -639,7 +639,7 @@ func TestGetContainerInfo(t *testing.T) {
 	mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, nil)
 	kubelet, _, fakeDocker := makeTestKubelet(t)
-	kubelet.CadvisorClient = mockCadvisor
+	kubelet.cadvisorClient = mockCadvisor
 	fakeDocker.containerList = []docker.APIContainers{
 		{
 			ID: containerID,
@@ -689,9 +689,9 @@ func TestGetRooInfo(t *testing.T) {
 	mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, nil)
 	kubelet := Kubelet{
-		DockerClient:   &fakeDocker,
-		DockerPuller:   &FakeDockerPuller{},
-		CadvisorClient: mockCadvisor,
+		dockerClient:   &fakeDocker,
+		dockerPuller:   &FakeDockerPuller{},
+		cadvisorClient: mockCadvisor,
 	}
 	// If the container name is an empty string, then it means the root container.
@@ -746,7 +746,7 @@ func TestGetContainerInfoWhenCadvisorFailed(t *testing.T) {
 	mockCadvisor.On("ContainerInfo", containerPath, cadvisorReq).Return(containerInfo, expectedErr)
 	kubelet, _, fakeDocker := makeTestKubelet(t)
-	kubelet.CadvisorClient = mockCadvisor
+	kubelet.cadvisorClient = mockCadvisor
 	fakeDocker.containerList = []docker.APIContainers{
 		{
 			ID: containerID,
@@ -774,7 +774,7 @@ func TestGetContainerInfoOnNonExistContainer(t *testing.T) {
 	mockCadvisor := &mockCadvisorClient{}
 	kubelet, _, fakeDocker := makeTestKubelet(t)
-	kubelet.CadvisorClient = mockCadvisor
+	kubelet.cadvisorClient = mockCadvisor
 	fakeDocker.containerList = []docker.APIContainers{}
 	stats, _ := kubelet.GetContainerInfo("qux", "foo", nil)
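
The tests keep compiling against the now-unexported fields because kubelet_test.go sits in package kubelet; only importers of the package lose direct access. A sketch of that property, with hypothetical names:

	package sketch

	// thing has only unexported fields, like Kubelet after this change.
	type thing struct {
		client string
	}

	// makeTestThing shows the in-package escape hatch the test file
	// relies on: same-package code may still assign unexported fields.
	func makeTestThing() *thing {
		th := &thing{}
		th.client = "fake" // would not compile from another package
		return th
	}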