Kubelet: add podManager for managing internal pod storage

This change moves pod array and mirrorPods into podManager, along with all
methods accessing these internal pod storages. This is the first step of the
refactoring, and no function change is involved.
pull/6/head
Yu-Ju Hong 2015-03-20 17:22:02 -07:00
parent 737af02fc8
commit f440989017
5 changed files with 279 additions and 165 deletions

View File

@ -229,6 +229,8 @@ func NewMainKubelet(
imageManager: imageManager,
}
klet.podManager = newBasicPodManager(klet.kubeClient)
dockerCache, err := dockertools.NewDockerCache(dockerClient)
if err != nil {
return nil, err
@ -253,8 +255,6 @@ func NewMainKubelet(
klet.podStatuses = make(map[string]api.PodStatus)
klet.mirrorManager = newBasicMirrorManager(klet.kubeClient)
return klet, nil
}
@ -285,20 +285,10 @@ type Kubelet struct {
podStatusUpdateFrequency time.Duration
sourcesReady SourcesReadyFn
// Protects the pods array
// We make complete array copies out of this while locked, which is OK because once added to this array,
// pods are immutable
podLock sync.RWMutex
pods []api.Pod
// Record the set of mirror pods (see mirror_manager.go for more details);
// similar to pods, this is not immutable and is protected by the same podLock.
// Note that Kubelet.pods do not contain mirror pods as they are filtered
// out beforehand.
mirrorPods mirrorPods
podManager podManager
// A pod status cache stores statuses for pods (both rejected and synced).
// Note that currently no thread attempts to acquire podStatusesLock while
// holding podLock, and vice versa. If you intend to change this usage
// accessing podManager, and vice versa. If you intend to change this usage
// pattern, please explicitly impose an acquiring order to avoid deadlocks
// and document such an order in the comment.
podStatusesLock sync.RWMutex
@ -353,9 +343,6 @@ type Kubelet struct {
// the EventRecorder to use
recorder record.EventRecorder
// A mirror pod manager which provides helper functions.
mirrorManager mirrorManager
// Policy for handling garbage collection of dead containers.
containerGC containerGC
@ -1445,7 +1432,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dock
if !hasMirrorPod && isStaticPod(pod) {
glog.V(4).Infof("Creating a mirror pod %q", podFullName)
if err := kl.mirrorManager.CreateMirrorPod(*pod, kl.hostname); err != nil {
if err := kl.podManager.CreateMirrorPod(*pod, kl.hostname); err != nil {
glog.Errorf("Failed creating a mirror pod %q: %#v", podFullName, err)
}
}
@ -1572,7 +1559,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
}
// Run the sync in an async manifest worker.
kl.podWorkers.UpdatePod(pod, kl.mirrorPods.HasMirrorPod(uid), func() {
kl.podWorkers.UpdatePod(pod, mirrorPods.HasMirrorPod(uid), func() {
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
})
@ -1641,33 +1628,11 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
}
// Remove any orphaned mirror pods.
deleteOrphanedMirrorPods(mirrorPods, kl.mirrorManager)
kl.podManager.DeleteOrphanedMirrorPods(&mirrorPods)
return err
}
func updatePods(changed []api.Pod, current []api.Pod) []api.Pod {
updated := []api.Pod{}
m := map[types.UID]*api.Pod{}
for i := range changed {
pod := &changed[i]
m[pod.UID] = pod
}
for i := range current {
pod := &current[i]
if m[pod.UID] != nil {
updated = append(updated, *m[pod.UID])
glog.V(4).Infof("pod with UID: %q has a new spec %+v", pod.UID, *m[pod.UID])
} else {
updated = append(updated, *pod)
glog.V(4).Infof("pod with UID: %q stay with the same spec %+v", pod.UID, *pod)
}
}
return updated
}
type podsByCreationTime []api.Pod
func (s podsByCreationTime) Len() int {
@ -1771,7 +1736,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
podSyncTypes := make(map[types.UID]metrics.SyncPodType)
select {
case u := <-updates:
kl.updatePods(u, podSyncTypes)
kl.podManager.UpdatePods(u, podSyncTypes)
unsyncedPod = true
case <-time.After(kl.resyncInterval):
glog.V(4).Infof("Periodic sync")
@ -1782,7 +1747,7 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
for unsyncedPod {
select {
case u := <-updates:
kl.updatePods(u, podSyncTypes)
kl.podManager.UpdatePods(u, podSyncTypes)
case <-time.After(5 * time.Millisecond):
// Break the for loop.
unsyncedPod = false
@ -1830,52 +1795,6 @@ func (kl *Kubelet) syncStatus(deadline time.Duration) {
t.Stop()
}
// Update the Kubelet's internal pods with those provided by the update.
// Records new and updated pods in newPods and updatedPods.
func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
kl.podLock.Lock()
defer kl.podLock.Unlock()
switch u.Op {
case SET:
glog.V(3).Infof("SET: Containers changed")
newPods, newMirrorPods := filterAndCategorizePods(u.Pods)
// Store the new pods. Don't worry about filtering host ports since those
// pods will never be looked up.
existingPods := make(map[types.UID]struct{})
for i := range kl.pods {
existingPods[kl.pods[i].UID] = struct{}{}
}
for _, pod := range newPods {
if _, ok := existingPods[pod.UID]; !ok {
podSyncTypes[pod.UID] = metrics.SyncPodCreate
}
}
// Actually update the pods.
kl.pods = newPods
kl.mirrorPods = newMirrorPods
case UPDATE:
glog.V(3).Infof("Update: Containers changed")
// Store the updated pods. Don't worry about filtering host ports since those
// pods will never be looked up.
for i := range u.Pods {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
}
allPods := updatePods(u.Pods, kl.pods)
kl.pods, kl.mirrorPods = filterAndCategorizePods(allPods)
default:
panic("syncLoop does not support incremental changes")
}
// Mark all remaining pods as sync.
for i := range kl.pods {
if _, ok := podSyncTypes[kl.pods[i].UID]; !ok {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodSync
}
}
}
// Returns Docker version for this Kubelet.
func (kl *Kubelet) GetDockerVersion() ([]uint, error) {
if kl.dockerClient == nil {
@ -1937,31 +1856,17 @@ func (kl *Kubelet) GetHostname() string {
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
// pod map.
func (kl *Kubelet) GetPods() ([]api.Pod, mirrorPods) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
return append([]api.Pod{}, kl.pods...), kl.mirrorPods
return kl.podManager.GetPods()
}
func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.Pod, bool) {
name, namespace, err := ParsePodFullName(podFullName)
if err != nil {
return nil, false
}
return kl.GetPodByName(namespace, name)
return kl.podManager.GetPodByFullName(podFullName)
}
// GetPodByName provides the first pod that matches namespace and name, as well
// as whether the pod was found.
func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
for i := range kl.pods {
pod := kl.pods[i]
if pod.Namespace == namespace && pod.Name == name {
return &pod, true
}
}
return nil, false
return kl.podManager.GetPodByName(namespace, name)
}
// updateNodeStatus updates node status to master with retries.
@ -2108,7 +2013,7 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio
// GetPodStatus returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
uid = kl.translatePodUID(uid)
uid = kl.podManager.TranslatePodUID(uid)
// Check to see if we have a cached version of the status.
cachedPodStatus, found := kl.getPodStatusFromCache(podFullName)
@ -2172,7 +2077,7 @@ func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
// Run a command in a container, returns the combined stdout, stderr as an array of bytes
func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container string, cmd []string) ([]byte, error) {
uid = kl.translatePodUID(uid)
uid = kl.podManager.TranslatePodUID(uid)
if kl.runner == nil {
return nil, fmt.Errorf("no runner specified.")
@ -2191,7 +2096,7 @@ func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container s
// ExecInContainer executes a command in a container, connecting the supplied
// stdin/stdout/stderr to the command's IO streams.
func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
uid = kl.translatePodUID(uid)
uid = kl.podManager.TranslatePodUID(uid)
if kl.runner == nil {
return fmt.Errorf("no runner specified.")
@ -2210,7 +2115,7 @@ func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container
// PortForward connects to the pod's port and copies data between the port
// and the stream.
func (kl *Kubelet) PortForward(podFullName string, uid types.UID, port uint16, stream io.ReadWriteCloser) error {
uid = kl.translatePodUID(uid)
uid = kl.podManager.TranslatePodUID(uid)
if kl.runner == nil {
return fmt.Errorf("no runner specified.")
@ -2244,29 +2149,10 @@ func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
return kl.streamingConnectionIdleTimeout
}
// If the UID belongs to a mirror pod, maps it to the UID of its static pod.
// Otherwise, return the original UID. All public-facing functions should
// perform this translation for UIDs because user may provide a mirror pod UID,
// which is not recognized by internal Kubelet functions.
func (kl *Kubelet) translatePodUID(uid types.UID) types.UID {
if uid == "" {
return uid
}
kl.podLock.RLock()
defer kl.podLock.RUnlock()
staticUID, ok := kl.mirrorPods.GetStaticUID(uid)
if ok {
return staticUID
} else {
return uid
}
}
// GetContainerInfo returns stats (from Cadvisor) for a container.
func (kl *Kubelet) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
uid = kl.translatePodUID(uid)
uid = kl.podManager.TranslatePodUID(uid)
dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
if err != nil {

View File

@ -105,9 +105,9 @@ func newTestKubelet(t *testing.T) *TestKubelet {
}
mockCadvisor := &cadvisor.Mock{}
kubelet.cadvisor = mockCadvisor
mirrorManager := newFakeMirrorMananger()
kubelet.mirrorManager = mirrorManager
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, mirrorManager}
podManager, fakeMirrorManager := newFakePodManager()
kubelet.podManager = podManager
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorManager}
}
func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) {
@ -434,7 +434,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
ID: "9876",
},
}
kubelet.pods = []api.Pod{
pods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
@ -448,8 +448,9 @@ func TestSyncPodsDoesNothing(t *testing.T) {
},
},
}
kubelet.podManager.SetPods(pods)
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -468,7 +469,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
TerminationMessagePath: "/dev/somepath",
}
fakeDocker.ContainerList = []docker.APIContainers{}
kubelet.pods = []api.Pod{
pods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
@ -482,8 +483,9 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
},
},
}
kubelet.podManager.SetPods(pods)
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -518,7 +520,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
waitGroup := testKubelet.waitGroup
kubelet.podInfraContainerImage = "custom_image_name"
fakeDocker.ContainerList = []docker.APIContainers{}
kubelet.pods = []api.Pod{
pods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
@ -532,8 +534,9 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
},
},
}
kubelet.podManager.SetPods(pods)
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -572,7 +575,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
puller.HasImages = []string{}
kubelet.podInfraContainerImage = "custom_image_name"
fakeDocker.ContainerList = []docker.APIContainers{}
kubelet.pods = []api.Pod{
pods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
@ -587,7 +590,8 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
kubelet.podManager.SetPods(pods)
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -623,7 +627,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
ID: "9876",
},
}
kubelet.pods = []api.Pod{
pods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
@ -638,7 +642,8 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
kubelet.podManager.SetPods(pods)
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -670,7 +675,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
ID: "9876",
},
}
kubelet.pods = []api.Pod{
pods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
@ -696,7 +701,8 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
kubelet.podManager.SetPods(pods)
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -739,7 +745,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
ID: "8765",
},
}
kubelet.pods = []api.Pod{
pods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
@ -766,7 +772,8 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
},
}
waitGroup.Add(2)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
kubelet.podManager.SetPods(pods)
err := kubelet.SyncPods(pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -907,7 +914,8 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
},
},
}
kubelet.pods = append(kubelet.pods, bound)
pods := []api.Pod{bound}
kubelet.podManager.SetPods(pods)
err := kubelet.syncPod(&bound, false, dockerContainers)
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -948,7 +956,8 @@ func TestSyncPodBadHash(t *testing.T) {
},
},
}
kubelet.pods = append(kubelet.pods, bound)
pods := []api.Pod{bound}
kubelet.podManager.SetPods(pods)
err := kubelet.syncPod(&bound, false, dockerContainers)
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -1002,7 +1011,8 @@ func TestSyncPodUnhealthy(t *testing.T) {
},
},
}
kubelet.pods = append(kubelet.pods, bound)
pods := []api.Pod{bound}
kubelet.podManager.SetPods(pods)
err := kubelet.syncPod(&bound, false, dockerContainers)
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -1692,7 +1702,8 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
},
},
}
kubelet.pods = append(kubelet.pods, bound)
pods := []api.Pod{bound}
kubelet.podManager.SetPods(pods)
err := kubelet.syncPod(&bound, false, dockerContainers)
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -2868,7 +2879,7 @@ func TestHandlePortConflicts(t *testing.T) {
}
// Check if we can retrieve the pod status from GetPodStatus().
kl.pods = pods
kl.podManager.SetPods(pods)
status, err := kl.GetPodStatus(conflictedPodName, "")
if err != nil {
t.Fatalf("unable to retrieve pod status for pod %q: #v.", conflictedPodName, err)
@ -2921,7 +2932,7 @@ func TestHandleNodeSelector(t *testing.T) {
}
// Check if we can retrieve the pod status from GetPodStatus().
kl.pods = pods
kl.podManager.SetPods(pods)
status, err := kl.GetPodStatus(notfittingPodName, "")
if err != nil {
t.Fatalf("unable to retrieve pod status for pod %q: #v.", notfittingPodName, err)
@ -2980,7 +2991,7 @@ func TestHandleMemExceeded(t *testing.T) {
}
// Check if we can retrieve the pod status from GetPodStatus().
kl.pods = pods
kl.podManager.SetPods(pods)
status, err := kl.GetPodStatus(notfittingPodName, "")
if err != nil {
t.Fatalf("unable to retrieve pod status for pod %q: #v.", notfittingPodName, err)
@ -3232,7 +3243,8 @@ func TestCreateMirrorPod(t *testing.T) {
},
},
}
kl.pods = append(kl.pods, pod)
pods := []api.Pod{pod}
kl.podManager.SetPods(pods)
hasMirrorPod := false
err := kl.syncPod(&pod, hasMirrorPod, dockertools.DockerContainers{})
if err != nil {
@ -3357,7 +3369,7 @@ func TestGetContainerInfoForMirrorPods(t *testing.T) {
},
}
kubelet.pods, kubelet.mirrorPods = filterAndCategorizePods(pods)
kubelet.podManager.SetPods(pods)
// Use the mirror pod UID to retrieve the stats.
stats, err := kubelet.GetContainerInfo("qux_ns", "5678", "foo", cadvisorReq)
if err != nil {

View File

@ -85,14 +85,6 @@ func (self *basicMirrorManager) DeleteMirrorPod(podFullName string) error {
return nil
}
// Delete all orphaned mirror pods.
func deleteOrphanedMirrorPods(mirrorPods mirrorPods, manager mirrorManager) {
podFullNames := mirrorPods.GetOrphanedMirrorPodNames()
for _, podFullName := range podFullNames {
manager.DeleteMirrorPod(podFullName)
}
}
// Helper functions.
func getPodSource(pod *api.Pod) (string, error) {
if pod.Annotations != nil {

199
pkg/kubelet/pod_manager.go Normal file
View File

@ -0,0 +1,199 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/golang/glog"
)
type podManager interface {
UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType)
GetPods() ([]api.Pod, mirrorPods)
GetPodByName(namespace, name string) (*api.Pod, bool)
GetPodByFullName(podFullName string) (*api.Pod, bool)
TranslatePodUID(uid types.UID) types.UID
DeleteOrphanedMirrorPods(mirrorPods *mirrorPods)
SetPods(pods []api.Pod)
mirrorManager
}
type basicPodManager struct {
// Protects all internal pod storage/mappings.
lock sync.RWMutex
pods []api.Pod
// Record the set of mirror pods (see mirror_manager.go for more details);
// similar to pods, this is not immutable and is protected by the same podLock.
// Note that basicPodManager.pods do not contain mirror pods as they are
// filtered out beforehand.
mirrorPods mirrorPods
// A mirror pod manager which provides helper functions.
mirrorManager mirrorManager
}
func newBasicPodManager(apiserverClient client.Interface) *basicPodManager {
podManager := &basicPodManager{}
podManager.mirrorManager = newBasicMirrorManager(apiserverClient)
podManager.mirrorPods = *newMirrorPods()
podManager.pods = []api.Pod{}
return podManager
}
// This method is used only for testing to quickly set the internal pods.
func (self *basicPodManager) SetPods(pods []api.Pod) {
self.pods, self.mirrorPods = filterAndCategorizePods(pods)
}
// Update the internal pods with those provided by the update.
// Records new and updated pods in newPods and updatedPods.
func (self *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
self.lock.Lock()
defer self.lock.Unlock()
switch u.Op {
case SET:
glog.V(3).Infof("SET: Containers changed")
newPods, newMirrorPods := filterAndCategorizePods(u.Pods)
// Store the new pods. Don't worry about filtering host ports since those
// pods will never be looked up.
existingPods := make(map[types.UID]struct{})
for i := range self.pods {
existingPods[self.pods[i].UID] = struct{}{}
}
for _, pod := range newPods {
if _, ok := existingPods[pod.UID]; !ok {
podSyncTypes[pod.UID] = metrics.SyncPodCreate
}
}
// Actually update the pods.
self.pods = newPods
self.mirrorPods = newMirrorPods
case UPDATE:
glog.V(3).Infof("Update: Containers changed")
// Store the updated pods. Don't worry about filtering host ports since those
// pods will never be looked up.
for i := range u.Pods {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
}
allPods := updatePods(u.Pods, self.pods)
self.pods, self.mirrorPods = filterAndCategorizePods(allPods)
default:
panic("syncLoop does not support incremental changes")
}
// Mark all remaining pods as sync.
for i := range self.pods {
if _, ok := podSyncTypes[self.pods[i].UID]; !ok {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodSync
}
}
}
func updatePods(changed []api.Pod, current []api.Pod) []api.Pod {
updated := []api.Pod{}
m := map[types.UID]*api.Pod{}
for i := range changed {
pod := &changed[i]
m[pod.UID] = pod
}
for i := range current {
pod := &current[i]
if m[pod.UID] != nil {
updated = append(updated, *m[pod.UID])
glog.V(4).Infof("pod with UID: %q has a new spec %+v", pod.UID, *m[pod.UID])
} else {
updated = append(updated, *pod)
glog.V(4).Infof("pod with UID: %q stay with the same spec %+v", pod.UID, *pod)
}
}
return updated
}
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
// pod map.
func (self *basicPodManager) GetPods() ([]api.Pod, mirrorPods) {
self.lock.RLock()
defer self.lock.RUnlock()
return append([]api.Pod{}, self.pods...), self.mirrorPods
}
// GetPodByName provides the first pod that matches namespace and name, as well
// as whether the pod was found.
func (self *basicPodManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
self.lock.RLock()
defer self.lock.RUnlock()
for i := range self.pods {
pod := self.pods[i]
if pod.Namespace == namespace && pod.Name == name {
return &pod, true
}
}
return nil, false
}
func (self *basicPodManager) GetPodByFullName(podFullName string) (*api.Pod, bool) {
name, namespace, err := ParsePodFullName(podFullName)
if err != nil {
return nil, false
}
return self.GetPodByName(namespace, name)
}
// If the UID belongs to a mirror pod, maps it to the UID of its static pod.
// Otherwise, return the original UID. All public-facing functions should
// perform this translation for UIDs because user may provide a mirror pod UID,
// which is not recognized by internal Kubelet functions.
func (self *basicPodManager) TranslatePodUID(uid types.UID) types.UID {
if uid == "" {
return uid
}
self.lock.RLock()
defer self.lock.RUnlock()
staticUID, ok := self.mirrorPods.GetStaticUID(uid)
if ok {
return staticUID
} else {
return uid
}
}
// Delete all orphaned mirror pods. This method doesn't acquire the lock
// because it assumes the a copy of the mirrorPod is passed as an argument.
func (self *basicPodManager) DeleteOrphanedMirrorPods(mirrorPods *mirrorPods) {
podFullNames := mirrorPods.GetOrphanedMirrorPodNames()
for _, podFullName := range podFullNames {
self.mirrorManager.DeleteMirrorPod(podFullName)
}
}
func (self *basicPodManager) CreateMirrorPod(pod api.Pod, hostname string) error {
return self.mirrorManager.CreateMirrorPod(pod, hostname)
}
func (self *basicPodManager) DeleteMirrorPod(podFullName string) error {
return self.mirrorManager.DeleteMirrorPod(podFullName)
}

View File

@ -0,0 +1,25 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
// Stub out mirror manager for testing purpose.
func newFakePodManager() (*basicPodManager, *fakeMirrorManager) {
podManager := newBasicPodManager(nil)
fakeMirrorManager := newFakeMirrorMananger()
podManager.mirrorManager = fakeMirrorManager
return podManager, fakeMirrorManager
}