diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go index cc673dc6f6..aef4ebe658 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go @@ -562,53 +562,3 @@ func GetKubeletDockerContainers(client DockerInterface, allContainers bool) (Doc } return result, nil } - -// TODO: Move this function with dockerCache to DockerManager. -func GetPods(client DockerInterface, all bool) ([]*kubecontainer.Pod, error) { - pods := make(map[types.UID]*kubecontainer.Pod) - var result []*kubecontainer.Pod - - containers, err := GetKubeletDockerContainers(client, all) - if err != nil { - return nil, err - } - - // Group containers by pod. - for _, c := range containers { - if len(c.Names) == 0 { - glog.Warningf("Cannot parse empty docker container name: %#v", c.Names) - continue - } - dockerName, hash, err := ParseDockerName(c.Names[0]) - if err != nil { - glog.Warningf("Parse docker container name %q error: %v", c.Names[0], err) - continue - } - pod, found := pods[dockerName.PodUID] - if !found { - name, namespace, err := kubecontainer.ParsePodFullName(dockerName.PodFullName) - if err != nil { - glog.Warningf("Parse pod full name %q error: %v", dockerName.PodFullName, err) - continue - } - pod = &kubecontainer.Pod{ - ID: dockerName.PodUID, - Name: name, - Namespace: namespace, - } - pods[dockerName.PodUID] = pod - } - pod.Containers = append(pod.Containers, &kubecontainer.Container{ - ID: types.UID(c.ID), - Name: dockerName.ContainerName, - Hash: hash, - Created: c.Created, - }) - } - - // Convert map to list. - for _, c := range pods { - result = append(result, c) - } - return result, nil -} diff --git a/pkg/kubelet/dockertools/docker_cache.go b/pkg/kubelet/dockertools/docker_cache.go index d1fe0d6d4c..2bc9b1b4ac 100644 --- a/pkg/kubelet/dockertools/docker_cache.go +++ b/pkg/kubelet/dockertools/docker_cache.go @@ -20,32 +20,36 @@ import ( "sync" "time" - "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" + kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" ) type DockerCache interface { - GetPods() ([]*container.Pod, error) + GetPods() ([]*kubecontainer.Pod, error) ForceUpdateIfOlder(time.Time) error } -func NewDockerCache(client DockerInterface) (DockerCache, error) { +type podsGetter interface { + GetPods(bool) ([]*kubecontainer.Pod, error) +} + +func NewDockerCache(getter podsGetter) (DockerCache, error) { return &dockerCache{ - client: client, + getter: getter, updatingCache: false, }, nil } // dockerCache is a default implementation of DockerCache interface +// TODO(yifan): Use runtime cache to replace this. type dockerCache struct { - // The underlying docker client used to update the cache. - client DockerInterface - + // The narrowed interface for updating the cache. + getter podsGetter // Mutex protecting all of the following fields. lock sync.Mutex // Last time when cache was updated. cacheTime time.Time // The content of the cache. - pods []*container.Pod + pods []*kubecontainer.Pod // Whether the background thread updating the cache is running. updatingCache bool // Time when the background thread should be stopped. @@ -55,11 +59,11 @@ type dockerCache struct { // Ensure that dockerCache abides by the DockerCache interface. var _ DockerCache = new(dockerCache) -func (d *dockerCache) GetPods() ([]*container.Pod, error) { +func (d *dockerCache) GetPods() ([]*kubecontainer.Pod, error) { d.lock.Lock() defer d.lock.Unlock() if time.Since(d.cacheTime) > 2*time.Second { - pods, err := GetPods(d.client, false) + pods, err := d.getter.GetPods(false) if err != nil { return pods, err } @@ -79,7 +83,7 @@ func (d *dockerCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error { d.lock.Lock() defer d.lock.Unlock() if d.cacheTime.Before(minExpectedCacheTime) { - pods, err := GetPods(d.client, false) + pods, err := d.getter.GetPods(false) if err != nil { return err } @@ -93,7 +97,7 @@ func (d *dockerCache) startUpdatingCache() { run := true for run { time.Sleep(100 * time.Millisecond) - pods, err := GetPods(d.client, false) + pods, err := d.getter.GetPods(false) cacheTime := time.Now() if err != nil { continue diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go index f94ba324d5..c0df853a74 100644 --- a/pkg/kubelet/dockertools/fake_docker_client.go +++ b/pkg/kubelet/dockertools/fake_docker_client.go @@ -333,17 +333,15 @@ func (f *FakeDockerPuller) IsImagePresent(name string) (bool, error) { } type FakeDockerCache struct { - client DockerInterface + getter podsGetter } -func NewFakeDockerCache(client DockerInterface) DockerCache { - return &FakeDockerCache{ - client: client, - } +func NewFakeDockerCache(getter podsGetter) DockerCache { + return &FakeDockerCache{getter: getter} } func (f *FakeDockerCache) GetPods() ([]*container.Pod, error) { - return GetPods(f.client, false) + return f.getter.GetPods(false) } func (f *FakeDockerCache) ForceUpdateIfOlder(time.Time) error { diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 25a0c9e5f6..0aa2f1852b 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -520,3 +520,52 @@ func makeCapabilites(capAdd []api.CapabilityType, capDrop []api.CapabilityType) } return addCaps, dropCaps } + +func (self *DockerManager) GetPods(all bool) ([]*kubecontainer.Pod, error) { + pods := make(map[types.UID]*kubecontainer.Pod) + var result []*kubecontainer.Pod + + containers, err := GetKubeletDockerContainers(self.client, all) + if err != nil { + return nil, err + } + + // Group containers by pod. + for _, c := range containers { + if len(c.Names) == 0 { + glog.Warningf("Cannot parse empty docker container name: %#v", c.Names) + continue + } + dockerName, hash, err := ParseDockerName(c.Names[0]) + if err != nil { + glog.Warningf("Parse docker container name %q error: %v", c.Names[0], err) + continue + } + pod, found := pods[dockerName.PodUID] + if !found { + name, namespace, err := kubecontainer.ParsePodFullName(dockerName.PodFullName) + if err != nil { + glog.Warningf("Parse pod full name %q error: %v", dockerName.PodFullName, err) + continue + } + pod = &kubecontainer.Pod{ + ID: dockerName.PodUID, + Name: name, + Namespace: namespace, + } + pods[dockerName.PodUID] = pod + } + pod.Containers = append(pod.Containers, &kubecontainer.Container{ + ID: types.UID(c.ID), + Name: dockerName.ContainerName, + Hash: hash, + Created: c.Created, + }) + } + + // Convert map to list. + for _, c := range pods { + result = append(result, c) + } + return result, nil +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index b544402cc9..bb57ebd8d0 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -234,7 +234,7 @@ func NewMainKubelet( klet.podManager = newBasicPodManager(klet.kubeClient) - dockerCache, err := dockertools.NewDockerCache(dockerClient) + dockerCache, err := dockertools.NewDockerCache(containerManager) if err != nil { return nil, err } @@ -1973,9 +1973,9 @@ func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) { // findContainer finds and returns the container with the given pod ID, full name, and container name. // It returns nil if not found. -// TODO(yifan): Move this to runtime once GetPods() has the same signature as the runtime.GetPods(). +// TODO(yifan): Move this to runtime once the runtime interface has been all implemented. func (kl *Kubelet) findContainer(podFullName string, podUID types.UID, containerName string) (*kubecontainer.Container, error) { - pods, err := dockertools.GetPods(kl.dockerClient, false) + pods, err := kl.containerManager.GetPods(false) if err != nil { return nil, err } @@ -2027,7 +2027,7 @@ func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port uint16 return fmt.Errorf("no runner specified.") } - pods, err := dockertools.GetPods(kl.dockerClient, false) + pods, err := kl.containerManager.GetPods(false) if err != nil { return err } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index ce8219f5df..8c33813b33 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -70,13 +70,12 @@ type TestKubelet struct { func newTestKubelet(t *testing.T) *TestKubelet { fakeDocker := &dockertools.FakeDockerClient{Errors: make(map[string]error), RemovedImages: util.StringSet{}} - fakeDockerCache := dockertools.NewFakeDockerCache(fakeDocker) + fakeRecorder := &record.FakeRecorder{} fakeKubeClient := &testclient.Fake{} kubelet := &Kubelet{} kubelet.dockerClient = fakeDocker - kubelet.dockerCache = fakeDockerCache kubelet.kubeClient = fakeKubeClient kubelet.dockerPuller = &dockertools.FakeDockerPuller{} kubelet.hostname = "testnode" @@ -90,14 +89,6 @@ func newTestKubelet(t *testing.T) *TestKubelet { t.Fatalf("can't mkdir(%q): %v", kubelet.rootDirectory, err) } waitGroup := new(sync.WaitGroup) - kubelet.podWorkers = newPodWorkers( - fakeDockerCache, - func(pod *api.Pod, mirrorPod *api.Pod, runningPod container.Pod) error { - err := kubelet.syncPod(pod, mirrorPod, runningPod) - waitGroup.Done() - return err - }, - fakeRecorder) kubelet.sourcesReady = func() bool { return true } kubelet.masterServiceNamespace = api.NamespaceDefault kubelet.serviceLister = testServiceLister{} @@ -114,6 +105,15 @@ func newTestKubelet(t *testing.T) *TestKubelet { kubelet.podManager = podManager kubelet.containerRefManager = kubecontainer.NewRefManager() kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage) + kubelet.dockerCache = dockertools.NewFakeDockerCache(kubelet.containerManager) + kubelet.podWorkers = newPodWorkers( + kubelet.dockerCache, + func(pod *api.Pod, mirrorPod *api.Pod, runningPod container.Pod) error { + err := kubelet.syncPod(pod, mirrorPod, runningPod) + waitGroup.Done() + return err + }, + fakeRecorder) return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient} } diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 2b11e7e6c5..5a25e5a3e8 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -39,8 +39,8 @@ func newPod(uid, name string) *api.Pod { func createPodWorkers() (*podWorkers, map[types.UID][]string) { fakeDocker := &dockertools.FakeDockerClient{} - fakeDockerCache := dockertools.NewFakeDockerCache(fakeDocker) - recorder := &record.FakeRecorder{} + fakeRecorder := &record.FakeRecorder{} + fakeDockerCache := dockertools.NewFakeDockerCache(dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage)) lock := sync.Mutex{} processed := make(map[types.UID][]string) @@ -55,7 +55,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) { }() return nil }, - recorder, + fakeRecorder, ) return podWorkers, processed } diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go index 4154b1c7ef..be9458a1f8 100644 --- a/pkg/kubelet/runonce.go +++ b/pkg/kubelet/runonce.go @@ -92,7 +92,7 @@ func (kl *Kubelet) runPod(pod api.Pod, retryDelay time.Duration) error { delay := retryDelay retry := 0 for { - pods, err := dockertools.GetPods(kl.dockerClient, false) + pods, err := kl.containerManager.GetPods(false) if err != nil { return fmt.Errorf("failed to get kubelet pods: %v", err) }