Merge pull request #5343 from erictune/rm_etcd

Kubelet has not even heard of etcd.
pull/6/head
Victor Marmol 2015-03-11 16:59:04 -07:00
commit 1e0601da8a
8 changed files with 18 additions and 290 deletions

View File

@ -214,13 +214,13 @@ func startComponents(manifestURL string) (apiServerURL string) {
// Kubelet (localhost) // Kubelet (localhost)
testRootDir := makeTempDirOrDie("kubelet_integ_1.") testRootDir := makeTempDirOrDie("kubelet_integ_1.")
glog.Infof("Using %s as root dir for kubelet #1", testRootDir) glog.Infof("Using %s as root dir for kubelet #1", testRootDir)
kubeletapp.SimpleRunKubelet(cl, nil, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil) kubeletapp.SimpleRunKubelet(cl, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil)
// Kubelet (machine) // Kubelet (machine)
// Create a second kubelet so that the guestbook example's two redis slaves both // Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule. // have a place they can schedule.
testRootDir = makeTempDirOrDie("kubelet_integ_2.") testRootDir = makeTempDirOrDie("kubelet_integ_2.")
glog.Infof("Using %s as root dir for kubelet #2", testRootDir) glog.Infof("Using %s as root dir for kubelet #2", testRootDir)
kubeletapp.SimpleRunKubelet(cl, nil, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil) kubeletapp.SimpleRunKubelet(cl, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil)
return apiServer.URL return apiServer.URL
} }

View File

@ -36,7 +36,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog" "github.com/golang/glog"
@ -188,7 +187,8 @@ func (s *KubeletServer) Run(_ []string) error {
func (s *KubeletServer) setupRunOnce() { func (s *KubeletServer) setupRunOnce() {
if s.RunOnce { if s.RunOnce {
// Don't use remote (etcd or apiserver) sources // Don't use apiserver source, on the presumption that this flag is used
// for bootstrapping some system pods.
if len(s.APIServerList) > 0 { if len(s.APIServerList) > 0 {
glog.Fatalf("invalid option: --runonce and --api_servers are mutually exclusive") glog.Fatalf("invalid option: --runonce and --api_servers are mutually exclusive")
} }
@ -228,10 +228,9 @@ func (s *KubeletServer) createAPIServerClient() (*client.Client, error) {
return c, nil return c, nil
} }
// SimpleRunKubelet is a simple way to start a Kubelet talking to dockerEndpoint, using an etcdClient. // SimpleRunKubelet is a simple way to start a Kubelet talking to dockerEndpoint, using an API Client.
// Under the hood it calls RunKubelet (below) // Under the hood it calls RunKubelet (below)
func SimpleRunKubelet(client *client.Client, func SimpleRunKubelet(client *client.Client,
etcdClient tools.EtcdClient,
dockerClient dockertools.DockerInterface, dockerClient dockertools.DockerInterface,
hostname, rootDir, manifestURL, address string, hostname, rootDir, manifestURL, address string,
port uint, port uint,
@ -240,7 +239,6 @@ func SimpleRunKubelet(client *client.Client,
tlsOptions *kubelet.TLSOptions) { tlsOptions *kubelet.TLSOptions) {
kcfg := KubeletConfig{ kcfg := KubeletConfig{
KubeClient: client, KubeClient: client,
EtcdClient: etcdClient,
DockerClient: dockerClient, DockerClient: dockerClient,
HostnameOverride: hostname, HostnameOverride: hostname,
RootDirectory: rootDir, RootDirectory: rootDir,
@ -321,10 +319,6 @@ func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
glog.Infof("Adding manifest url: %v", kc.ManifestURL) glog.Infof("Adding manifest url: %v", kc.ManifestURL)
config.NewSourceURL(kc.ManifestURL, kc.HTTPCheckFrequency, cfg.Channel(kubelet.HTTPSource)) config.NewSourceURL(kc.ManifestURL, kc.HTTPCheckFrequency, cfg.Channel(kubelet.HTTPSource))
} }
if kc.EtcdClient != nil {
glog.Infof("Watching for etcd configs at %v", kc.EtcdClient.GetCluster())
config.NewSourceEtcd(config.EtcdKeyForHost(kc.Hostname), kc.EtcdClient, cfg.Channel(kubelet.EtcdSource))
}
if kc.KubeClient != nil { if kc.KubeClient != nil {
glog.Infof("Watching apiserver") glog.Infof("Watching apiserver")
config.NewSourceApiserver(kc.KubeClient, kc.Hostname, cfg.Channel(kubelet.ApiserverSource)) config.NewSourceApiserver(kc.KubeClient, kc.Hostname, cfg.Channel(kubelet.ApiserverSource))
@ -335,7 +329,6 @@ func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
// KubeletConfig is all of the parameters necessary for running a kubelet. // KubeletConfig is all of the parameters necessary for running a kubelet.
// TODO: This should probably be merged with KubeletServer. The extra object is a consequence of refactoring. // TODO: This should probably be merged with KubeletServer. The extra object is a consequence of refactoring.
type KubeletConfig struct { type KubeletConfig struct {
EtcdClient tools.EtcdClient
KubeClient *client.Client KubeClient *client.Client
DockerClient dockertools.DockerInterface DockerClient dockertools.DockerInterface
CAdvisorPort uint CAdvisorPort uint
@ -392,7 +385,6 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
k, err := kubelet.NewMainKubelet( k, err := kubelet.NewMainKubelet(
kc.Hostname, kc.Hostname,
kc.DockerClient, kc.DockerClient,
kc.EtcdClient,
kubeClient, kubeClient,
kc.RootDirectory, kc.RootDirectory,
kc.PodInfraContainerImage, kc.PodInfraContainerImage,

View File

@ -144,7 +144,7 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr net.IP
runControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory) runControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory)
dockerClient := dockertools.ConnectToDockerOrDie(*dockerEndpoint) dockerClient := dockertools.ConnectToDockerOrDie(*dockerEndpoint)
kubeletapp.SimpleRunKubelet(cl, nil, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil) kubeletapp.SimpleRunKubelet(cl, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil)
} }
func newApiClient(addr net.IP, port int) *client.Client { func newApiClient(addr net.IP, port int) *client.Client {

View File

@ -1,117 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Reads the pod configuration from etcd using the Kubernetes etcd schema.
package config
import (
"errors"
"path"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
func EtcdKeyForHost(hostname string) string {
return path.Join("/", "registry", "nodes", hostname, "boundpods")
}
type sourceEtcd struct {
key string
helper tools.EtcdHelper
updates chan<- interface{}
}
// NewSourceEtcd creates a config source that watches and pulls from a key in etcd
func NewSourceEtcd(key string, client tools.EtcdClient, updates chan<- interface{}) {
helper := tools.EtcdHelper{
client,
latest.Codec,
tools.RuntimeVersionAdapter{latest.ResourceVersioner},
}
source := &sourceEtcd{
key: key,
helper: helper,
updates: updates,
}
glog.V(1).Infof("Watching etcd for %s", key)
go util.Forever(source.run, time.Second)
}
func (s *sourceEtcd) run() {
boundPods := api.BoundPods{}
err := s.helper.ExtractObj(s.key, &boundPods, false)
if err != nil {
if tools.IsEtcdNotFound(err) {
glog.V(4).Infof("etcd failed to retrieve the value for the key %q. Error: %v", s.key, err)
return
}
glog.Errorf("etcd failed to retrieve the value for the key %q. Error: %v", s.key, err)
return
}
// Push update. Maybe an empty PodList to allow EtcdSource to be marked as seen
s.updates <- kubelet.PodUpdate{boundPods.Items, kubelet.SET, kubelet.EtcdSource}
index, _ := s.helper.ResourceVersioner.ResourceVersion(&boundPods)
watching := s.helper.Watch(s.key, index)
for {
select {
case event, ok := <-watching.ResultChan():
if !ok {
return
}
if event.Type == watch.Error {
glog.Infof("Watch closed (%#v). Reopening.", event.Object)
watching.Stop()
return
}
pods, err := eventToPods(event)
if err != nil {
glog.Errorf("Failed to parse result from etcd watch: %v", err)
continue
}
glog.V(4).Infof("Received state from etcd watch: %+v", pods)
s.updates <- kubelet.PodUpdate{pods, kubelet.SET, kubelet.EtcdSource}
}
}
}
// eventToPods takes a watch.Event object, and turns it into a structured list of pods.
// It returns a list of containers, or an error if one occurs.
func eventToPods(ev watch.Event) ([]api.BoundPod, error) {
pods := []api.BoundPod{}
if ev.Object == nil {
return pods, nil
}
boundPods, ok := ev.Object.(*api.BoundPods)
if !ok {
return pods, errors.New("unable to parse response as BoundPods")
}
for _, pod := range boundPods.Items {
// Always overrides the namespace provided by the etcd event.
pod.Namespace = kubelet.NamespaceDefault
pods = append(pods, pod)
}
return pods, nil
}

View File

@ -1,142 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"reflect"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
func TestEtcdSourceExistingBoundPods(t *testing.T) {
// Arrange
key := "/registry/nodes/machine/boundpods"
fakeEtcdClient := tools.NewFakeEtcdClient(t)
updates := make(chan interface{})
fakeEtcdClient.Set(
key,
runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: "default"},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: "foo:v1",
}}}},
{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Namespace: "default"},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: "foo:v1",
}}}}}}),
0)
// Act
NewSourceEtcd(key, fakeEtcdClient, updates)
// Assert
select {
case got := <-updates:
update := got.(kubelet.PodUpdate)
if len(update.Pods) != 2 ||
update.Pods[0].ObjectMeta.Name != "foo" ||
update.Pods[1].ObjectMeta.Name != "bar" {
t.Errorf("Unexpected update response: %#v", update)
}
case <-time.After(200 * time.Millisecond):
t.Errorf("Expected update, timeout instead")
}
}
func TestEventToPods(t *testing.T) {
tests := []struct {
input watch.Event
pods []api.BoundPod
fail bool
}{
{
input: watch.Event{Object: nil},
pods: []api.BoundPod{},
fail: false,
},
{
input: watch.Event{Object: &api.BoundPods{}},
pods: []api.BoundPod{},
fail: false,
},
{
input: watch.Event{
Object: &api.BoundPods{
Items: []api.BoundPod{
{ObjectMeta: api.ObjectMeta{UID: "111", Name: "foo", Namespace: "foo"}},
{ObjectMeta: api.ObjectMeta{UID: "222", Name: "bar", Namespace: "bar"}},
},
},
},
pods: []api.BoundPod{
{
ObjectMeta: api.ObjectMeta{UID: "111", Name: "foo", Namespace: kubelet.NamespaceDefault},
Spec: api.PodSpec{},
},
{
ObjectMeta: api.ObjectMeta{UID: "222", Name: "bar", Namespace: kubelet.NamespaceDefault},
Spec: api.PodSpec{},
},
},
fail: false,
},
{
input: watch.Event{
Object: &api.BoundPods{
Items: []api.BoundPod{
{ObjectMeta: api.ObjectMeta{UID: "111", Name: "foo"}},
},
},
},
pods: []api.BoundPod{
{
ObjectMeta: api.ObjectMeta{UID: "111", Name: "foo", Namespace: kubelet.NamespaceDefault},
Spec: api.PodSpec{}},
},
fail: false,
},
}
for i, tt := range tests {
pods, err := eventToPods(tt.input)
if !reflect.DeepEqual(tt.pods, pods) {
t.Errorf("case %d: expected output %#v, got %#v", i, tt.pods, pods)
}
if tt.fail != (err != nil) {
t.Errorf("case %d: got fail=%t but err=%v", i, tt.fail, err)
}
}
}

View File

@ -45,7 +45,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe" "github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
utilErrors "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors" utilErrors "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
@ -93,7 +92,6 @@ type volumeMap map[string]volume.Interface
func NewMainKubelet( func NewMainKubelet(
hostname string, hostname string,
dockerClient dockertools.DockerInterface, dockerClient dockertools.DockerInterface,
etcdClient tools.EtcdClient,
kubeClient client.Interface, kubeClient client.Interface,
rootDirectory string, rootDirectory string,
podInfraContainerImage string, podInfraContainerImage string,
@ -156,7 +154,6 @@ func NewMainKubelet(
klet := &Kubelet{ klet := &Kubelet{
hostname: hostname, hostname: hostname,
dockerClient: dockerClient, dockerClient: dockerClient,
etcdClient: etcdClient,
kubeClient: kubeClient, kubeClient: kubeClient,
rootDirectory: rootDirectory, rootDirectory: rootDirectory,
resyncInterval: resyncInterval, resyncInterval: resyncInterval,
@ -232,8 +229,6 @@ type Kubelet struct {
dockerIDToRef map[dockertools.DockerID]*api.ObjectReference dockerIDToRef map[dockertools.DockerID]*api.ObjectReference
refLock sync.RWMutex refLock sync.RWMutex
// Optional, no events will be sent without it
etcdClient tools.EtcdClient
// Optional, defaults to simple Docker implementation // Optional, defaults to simple Docker implementation
dockerPuller dockertools.DockerPuller dockerPuller dockertools.DockerPuller
// Optional, defaults to /logs/ from /var/log // Optional, defaults to /logs/ from /var/log
@ -1539,7 +1534,7 @@ func (kl *Kubelet) handleUpdate(u PodUpdate) {
} }
// syncLoop is the main loop for processing changes. It watches for changes from // syncLoop is the main loop for processing changes. It watches for changes from
// four channels (file, etcd, server, and http) and creates a union of them. For // three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If // any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired // no changes are seen to the configuration, will synchronize the last known desired
// state every sync_frequency seconds. Never returns. // state every sync_frequency seconds. Never returns.

View File

@ -1390,7 +1390,7 @@ func TestRunInContainerNoSuchPod(t *testing.T) {
kubelet.runner = &fakeCommandRunner kubelet.runner = &fakeCommandRunner
podName := "podFoo" podName := "podFoo"
podNamespace := "etcd" podNamespace := "nsFoo"
containerName := "containerFoo" containerName := "containerFoo"
output, err := kubelet.RunInContainer( output, err := kubelet.RunInContainer(
GetPodFullName(&api.BoundPod{ObjectMeta: api.ObjectMeta{Name: podName, Namespace: podNamespace}}), GetPodFullName(&api.BoundPod{ObjectMeta: api.ObjectMeta{Name: podName, Namespace: podNamespace}}),
@ -1412,7 +1412,7 @@ func TestRunInContainer(t *testing.T) {
containerID := "abc1234" containerID := "abc1234"
podName := "podFoo" podName := "podFoo"
podNamespace := "etcd" podNamespace := "nsFoo"
containerName := "containerFoo" containerName := "containerFoo"
fakeDocker.ContainerList = []docker.APIContainers{ fakeDocker.ContainerList = []docker.APIContainers{
@ -1452,7 +1452,7 @@ func TestRunHandlerExec(t *testing.T) {
containerID := "abc1234" containerID := "abc1234"
podName := "podFoo" podName := "podFoo"
podNamespace := "etcd" podNamespace := "nsFoo"
containerName := "containerFoo" containerName := "containerFoo"
fakeDocker.ContainerList = []docker.APIContainers{ fakeDocker.ContainerList = []docker.APIContainers{
@ -1499,7 +1499,7 @@ func TestRunHandlerHttp(t *testing.T) {
kubelet.httpClient = &fakeHttp kubelet.httpClient = &fakeHttp
podName := "podFoo" podName := "podFoo"
podNamespace := "etcd" podNamespace := "nsFoo"
containerName := "containerFoo" containerName := "containerFoo"
container := api.Container{ container := api.Container{
@ -2694,7 +2694,7 @@ func TestExecInContainerNoSuchPod(t *testing.T) {
kubelet.runner = &fakeCommandRunner kubelet.runner = &fakeCommandRunner
podName := "podFoo" podName := "podFoo"
podNamespace := "etcd" podNamespace := "nsFoo"
containerName := "containerFoo" containerName := "containerFoo"
err := kubelet.ExecInContainer( err := kubelet.ExecInContainer(
GetPodFullName(&api.BoundPod{ObjectMeta: api.ObjectMeta{Name: podName, Namespace: podNamespace}}), GetPodFullName(&api.BoundPod{ObjectMeta: api.ObjectMeta{Name: podName, Namespace: podNamespace}}),
@ -2720,7 +2720,7 @@ func TestExecInContainerNoSuchContainer(t *testing.T) {
kubelet.runner = &fakeCommandRunner kubelet.runner = &fakeCommandRunner
podName := "podFoo" podName := "podFoo"
podNamespace := "etcd" podNamespace := "nsFoo"
containerID := "containerFoo" containerID := "containerFoo"
fakeDocker.ContainerList = []docker.APIContainers{ fakeDocker.ContainerList = []docker.APIContainers{
@ -2772,7 +2772,7 @@ func TestExecInContainer(t *testing.T) {
kubelet.runner = &fakeCommandRunner kubelet.runner = &fakeCommandRunner
podName := "podFoo" podName := "podFoo"
podNamespace := "etcd" podNamespace := "nsFoo"
containerID := "containerFoo" containerID := "containerFoo"
command := []string{"ls"} command := []string{"ls"}
stdin := &bytes.Buffer{} stdin := &bytes.Buffer{}
@ -2831,7 +2831,7 @@ func TestPortForwardNoSuchPod(t *testing.T) {
kubelet.runner = &fakeCommandRunner kubelet.runner = &fakeCommandRunner
podName := "podFoo" podName := "podFoo"
podNamespace := "etcd" podNamespace := "nsFoo"
var port uint16 = 5000 var port uint16 = 5000
err := kubelet.PortForward( err := kubelet.PortForward(
@ -2854,7 +2854,7 @@ func TestPortForwardNoSuchContainer(t *testing.T) {
kubelet.runner = &fakeCommandRunner kubelet.runner = &fakeCommandRunner
podName := "podFoo" podName := "podFoo"
podNamespace := "etcd" podNamespace := "nsFoo"
var port uint16 = 5000 var port uint16 = 5000
fakeDocker.ContainerList = []docker.APIContainers{ fakeDocker.ContainerList = []docker.APIContainers{
@ -2888,7 +2888,7 @@ func TestPortForward(t *testing.T) {
kubelet.runner = &fakeCommandRunner kubelet.runner = &fakeCommandRunner
podName := "podFoo" podName := "podFoo"
podNamespace := "etcd" podNamespace := "nsFoo"
containerID := "containerFoo" containerID := "containerFoo"
var port uint16 = 5000 var port uint16 = 5000
stream := &fakeReadWriteCloser{} stream := &fakeReadWriteCloser{}

View File

@ -41,7 +41,7 @@ const (
// Updates from a file // Updates from a file
FileSource = "file" FileSource = "file"
// Updates from etcd // Updates from etcd
EtcdSource = "etcd" EtcdSource = "etcd" // Not supported by current kubelets.
// Updates from querying a web page // Updates from querying a web page
HTTPSource = "http" HTTPSource = "http"
// Updates received to the kubelet server // Updates received to the kubelet server