From ac7bf050797dede51f1e982b8af8acc8960fcec6 Mon Sep 17 00:00:00 2001 From: Eric Tune Date: Wed, 11 Mar 2015 16:29:31 -0700 Subject: [PATCH 1/2] Kubelet has not even heard of etcd. --- cmd/integration/integration.go | 4 +- cmd/kubelet/app/server.go | 9 -- cmd/kubernetes/kubernetes.go | 2 +- pkg/kubelet/config/etcd.go | 117 -------------------------- pkg/kubelet/config/etcd_test.go | 142 -------------------------------- pkg/kubelet/kubelet.go | 5 -- pkg/kubelet/types.go | 2 +- 7 files changed, 4 insertions(+), 277 deletions(-) delete mode 100644 pkg/kubelet/config/etcd.go delete mode 100644 pkg/kubelet/config/etcd_test.go diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index cbb03a479c..adc4394ca9 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -214,13 +214,13 @@ func startComponents(manifestURL string) (apiServerURL string) { // Kubelet (localhost) testRootDir := makeTempDirOrDie("kubelet_integ_1.") glog.Infof("Using %s as root dir for kubelet #1", testRootDir) - kubeletapp.SimpleRunKubelet(cl, nil, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil) + kubeletapp.SimpleRunKubelet(cl, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil) // Kubelet (machine) // Create a second kubelet so that the guestbook example's two redis slaves both // have a place they can schedule. testRootDir = makeTempDirOrDie("kubelet_integ_2.") glog.Infof("Using %s as root dir for kubelet #2", testRootDir) - kubeletapp.SimpleRunKubelet(cl, nil, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil) + kubeletapp.SimpleRunKubelet(cl, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil) return apiServer.URL } diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 4b6e8d3e4a..b670f72a1a 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -36,7 +36,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" @@ -231,7 +230,6 @@ func (s *KubeletServer) createAPIServerClient() (*client.Client, error) { // SimpleRunKubelet is a simple way to start a Kubelet talking to dockerEndpoint, using an etcdClient. // Under the hood it calls RunKubelet (below) func SimpleRunKubelet(client *client.Client, - etcdClient tools.EtcdClient, dockerClient dockertools.DockerInterface, hostname, rootDir, manifestURL, address string, port uint, @@ -240,7 +238,6 @@ func SimpleRunKubelet(client *client.Client, tlsOptions *kubelet.TLSOptions) { kcfg := KubeletConfig{ KubeClient: client, - EtcdClient: etcdClient, DockerClient: dockerClient, HostnameOverride: hostname, RootDirectory: rootDir, @@ -321,10 +318,6 @@ func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig { glog.Infof("Adding manifest url: %v", kc.ManifestURL) config.NewSourceURL(kc.ManifestURL, kc.HTTPCheckFrequency, cfg.Channel(kubelet.HTTPSource)) } - if kc.EtcdClient != nil { - glog.Infof("Watching for etcd configs at %v", kc.EtcdClient.GetCluster()) - config.NewSourceEtcd(config.EtcdKeyForHost(kc.Hostname), kc.EtcdClient, cfg.Channel(kubelet.EtcdSource)) - } if kc.KubeClient != nil { glog.Infof("Watching apiserver") config.NewSourceApiserver(kc.KubeClient, kc.Hostname, cfg.Channel(kubelet.ApiserverSource)) @@ -335,7 +328,6 @@ func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig { // KubeletConfig is all of the parameters necessary for running a kubelet. // TODO: This should probably be merged with KubeletServer. The extra object is a consequence of refactoring. type KubeletConfig struct { - EtcdClient tools.EtcdClient KubeClient *client.Client DockerClient dockertools.DockerInterface CAdvisorPort uint @@ -392,7 +384,6 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub k, err := kubelet.NewMainKubelet( kc.Hostname, kc.DockerClient, - kc.EtcdClient, kubeClient, kc.RootDirectory, kc.PodInfraContainerImage, diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index be9a3e365c..802b59a826 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -144,7 +144,7 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr net.IP runControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory) dockerClient := dockertools.ConnectToDockerOrDie(*dockerEndpoint) - kubeletapp.SimpleRunKubelet(cl, nil, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil) + kubeletapp.SimpleRunKubelet(cl, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil) } func newApiClient(addr net.IP, port int) *client.Client { diff --git a/pkg/kubelet/config/etcd.go b/pkg/kubelet/config/etcd.go deleted file mode 100644 index ec77388410..0000000000 --- a/pkg/kubelet/config/etcd.go +++ /dev/null @@ -1,117 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Reads the pod configuration from etcd using the Kubernetes etcd schema. -package config - -import ( - "errors" - "path" - "time" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" - "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" - "github.com/golang/glog" -) - -func EtcdKeyForHost(hostname string) string { - return path.Join("/", "registry", "nodes", hostname, "boundpods") -} - -type sourceEtcd struct { - key string - helper tools.EtcdHelper - updates chan<- interface{} -} - -// NewSourceEtcd creates a config source that watches and pulls from a key in etcd -func NewSourceEtcd(key string, client tools.EtcdClient, updates chan<- interface{}) { - helper := tools.EtcdHelper{ - client, - latest.Codec, - tools.RuntimeVersionAdapter{latest.ResourceVersioner}, - } - source := &sourceEtcd{ - key: key, - helper: helper, - updates: updates, - } - glog.V(1).Infof("Watching etcd for %s", key) - go util.Forever(source.run, time.Second) -} - -func (s *sourceEtcd) run() { - boundPods := api.BoundPods{} - err := s.helper.ExtractObj(s.key, &boundPods, false) - if err != nil { - if tools.IsEtcdNotFound(err) { - glog.V(4).Infof("etcd failed to retrieve the value for the key %q. Error: %v", s.key, err) - return - } - glog.Errorf("etcd failed to retrieve the value for the key %q. Error: %v", s.key, err) - return - } - // Push update. Maybe an empty PodList to allow EtcdSource to be marked as seen - s.updates <- kubelet.PodUpdate{boundPods.Items, kubelet.SET, kubelet.EtcdSource} - index, _ := s.helper.ResourceVersioner.ResourceVersion(&boundPods) - watching := s.helper.Watch(s.key, index) - for { - select { - case event, ok := <-watching.ResultChan(): - if !ok { - return - } - if event.Type == watch.Error { - glog.Infof("Watch closed (%#v). Reopening.", event.Object) - watching.Stop() - return - } - pods, err := eventToPods(event) - if err != nil { - glog.Errorf("Failed to parse result from etcd watch: %v", err) - continue - } - - glog.V(4).Infof("Received state from etcd watch: %+v", pods) - s.updates <- kubelet.PodUpdate{pods, kubelet.SET, kubelet.EtcdSource} - } - } -} - -// eventToPods takes a watch.Event object, and turns it into a structured list of pods. -// It returns a list of containers, or an error if one occurs. -func eventToPods(ev watch.Event) ([]api.BoundPod, error) { - pods := []api.BoundPod{} - if ev.Object == nil { - return pods, nil - } - boundPods, ok := ev.Object.(*api.BoundPods) - if !ok { - return pods, errors.New("unable to parse response as BoundPods") - } - - for _, pod := range boundPods.Items { - // Always overrides the namespace provided by the etcd event. - pod.Namespace = kubelet.NamespaceDefault - pods = append(pods, pod) - } - - return pods, nil -} diff --git a/pkg/kubelet/config/etcd_test.go b/pkg/kubelet/config/etcd_test.go deleted file mode 100644 index 12760593f2..0000000000 --- a/pkg/kubelet/config/etcd_test.go +++ /dev/null @@ -1,142 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package config - -import ( - "reflect" - "testing" - "time" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" - "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" - "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" -) - -func TestEtcdSourceExistingBoundPods(t *testing.T) { - // Arrange - key := "/registry/nodes/machine/boundpods" - fakeEtcdClient := tools.NewFakeEtcdClient(t) - updates := make(chan interface{}) - - fakeEtcdClient.Set( - key, - runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ - Items: []api.BoundPod{ - { - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Namespace: "default"}, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Image: "foo:v1", - }}}}, - { - ObjectMeta: api.ObjectMeta{ - Name: "bar", - Namespace: "default"}, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Image: "foo:v1", - }}}}}}), - 0) - - // Act - NewSourceEtcd(key, fakeEtcdClient, updates) - - // Assert - select { - case got := <-updates: - update := got.(kubelet.PodUpdate) - if len(update.Pods) != 2 || - update.Pods[0].ObjectMeta.Name != "foo" || - update.Pods[1].ObjectMeta.Name != "bar" { - t.Errorf("Unexpected update response: %#v", update) - } - case <-time.After(200 * time.Millisecond): - t.Errorf("Expected update, timeout instead") - } -} - -func TestEventToPods(t *testing.T) { - tests := []struct { - input watch.Event - pods []api.BoundPod - fail bool - }{ - { - input: watch.Event{Object: nil}, - pods: []api.BoundPod{}, - fail: false, - }, - { - input: watch.Event{Object: &api.BoundPods{}}, - pods: []api.BoundPod{}, - fail: false, - }, - { - input: watch.Event{ - Object: &api.BoundPods{ - Items: []api.BoundPod{ - {ObjectMeta: api.ObjectMeta{UID: "111", Name: "foo", Namespace: "foo"}}, - {ObjectMeta: api.ObjectMeta{UID: "222", Name: "bar", Namespace: "bar"}}, - }, - }, - }, - pods: []api.BoundPod{ - { - ObjectMeta: api.ObjectMeta{UID: "111", Name: "foo", Namespace: kubelet.NamespaceDefault}, - Spec: api.PodSpec{}, - }, - { - ObjectMeta: api.ObjectMeta{UID: "222", Name: "bar", Namespace: kubelet.NamespaceDefault}, - Spec: api.PodSpec{}, - }, - }, - fail: false, - }, - { - input: watch.Event{ - Object: &api.BoundPods{ - Items: []api.BoundPod{ - {ObjectMeta: api.ObjectMeta{UID: "111", Name: "foo"}}, - }, - }, - }, - pods: []api.BoundPod{ - { - ObjectMeta: api.ObjectMeta{UID: "111", Name: "foo", Namespace: kubelet.NamespaceDefault}, - Spec: api.PodSpec{}}, - }, - fail: false, - }, - } - - for i, tt := range tests { - pods, err := eventToPods(tt.input) - if !reflect.DeepEqual(tt.pods, pods) { - t.Errorf("case %d: expected output %#v, got %#v", i, tt.pods, pods) - } - if tt.fail != (err != nil) { - t.Errorf("case %d: got fail=%t but err=%v", i, tt.fail, err) - } - } -} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 0e3e9eef30..f85abf49c8 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -45,7 +45,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/probe" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" utilErrors "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors" @@ -93,7 +92,6 @@ type volumeMap map[string]volume.Interface func NewMainKubelet( hostname string, dockerClient dockertools.DockerInterface, - etcdClient tools.EtcdClient, kubeClient client.Interface, rootDirectory string, podInfraContainerImage string, @@ -156,7 +154,6 @@ func NewMainKubelet( klet := &Kubelet{ hostname: hostname, dockerClient: dockerClient, - etcdClient: etcdClient, kubeClient: kubeClient, rootDirectory: rootDirectory, resyncInterval: resyncInterval, @@ -232,8 +229,6 @@ type Kubelet struct { dockerIDToRef map[dockertools.DockerID]*api.ObjectReference refLock sync.RWMutex - // Optional, no events will be sent without it - etcdClient tools.EtcdClient // Optional, defaults to simple Docker implementation dockerPuller dockertools.DockerPuller // Optional, defaults to /logs/ from /var/log diff --git a/pkg/kubelet/types.go b/pkg/kubelet/types.go index f7fafd8ba7..685cca8cfc 100644 --- a/pkg/kubelet/types.go +++ b/pkg/kubelet/types.go @@ -41,7 +41,7 @@ const ( // Updates from a file FileSource = "file" // Updates from etcd - EtcdSource = "etcd" + EtcdSource = "etcd" // Not supported by current kubelets. // Updates from querying a web page HTTPSource = "http" // Updates received to the kubelet server From ae9bc28f8b098bdf9f4bc51bec35f31f2732a8b3 Mon Sep 17 00:00:00 2001 From: Eric Tune Date: Wed, 11 Mar 2015 16:40:20 -0700 Subject: [PATCH 2/2] Remove mentions of etcd in kubelet. --- cmd/kubelet/app/server.go | 5 +++-- pkg/kubelet/kubelet.go | 2 +- pkg/kubelet/kubelet_test.go | 20 ++++++++++---------- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index b670f72a1a..311705fde1 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -187,7 +187,8 @@ func (s *KubeletServer) Run(_ []string) error { func (s *KubeletServer) setupRunOnce() { if s.RunOnce { - // Don't use remote (etcd or apiserver) sources + // Don't use apiserver source, on the presumption that this flag is used + // for bootstrapping some system pods. if len(s.APIServerList) > 0 { glog.Fatalf("invalid option: --runonce and --api_servers are mutually exclusive") } @@ -227,7 +228,7 @@ func (s *KubeletServer) createAPIServerClient() (*client.Client, error) { return c, nil } -// SimpleRunKubelet is a simple way to start a Kubelet talking to dockerEndpoint, using an etcdClient. +// SimpleRunKubelet is a simple way to start a Kubelet talking to dockerEndpoint, using an API Client. // Under the hood it calls RunKubelet (below) func SimpleRunKubelet(client *client.Client, dockerClient dockertools.DockerInterface, diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index f85abf49c8..04c5ef2551 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1534,7 +1534,7 @@ func (kl *Kubelet) handleUpdate(u PodUpdate) { } // syncLoop is the main loop for processing changes. It watches for changes from -// four channels (file, etcd, server, and http) and creates a union of them. For +// three channels (file, apiserver, and http) and creates a union of them. For // any new change seen, will run a sync against desired state and running state. If // no changes are seen to the configuration, will synchronize the last known desired // state every sync_frequency seconds. Never returns. diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 2479b4ff88..bc2b835ddb 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -1390,7 +1390,7 @@ func TestRunInContainerNoSuchPod(t *testing.T) { kubelet.runner = &fakeCommandRunner podName := "podFoo" - podNamespace := "etcd" + podNamespace := "nsFoo" containerName := "containerFoo" output, err := kubelet.RunInContainer( GetPodFullName(&api.BoundPod{ObjectMeta: api.ObjectMeta{Name: podName, Namespace: podNamespace}}), @@ -1412,7 +1412,7 @@ func TestRunInContainer(t *testing.T) { containerID := "abc1234" podName := "podFoo" - podNamespace := "etcd" + podNamespace := "nsFoo" containerName := "containerFoo" fakeDocker.ContainerList = []docker.APIContainers{ @@ -1452,7 +1452,7 @@ func TestRunHandlerExec(t *testing.T) { containerID := "abc1234" podName := "podFoo" - podNamespace := "etcd" + podNamespace := "nsFoo" containerName := "containerFoo" fakeDocker.ContainerList = []docker.APIContainers{ @@ -1499,7 +1499,7 @@ func TestRunHandlerHttp(t *testing.T) { kubelet.httpClient = &fakeHttp podName := "podFoo" - podNamespace := "etcd" + podNamespace := "nsFoo" containerName := "containerFoo" container := api.Container{ @@ -2694,7 +2694,7 @@ func TestExecInContainerNoSuchPod(t *testing.T) { kubelet.runner = &fakeCommandRunner podName := "podFoo" - podNamespace := "etcd" + podNamespace := "nsFoo" containerName := "containerFoo" err := kubelet.ExecInContainer( GetPodFullName(&api.BoundPod{ObjectMeta: api.ObjectMeta{Name: podName, Namespace: podNamespace}}), @@ -2720,7 +2720,7 @@ func TestExecInContainerNoSuchContainer(t *testing.T) { kubelet.runner = &fakeCommandRunner podName := "podFoo" - podNamespace := "etcd" + podNamespace := "nsFoo" containerID := "containerFoo" fakeDocker.ContainerList = []docker.APIContainers{ @@ -2772,7 +2772,7 @@ func TestExecInContainer(t *testing.T) { kubelet.runner = &fakeCommandRunner podName := "podFoo" - podNamespace := "etcd" + podNamespace := "nsFoo" containerID := "containerFoo" command := []string{"ls"} stdin := &bytes.Buffer{} @@ -2831,7 +2831,7 @@ func TestPortForwardNoSuchPod(t *testing.T) { kubelet.runner = &fakeCommandRunner podName := "podFoo" - podNamespace := "etcd" + podNamespace := "nsFoo" var port uint16 = 5000 err := kubelet.PortForward( @@ -2854,7 +2854,7 @@ func TestPortForwardNoSuchContainer(t *testing.T) { kubelet.runner = &fakeCommandRunner podName := "podFoo" - podNamespace := "etcd" + podNamespace := "nsFoo" var port uint16 = 5000 fakeDocker.ContainerList = []docker.APIContainers{ @@ -2888,7 +2888,7 @@ func TestPortForward(t *testing.T) { kubelet.runner = &fakeCommandRunner podName := "podFoo" - podNamespace := "etcd" + podNamespace := "nsFoo" containerID := "containerFoo" var port uint16 = 5000 stream := &fakeReadWriteCloser{}