Merge pull request #6778 from yifan-gu/getpods

kubelet/dockertool: Move Getpods() to DockerManager.
pull/6/head
Dawn Chen 2015-04-13 14:50:52 -07:00
commit a0fa592b80
8 changed files with 87 additions and 86 deletions

View File

@ -562,53 +562,3 @@ func GetKubeletDockerContainers(client DockerInterface, allContainers bool) (Doc
} }
return result, nil return result, nil
} }
// TODO: Move this function with dockerCache to DockerManager.
func GetPods(client DockerInterface, all bool) ([]*kubecontainer.Pod, error) {
pods := make(map[types.UID]*kubecontainer.Pod)
var result []*kubecontainer.Pod
containers, err := GetKubeletDockerContainers(client, all)
if err != nil {
return nil, err
}
// Group containers by pod.
for _, c := range containers {
if len(c.Names) == 0 {
glog.Warningf("Cannot parse empty docker container name: %#v", c.Names)
continue
}
dockerName, hash, err := ParseDockerName(c.Names[0])
if err != nil {
glog.Warningf("Parse docker container name %q error: %v", c.Names[0], err)
continue
}
pod, found := pods[dockerName.PodUID]
if !found {
name, namespace, err := kubecontainer.ParsePodFullName(dockerName.PodFullName)
if err != nil {
glog.Warningf("Parse pod full name %q error: %v", dockerName.PodFullName, err)
continue
}
pod = &kubecontainer.Pod{
ID: dockerName.PodUID,
Name: name,
Namespace: namespace,
}
pods[dockerName.PodUID] = pod
}
pod.Containers = append(pod.Containers, &kubecontainer.Container{
ID: types.UID(c.ID),
Name: dockerName.ContainerName,
Hash: hash,
Created: c.Created,
})
}
// Convert map to list.
for _, c := range pods {
result = append(result, c)
}
return result, nil
}

View File

@ -20,32 +20,36 @@ import (
"sync" "sync"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
) )
type DockerCache interface { type DockerCache interface {
GetPods() ([]*container.Pod, error) GetPods() ([]*kubecontainer.Pod, error)
ForceUpdateIfOlder(time.Time) error ForceUpdateIfOlder(time.Time) error
} }
func NewDockerCache(client DockerInterface) (DockerCache, error) { type podsGetter interface {
GetPods(bool) ([]*kubecontainer.Pod, error)
}
func NewDockerCache(getter podsGetter) (DockerCache, error) {
return &dockerCache{ return &dockerCache{
client: client, getter: getter,
updatingCache: false, updatingCache: false,
}, nil }, nil
} }
// dockerCache is a default implementation of DockerCache interface // dockerCache is a default implementation of DockerCache interface
// TODO(yifan): Use runtime cache to replace this.
type dockerCache struct { type dockerCache struct {
// The underlying docker client used to update the cache. // The narrowed interface for updating the cache.
client DockerInterface getter podsGetter
// Mutex protecting all of the following fields. // Mutex protecting all of the following fields.
lock sync.Mutex lock sync.Mutex
// Last time when cache was updated. // Last time when cache was updated.
cacheTime time.Time cacheTime time.Time
// The content of the cache. // The content of the cache.
pods []*container.Pod pods []*kubecontainer.Pod
// Whether the background thread updating the cache is running. // Whether the background thread updating the cache is running.
updatingCache bool updatingCache bool
// Time when the background thread should be stopped. // Time when the background thread should be stopped.
@ -55,11 +59,11 @@ type dockerCache struct {
// Ensure that dockerCache abides by the DockerCache interface. // Ensure that dockerCache abides by the DockerCache interface.
var _ DockerCache = new(dockerCache) var _ DockerCache = new(dockerCache)
func (d *dockerCache) GetPods() ([]*container.Pod, error) { func (d *dockerCache) GetPods() ([]*kubecontainer.Pod, error) {
d.lock.Lock() d.lock.Lock()
defer d.lock.Unlock() defer d.lock.Unlock()
if time.Since(d.cacheTime) > 2*time.Second { if time.Since(d.cacheTime) > 2*time.Second {
pods, err := GetPods(d.client, false) pods, err := d.getter.GetPods(false)
if err != nil { if err != nil {
return pods, err return pods, err
} }
@ -79,7 +83,7 @@ func (d *dockerCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error {
d.lock.Lock() d.lock.Lock()
defer d.lock.Unlock() defer d.lock.Unlock()
if d.cacheTime.Before(minExpectedCacheTime) { if d.cacheTime.Before(minExpectedCacheTime) {
pods, err := GetPods(d.client, false) pods, err := d.getter.GetPods(false)
if err != nil { if err != nil {
return err return err
} }
@ -93,7 +97,7 @@ func (d *dockerCache) startUpdatingCache() {
run := true run := true
for run { for run {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
pods, err := GetPods(d.client, false) pods, err := d.getter.GetPods(false)
cacheTime := time.Now() cacheTime := time.Now()
if err != nil { if err != nil {
continue continue

View File

@ -333,17 +333,15 @@ func (f *FakeDockerPuller) IsImagePresent(name string) (bool, error) {
} }
type FakeDockerCache struct { type FakeDockerCache struct {
client DockerInterface getter podsGetter
} }
func NewFakeDockerCache(client DockerInterface) DockerCache { func NewFakeDockerCache(getter podsGetter) DockerCache {
return &FakeDockerCache{ return &FakeDockerCache{getter: getter}
client: client,
}
} }
func (f *FakeDockerCache) GetPods() ([]*container.Pod, error) { func (f *FakeDockerCache) GetPods() ([]*container.Pod, error) {
return GetPods(f.client, false) return f.getter.GetPods(false)
} }
func (f *FakeDockerCache) ForceUpdateIfOlder(time.Time) error { func (f *FakeDockerCache) ForceUpdateIfOlder(time.Time) error {

View File

@ -520,3 +520,52 @@ func makeCapabilites(capAdd []api.CapabilityType, capDrop []api.CapabilityType)
} }
return addCaps, dropCaps return addCaps, dropCaps
} }
func (self *DockerManager) GetPods(all bool) ([]*kubecontainer.Pod, error) {
pods := make(map[types.UID]*kubecontainer.Pod)
var result []*kubecontainer.Pod
containers, err := GetKubeletDockerContainers(self.client, all)
if err != nil {
return nil, err
}
// Group containers by pod.
for _, c := range containers {
if len(c.Names) == 0 {
glog.Warningf("Cannot parse empty docker container name: %#v", c.Names)
continue
}
dockerName, hash, err := ParseDockerName(c.Names[0])
if err != nil {
glog.Warningf("Parse docker container name %q error: %v", c.Names[0], err)
continue
}
pod, found := pods[dockerName.PodUID]
if !found {
name, namespace, err := kubecontainer.ParsePodFullName(dockerName.PodFullName)
if err != nil {
glog.Warningf("Parse pod full name %q error: %v", dockerName.PodFullName, err)
continue
}
pod = &kubecontainer.Pod{
ID: dockerName.PodUID,
Name: name,
Namespace: namespace,
}
pods[dockerName.PodUID] = pod
}
pod.Containers = append(pod.Containers, &kubecontainer.Container{
ID: types.UID(c.ID),
Name: dockerName.ContainerName,
Hash: hash,
Created: c.Created,
})
}
// Convert map to list.
for _, c := range pods {
result = append(result, c)
}
return result, nil
}

View File

@ -234,7 +234,7 @@ func NewMainKubelet(
klet.podManager = newBasicPodManager(klet.kubeClient) klet.podManager = newBasicPodManager(klet.kubeClient)
dockerCache, err := dockertools.NewDockerCache(dockerClient) dockerCache, err := dockertools.NewDockerCache(containerManager)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1973,9 +1973,9 @@ func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
// findContainer finds and returns the container with the given pod ID, full name, and container name. // findContainer finds and returns the container with the given pod ID, full name, and container name.
// It returns nil if not found. // It returns nil if not found.
// TODO(yifan): Move this to runtime once GetPods() has the same signature as the runtime.GetPods(). // TODO(yifan): Move this to runtime once the runtime interface has been all implemented.
func (kl *Kubelet) findContainer(podFullName string, podUID types.UID, containerName string) (*kubecontainer.Container, error) { func (kl *Kubelet) findContainer(podFullName string, podUID types.UID, containerName string) (*kubecontainer.Container, error) {
pods, err := dockertools.GetPods(kl.dockerClient, false) pods, err := kl.containerManager.GetPods(false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -2027,7 +2027,7 @@ func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port uint16
return fmt.Errorf("no runner specified.") return fmt.Errorf("no runner specified.")
} }
pods, err := dockertools.GetPods(kl.dockerClient, false) pods, err := kl.containerManager.GetPods(false)
if err != nil { if err != nil {
return err return err
} }

View File

@ -70,13 +70,12 @@ type TestKubelet struct {
func newTestKubelet(t *testing.T) *TestKubelet { func newTestKubelet(t *testing.T) *TestKubelet {
fakeDocker := &dockertools.FakeDockerClient{Errors: make(map[string]error), RemovedImages: util.StringSet{}} fakeDocker := &dockertools.FakeDockerClient{Errors: make(map[string]error), RemovedImages: util.StringSet{}}
fakeDockerCache := dockertools.NewFakeDockerCache(fakeDocker)
fakeRecorder := &record.FakeRecorder{} fakeRecorder := &record.FakeRecorder{}
fakeKubeClient := &testclient.Fake{} fakeKubeClient := &testclient.Fake{}
kubelet := &Kubelet{} kubelet := &Kubelet{}
kubelet.dockerClient = fakeDocker kubelet.dockerClient = fakeDocker
kubelet.dockerCache = fakeDockerCache
kubelet.kubeClient = fakeKubeClient kubelet.kubeClient = fakeKubeClient
kubelet.dockerPuller = &dockertools.FakeDockerPuller{} kubelet.dockerPuller = &dockertools.FakeDockerPuller{}
kubelet.hostname = "testnode" kubelet.hostname = "testnode"
@ -90,14 +89,6 @@ func newTestKubelet(t *testing.T) *TestKubelet {
t.Fatalf("can't mkdir(%q): %v", kubelet.rootDirectory, err) t.Fatalf("can't mkdir(%q): %v", kubelet.rootDirectory, err)
} }
waitGroup := new(sync.WaitGroup) waitGroup := new(sync.WaitGroup)
kubelet.podWorkers = newPodWorkers(
fakeDockerCache,
func(pod *api.Pod, mirrorPod *api.Pod, runningPod container.Pod) error {
err := kubelet.syncPod(pod, mirrorPod, runningPod)
waitGroup.Done()
return err
},
fakeRecorder)
kubelet.sourcesReady = func() bool { return true } kubelet.sourcesReady = func() bool { return true }
kubelet.masterServiceNamespace = api.NamespaceDefault kubelet.masterServiceNamespace = api.NamespaceDefault
kubelet.serviceLister = testServiceLister{} kubelet.serviceLister = testServiceLister{}
@ -114,6 +105,15 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.podManager = podManager kubelet.podManager = podManager
kubelet.containerRefManager = kubecontainer.NewRefManager() kubelet.containerRefManager = kubecontainer.NewRefManager()
kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage) kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage)
kubelet.dockerCache = dockertools.NewFakeDockerCache(kubelet.containerManager)
kubelet.podWorkers = newPodWorkers(
kubelet.dockerCache,
func(pod *api.Pod, mirrorPod *api.Pod, runningPod container.Pod) error {
err := kubelet.syncPod(pod, mirrorPod, runningPod)
waitGroup.Done()
return err
},
fakeRecorder)
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient} return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
} }

View File

@ -39,8 +39,8 @@ func newPod(uid, name string) *api.Pod {
func createPodWorkers() (*podWorkers, map[types.UID][]string) { func createPodWorkers() (*podWorkers, map[types.UID][]string) {
fakeDocker := &dockertools.FakeDockerClient{} fakeDocker := &dockertools.FakeDockerClient{}
fakeDockerCache := dockertools.NewFakeDockerCache(fakeDocker) fakeRecorder := &record.FakeRecorder{}
recorder := &record.FakeRecorder{} fakeDockerCache := dockertools.NewFakeDockerCache(dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage))
lock := sync.Mutex{} lock := sync.Mutex{}
processed := make(map[types.UID][]string) processed := make(map[types.UID][]string)
@ -55,7 +55,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
}() }()
return nil return nil
}, },
recorder, fakeRecorder,
) )
return podWorkers, processed return podWorkers, processed
} }

View File

@ -92,7 +92,7 @@ func (kl *Kubelet) runPod(pod api.Pod, retryDelay time.Duration) error {
delay := retryDelay delay := retryDelay
retry := 0 retry := 0
for { for {
pods, err := dockertools.GetPods(kl.dockerClient, false) pods, err := kl.containerManager.GetPods(false)
if err != nil { if err != nil {
return fmt.Errorf("failed to get kubelet pods: %v", err) return fmt.Errorf("failed to get kubelet pods: %v", err)
} }