diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index f35cfd67ad..0fa6e369e0 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -229,6 +229,8 @@ func NewMainKubelet( imageManager: imageManager, } + klet.podManager = newBasicPodManager(klet.kubeClient) + dockerCache, err := dockertools.NewDockerCache(dockerClient) if err != nil { return nil, err @@ -253,8 +255,6 @@ func NewMainKubelet( klet.podStatuses = make(map[string]api.PodStatus) - klet.mirrorManager = newBasicMirrorManager(klet.kubeClient) - return klet, nil } @@ -285,20 +285,10 @@ type Kubelet struct { podStatusUpdateFrequency time.Duration sourcesReady SourcesReadyFn - // Protects the pods array - // We make complete array copies out of this while locked, which is OK because once added to this array, - // pods are immutable - podLock sync.RWMutex - pods []api.Pod - // Record the set of mirror pods (see mirror_manager.go for more details); - // similar to pods, this is not immutable and is protected by the same podLock. - // Note that Kubelet.pods do not contain mirror pods as they are filtered - // out beforehand. - mirrorPods mirrorPods - + podManager podManager // A pod status cache stores statuses for pods (both rejected and synced). // Note that currently no thread attempts to acquire podStatusesLock while - // holding podLock, and vice versa. If you intend to change this usage + // accessing podManager, and vice versa. If you intend to change this usage // pattern, please explicitly impose an acquiring order to avoid deadlocks // and document such an order in the comment. podStatusesLock sync.RWMutex @@ -353,9 +343,6 @@ type Kubelet struct { // the EventRecorder to use recorder record.EventRecorder - // A mirror pod manager which provides helper functions. - mirrorManager mirrorManager - // Policy for handling garbage collection of dead containers. containerGC containerGC @@ -1445,7 +1432,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dock if !hasMirrorPod && isStaticPod(pod) { glog.V(4).Infof("Creating a mirror pod %q", podFullName) - if err := kl.mirrorManager.CreateMirrorPod(*pod, kl.hostname); err != nil { + if err := kl.podManager.CreateMirrorPod(*pod, kl.hostname); err != nil { glog.Errorf("Failed creating a mirror pod %q: %#v", podFullName, err) } } @@ -1572,7 +1559,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric } // Run the sync in an async manifest worker. - kl.podWorkers.UpdatePod(pod, kl.mirrorPods.HasMirrorPod(uid), func() { + kl.podWorkers.UpdatePod(pod, mirrorPods.HasMirrorPod(uid), func() { metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start)) }) @@ -1641,33 +1628,11 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric } // Remove any orphaned mirror pods. - deleteOrphanedMirrorPods(mirrorPods, kl.mirrorManager) + kl.podManager.DeleteOrphanedMirrorPods(&mirrorPods) return err } -func updatePods(changed []api.Pod, current []api.Pod) []api.Pod { - updated := []api.Pod{} - m := map[types.UID]*api.Pod{} - for i := range changed { - pod := &changed[i] - m[pod.UID] = pod - } - - for i := range current { - pod := ¤t[i] - if m[pod.UID] != nil { - updated = append(updated, *m[pod.UID]) - glog.V(4).Infof("pod with UID: %q has a new spec %+v", pod.UID, *m[pod.UID]) - } else { - updated = append(updated, *pod) - glog.V(4).Infof("pod with UID: %q stay with the same spec %+v", pod.UID, *pod) - } - } - - return updated -} - type podsByCreationTime []api.Pod func (s podsByCreationTime) Len() int { @@ -1771,7 +1736,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { podSyncTypes := make(map[types.UID]metrics.SyncPodType) select { case u := <-updates: - kl.updatePods(u, podSyncTypes) + kl.podManager.UpdatePods(u, podSyncTypes) unsyncedPod = true case <-time.After(kl.resyncInterval): glog.V(4).Infof("Periodic sync") @@ -1782,7 +1747,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { for unsyncedPod { select { case u := <-updates: - kl.updatePods(u, podSyncTypes) + kl.podManager.UpdatePods(u, podSyncTypes) case <-time.After(5 * time.Millisecond): // Break the for loop. unsyncedPod = false @@ -1830,52 +1795,6 @@ func (kl *Kubelet) syncStatus(deadline time.Duration) { t.Stop() } -// Update the Kubelet's internal pods with those provided by the update. -// Records new and updated pods in newPods and updatedPods. -func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) { - kl.podLock.Lock() - defer kl.podLock.Unlock() - switch u.Op { - case SET: - glog.V(3).Infof("SET: Containers changed") - newPods, newMirrorPods := filterAndCategorizePods(u.Pods) - - // Store the new pods. Don't worry about filtering host ports since those - // pods will never be looked up. - existingPods := make(map[types.UID]struct{}) - for i := range kl.pods { - existingPods[kl.pods[i].UID] = struct{}{} - } - for _, pod := range newPods { - if _, ok := existingPods[pod.UID]; !ok { - podSyncTypes[pod.UID] = metrics.SyncPodCreate - } - } - // Actually update the pods. - kl.pods = newPods - kl.mirrorPods = newMirrorPods - case UPDATE: - glog.V(3).Infof("Update: Containers changed") - - // Store the updated pods. Don't worry about filtering host ports since those - // pods will never be looked up. - for i := range u.Pods { - podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate - } - allPods := updatePods(u.Pods, kl.pods) - kl.pods, kl.mirrorPods = filterAndCategorizePods(allPods) - default: - panic("syncLoop does not support incremental changes") - } - - // Mark all remaining pods as sync. - for i := range kl.pods { - if _, ok := podSyncTypes[kl.pods[i].UID]; !ok { - podSyncTypes[u.Pods[i].UID] = metrics.SyncPodSync - } - } -} - // Returns Docker version for this Kubelet. func (kl *Kubelet) GetDockerVersion() ([]uint, error) { if kl.dockerClient == nil { @@ -1937,31 +1856,17 @@ func (kl *Kubelet) GetHostname() string { // GetPods returns all pods bound to the kubelet and their spec, and the mirror // pod map. func (kl *Kubelet) GetPods() ([]api.Pod, mirrorPods) { - kl.podLock.RLock() - defer kl.podLock.RUnlock() - return append([]api.Pod{}, kl.pods...), kl.mirrorPods + return kl.podManager.GetPods() } func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.Pod, bool) { - name, namespace, err := ParsePodFullName(podFullName) - if err != nil { - return nil, false - } - return kl.GetPodByName(namespace, name) + return kl.podManager.GetPodByFullName(podFullName) } // GetPodByName provides the first pod that matches namespace and name, as well // as whether the pod was found. func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) { - kl.podLock.RLock() - defer kl.podLock.RUnlock() - for i := range kl.pods { - pod := kl.pods[i] - if pod.Namespace == namespace && pod.Name == name { - return &pod, true - } - } - return nil, false + return kl.podManager.GetPodByName(namespace, name) } // updateNodeStatus updates node status to master with retries. @@ -2108,7 +2013,7 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio // GetPodStatus returns information from Docker about the containers in a pod func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) { - uid = kl.translatePodUID(uid) + uid = kl.podManager.TranslatePodUID(uid) // Check to see if we have a cached version of the status. cachedPodStatus, found := kl.getPodStatusFromCache(podFullName) @@ -2172,7 +2077,7 @@ func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) { // Run a command in a container, returns the combined stdout, stderr as an array of bytes func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container string, cmd []string) ([]byte, error) { - uid = kl.translatePodUID(uid) + uid = kl.podManager.TranslatePodUID(uid) if kl.runner == nil { return nil, fmt.Errorf("no runner specified.") @@ -2191,7 +2096,7 @@ func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container s // ExecInContainer executes a command in a container, connecting the supplied // stdin/stdout/stderr to the command's IO streams. func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { - uid = kl.translatePodUID(uid) + uid = kl.podManager.TranslatePodUID(uid) if kl.runner == nil { return fmt.Errorf("no runner specified.") @@ -2210,7 +2115,7 @@ func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container // PortForward connects to the pod's port and copies data between the port // and the stream. func (kl *Kubelet) PortForward(podFullName string, uid types.UID, port uint16, stream io.ReadWriteCloser) error { - uid = kl.translatePodUID(uid) + uid = kl.podManager.TranslatePodUID(uid) if kl.runner == nil { return fmt.Errorf("no runner specified.") @@ -2244,29 +2149,10 @@ func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration { return kl.streamingConnectionIdleTimeout } -// If the UID belongs to a mirror pod, maps it to the UID of its static pod. -// Otherwise, return the original UID. All public-facing functions should -// perform this translation for UIDs because user may provide a mirror pod UID, -// which is not recognized by internal Kubelet functions. -func (kl *Kubelet) translatePodUID(uid types.UID) types.UID { - if uid == "" { - return uid - } - - kl.podLock.RLock() - defer kl.podLock.RUnlock() - staticUID, ok := kl.mirrorPods.GetStaticUID(uid) - if ok { - return staticUID - } else { - return uid - } -} - // GetContainerInfo returns stats (from Cadvisor) for a container. func (kl *Kubelet) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) { - uid = kl.translatePodUID(uid) + uid = kl.podManager.TranslatePodUID(uid) dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false) if err != nil { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 1e179f474c..6df4547d82 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -105,9 +105,9 @@ func newTestKubelet(t *testing.T) *TestKubelet { } mockCadvisor := &cadvisor.Mock{} kubelet.cadvisor = mockCadvisor - mirrorManager := newFakeMirrorMananger() - kubelet.mirrorManager = mirrorManager - return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, mirrorManager} + podManager, fakeMirrorManager := newFakePodManager() + kubelet.podManager = podManager + return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorManager} } func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) { @@ -434,7 +434,7 @@ func TestSyncPodsDoesNothing(t *testing.T) { ID: "9876", }, } - kubelet.pods = []api.Pod{ + pods := []api.Pod{ { ObjectMeta: api.ObjectMeta{ UID: "12345678", @@ -448,8 +448,9 @@ func TestSyncPodsDoesNothing(t *testing.T) { }, }, } + kubelet.podManager.SetPods(pods) waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now()) + err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -468,7 +469,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) { TerminationMessagePath: "/dev/somepath", } fakeDocker.ContainerList = []docker.APIContainers{} - kubelet.pods = []api.Pod{ + pods := []api.Pod{ { ObjectMeta: api.ObjectMeta{ UID: "12345678", @@ -482,8 +483,9 @@ func TestSyncPodsWithTerminationLog(t *testing.T) { }, }, } + kubelet.podManager.SetPods(pods) waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now()) + err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -518,7 +520,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { waitGroup := testKubelet.waitGroup kubelet.podInfraContainerImage = "custom_image_name" fakeDocker.ContainerList = []docker.APIContainers{} - kubelet.pods = []api.Pod{ + pods := []api.Pod{ { ObjectMeta: api.ObjectMeta{ UID: "12345678", @@ -532,8 +534,9 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { }, }, } + kubelet.podManager.SetPods(pods) waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now()) + err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -572,7 +575,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { puller.HasImages = []string{} kubelet.podInfraContainerImage = "custom_image_name" fakeDocker.ContainerList = []docker.APIContainers{} - kubelet.pods = []api.Pod{ + pods := []api.Pod{ { ObjectMeta: api.ObjectMeta{ UID: "12345678", @@ -587,7 +590,8 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { }, } waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now()) + kubelet.podManager.SetPods(pods) + err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -623,7 +627,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { ID: "9876", }, } - kubelet.pods = []api.Pod{ + pods := []api.Pod{ { ObjectMeta: api.ObjectMeta{ UID: "12345678", @@ -638,7 +642,8 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { }, } waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now()) + kubelet.podManager.SetPods(pods) + err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -670,7 +675,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { ID: "9876", }, } - kubelet.pods = []api.Pod{ + pods := []api.Pod{ { ObjectMeta: api.ObjectMeta{ UID: "12345678", @@ -696,7 +701,8 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { }, } waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now()) + kubelet.podManager.SetPods(pods) + err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -739,7 +745,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { ID: "8765", }, } - kubelet.pods = []api.Pod{ + pods := []api.Pod{ { ObjectMeta: api.ObjectMeta{ UID: "12345678", @@ -766,7 +772,8 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { }, } waitGroup.Add(2) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now()) + kubelet.podManager.SetPods(pods) + err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -907,7 +914,8 @@ func TestSyncPodDeletesDuplicate(t *testing.T) { }, }, } - kubelet.pods = append(kubelet.pods, bound) + pods := []api.Pod{bound} + kubelet.podManager.SetPods(pods) err := kubelet.syncPod(&bound, false, dockerContainers) if err != nil { t.Errorf("unexpected error: %v", err) @@ -948,7 +956,8 @@ func TestSyncPodBadHash(t *testing.T) { }, }, } - kubelet.pods = append(kubelet.pods, bound) + pods := []api.Pod{bound} + kubelet.podManager.SetPods(pods) err := kubelet.syncPod(&bound, false, dockerContainers) if err != nil { t.Errorf("unexpected error: %v", err) @@ -1002,7 +1011,8 @@ func TestSyncPodUnhealthy(t *testing.T) { }, }, } - kubelet.pods = append(kubelet.pods, bound) + pods := []api.Pod{bound} + kubelet.podManager.SetPods(pods) err := kubelet.syncPod(&bound, false, dockerContainers) if err != nil { t.Errorf("unexpected error: %v", err) @@ -1692,7 +1702,8 @@ func TestSyncPodEventHandlerFails(t *testing.T) { }, }, } - kubelet.pods = append(kubelet.pods, bound) + pods := []api.Pod{bound} + kubelet.podManager.SetPods(pods) err := kubelet.syncPod(&bound, false, dockerContainers) if err != nil { t.Errorf("unexpected error: %v", err) @@ -2868,7 +2879,7 @@ func TestHandlePortConflicts(t *testing.T) { } // Check if we can retrieve the pod status from GetPodStatus(). - kl.pods = pods + kl.podManager.SetPods(pods) status, err := kl.GetPodStatus(conflictedPodName, "") if err != nil { t.Fatalf("unable to retrieve pod status for pod %q: #v.", conflictedPodName, err) @@ -2921,7 +2932,7 @@ func TestHandleNodeSelector(t *testing.T) { } // Check if we can retrieve the pod status from GetPodStatus(). - kl.pods = pods + kl.podManager.SetPods(pods) status, err := kl.GetPodStatus(notfittingPodName, "") if err != nil { t.Fatalf("unable to retrieve pod status for pod %q: #v.", notfittingPodName, err) @@ -2980,7 +2991,7 @@ func TestHandleMemExceeded(t *testing.T) { } // Check if we can retrieve the pod status from GetPodStatus(). - kl.pods = pods + kl.podManager.SetPods(pods) status, err := kl.GetPodStatus(notfittingPodName, "") if err != nil { t.Fatalf("unable to retrieve pod status for pod %q: #v.", notfittingPodName, err) @@ -3232,7 +3243,8 @@ func TestCreateMirrorPod(t *testing.T) { }, }, } - kl.pods = append(kl.pods, pod) + pods := []api.Pod{pod} + kl.podManager.SetPods(pods) hasMirrorPod := false err := kl.syncPod(&pod, hasMirrorPod, dockertools.DockerContainers{}) if err != nil { @@ -3357,7 +3369,7 @@ func TestGetContainerInfoForMirrorPods(t *testing.T) { }, } - kubelet.pods, kubelet.mirrorPods = filterAndCategorizePods(pods) + kubelet.podManager.SetPods(pods) // Use the mirror pod UID to retrieve the stats. stats, err := kubelet.GetContainerInfo("qux_ns", "5678", "foo", cadvisorReq) if err != nil { diff --git a/pkg/kubelet/mirror_manager.go b/pkg/kubelet/mirror_manager.go index 1d1ab68dc7..9ab68b0392 100644 --- a/pkg/kubelet/mirror_manager.go +++ b/pkg/kubelet/mirror_manager.go @@ -85,14 +85,6 @@ func (self *basicMirrorManager) DeleteMirrorPod(podFullName string) error { return nil } -// Delete all orphaned mirror pods. -func deleteOrphanedMirrorPods(mirrorPods mirrorPods, manager mirrorManager) { - podFullNames := mirrorPods.GetOrphanedMirrorPodNames() - for _, podFullName := range podFullNames { - manager.DeleteMirrorPod(podFullName) - } -} - // Helper functions. func getPodSource(pod *api.Pod) (string, error) { if pod.Annotations != nil { diff --git a/pkg/kubelet/pod_manager.go b/pkg/kubelet/pod_manager.go new file mode 100644 index 0000000000..f1365277ce --- /dev/null +++ b/pkg/kubelet/pod_manager.go @@ -0,0 +1,199 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics" + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" + "github.com/golang/glog" +) + +type podManager interface { + UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) + GetPods() ([]api.Pod, mirrorPods) + GetPodByName(namespace, name string) (*api.Pod, bool) + GetPodByFullName(podFullName string) (*api.Pod, bool) + TranslatePodUID(uid types.UID) types.UID + DeleteOrphanedMirrorPods(mirrorPods *mirrorPods) + SetPods(pods []api.Pod) + mirrorManager +} + +type basicPodManager struct { + // Protects all internal pod storage/mappings. + lock sync.RWMutex + pods []api.Pod + // Record the set of mirror pods (see mirror_manager.go for more details); + // similar to pods, this is not immutable and is protected by the same podLock. + // Note that basicPodManager.pods do not contain mirror pods as they are + // filtered out beforehand. + mirrorPods mirrorPods + + // A mirror pod manager which provides helper functions. + mirrorManager mirrorManager +} + +func newBasicPodManager(apiserverClient client.Interface) *basicPodManager { + podManager := &basicPodManager{} + podManager.mirrorManager = newBasicMirrorManager(apiserverClient) + podManager.mirrorPods = *newMirrorPods() + podManager.pods = []api.Pod{} + return podManager +} + +// This method is used only for testing to quickly set the internal pods. +func (self *basicPodManager) SetPods(pods []api.Pod) { + self.pods, self.mirrorPods = filterAndCategorizePods(pods) +} + +// Update the internal pods with those provided by the update. +// Records new and updated pods in newPods and updatedPods. +func (self *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) { + self.lock.Lock() + defer self.lock.Unlock() + switch u.Op { + case SET: + glog.V(3).Infof("SET: Containers changed") + newPods, newMirrorPods := filterAndCategorizePods(u.Pods) + + // Store the new pods. Don't worry about filtering host ports since those + // pods will never be looked up. + existingPods := make(map[types.UID]struct{}) + for i := range self.pods { + existingPods[self.pods[i].UID] = struct{}{} + } + for _, pod := range newPods { + if _, ok := existingPods[pod.UID]; !ok { + podSyncTypes[pod.UID] = metrics.SyncPodCreate + } + } + // Actually update the pods. + self.pods = newPods + self.mirrorPods = newMirrorPods + case UPDATE: + glog.V(3).Infof("Update: Containers changed") + + // Store the updated pods. Don't worry about filtering host ports since those + // pods will never be looked up. + for i := range u.Pods { + podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate + } + allPods := updatePods(u.Pods, self.pods) + self.pods, self.mirrorPods = filterAndCategorizePods(allPods) + default: + panic("syncLoop does not support incremental changes") + } + + // Mark all remaining pods as sync. + for i := range self.pods { + if _, ok := podSyncTypes[self.pods[i].UID]; !ok { + podSyncTypes[u.Pods[i].UID] = metrics.SyncPodSync + } + } +} + +func updatePods(changed []api.Pod, current []api.Pod) []api.Pod { + updated := []api.Pod{} + m := map[types.UID]*api.Pod{} + for i := range changed { + pod := &changed[i] + m[pod.UID] = pod + } + + for i := range current { + pod := ¤t[i] + if m[pod.UID] != nil { + updated = append(updated, *m[pod.UID]) + glog.V(4).Infof("pod with UID: %q has a new spec %+v", pod.UID, *m[pod.UID]) + } else { + updated = append(updated, *pod) + glog.V(4).Infof("pod with UID: %q stay with the same spec %+v", pod.UID, *pod) + } + } + + return updated +} + +// GetPods returns all pods bound to the kubelet and their spec, and the mirror +// pod map. +func (self *basicPodManager) GetPods() ([]api.Pod, mirrorPods) { + self.lock.RLock() + defer self.lock.RUnlock() + return append([]api.Pod{}, self.pods...), self.mirrorPods +} + +// GetPodByName provides the first pod that matches namespace and name, as well +// as whether the pod was found. +func (self *basicPodManager) GetPodByName(namespace, name string) (*api.Pod, bool) { + self.lock.RLock() + defer self.lock.RUnlock() + for i := range self.pods { + pod := self.pods[i] + if pod.Namespace == namespace && pod.Name == name { + return &pod, true + } + } + return nil, false +} + +func (self *basicPodManager) GetPodByFullName(podFullName string) (*api.Pod, bool) { + name, namespace, err := ParsePodFullName(podFullName) + if err != nil { + return nil, false + } + return self.GetPodByName(namespace, name) +} + +// If the UID belongs to a mirror pod, maps it to the UID of its static pod. +// Otherwise, return the original UID. All public-facing functions should +// perform this translation for UIDs because user may provide a mirror pod UID, +// which is not recognized by internal Kubelet functions. +func (self *basicPodManager) TranslatePodUID(uid types.UID) types.UID { + if uid == "" { + return uid + } + + self.lock.RLock() + defer self.lock.RUnlock() + staticUID, ok := self.mirrorPods.GetStaticUID(uid) + if ok { + return staticUID + } else { + return uid + } +} + +// Delete all orphaned mirror pods. This method doesn't acquire the lock +// because it assumes the a copy of the mirrorPod is passed as an argument. +func (self *basicPodManager) DeleteOrphanedMirrorPods(mirrorPods *mirrorPods) { + podFullNames := mirrorPods.GetOrphanedMirrorPodNames() + for _, podFullName := range podFullNames { + self.mirrorManager.DeleteMirrorPod(podFullName) + } +} + +func (self *basicPodManager) CreateMirrorPod(pod api.Pod, hostname string) error { + return self.mirrorManager.CreateMirrorPod(pod, hostname) +} + +func (self *basicPodManager) DeleteMirrorPod(podFullName string) error { + return self.mirrorManager.DeleteMirrorPod(podFullName) +} diff --git a/pkg/kubelet/pod_manager_test.go b/pkg/kubelet/pod_manager_test.go new file mode 100644 index 0000000000..2ca65901bd --- /dev/null +++ b/pkg/kubelet/pod_manager_test.go @@ -0,0 +1,25 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +// Stub out mirror manager for testing purpose. +func newFakePodManager() (*basicPodManager, *fakeMirrorManager) { + podManager := newBasicPodManager(nil) + fakeMirrorManager := newFakeMirrorMananger() + podManager.mirrorManager = fakeMirrorManager + return podManager, fakeMirrorManager +}