diff --git a/pkg/kubelet/config/common.go b/pkg/kubelet/config/common.go index 85a0a78938..72ec9c7e50 100644 --- a/pkg/kubelet/config/common.go +++ b/pkg/kubelet/config/common.go @@ -74,6 +74,9 @@ func applyDefaults(pod *api.Pod, source string, isFile bool, hostname string) er } glog.V(5).Infof("Using namespace %q for pod %q from %s", pod.Namespace, pod.Name, source) + // Set the Host field to indicate this pod is scheduled on the current node. + pod.Spec.Host = hostname + // Currently just simply follow the same format in resthandler.go pod.ObjectMeta.SelfLink = fmt.Sprintf("/api/v1beta2/pods/%s?namespace=%s", pod.Name, pod.Namespace) diff --git a/pkg/kubelet/config/file_test.go b/pkg/kubelet/config/file_test.go index a7c3484604..9ffa5608f3 100644 --- a/pkg/kubelet/config/file_test.go +++ b/pkg/kubelet/config/file_test.go @@ -91,6 +91,7 @@ func TestReadFromFile(t *testing.T) { SelfLink: "/api/v1beta2/pods/test-" + hostname + "?namespace=default", }, Spec: api.PodSpec{ + Host: hostname, RestartPolicy: api.RestartPolicyAlways, DNSPolicy: api.DNSClusterFirst, Containers: []api.Container{{ @@ -116,6 +117,7 @@ func TestReadFromFile(t *testing.T) { SelfLink: "/api/v1beta2/pods/12345-" + hostname + "?namespace=default", }, Spec: api.PodSpec{ + Host: hostname, RestartPolicy: api.RestartPolicyAlways, DNSPolicy: api.DNSClusterFirst, Containers: []api.Container{{ @@ -142,6 +144,7 @@ func TestReadFromFile(t *testing.T) { SelfLink: "/api/v1beta2/pods/test-" + hostname + "?namespace=default", }, Spec: api.PodSpec{ + Host: hostname, RestartPolicy: api.RestartPolicyAlways, DNSPolicy: api.DNSClusterFirst, Containers: []api.Container{{ @@ -174,6 +177,7 @@ func TestReadFromFile(t *testing.T) { SelfLink: "/api/v1beta2/pods/test-" + hostname + "?namespace=mynamespace", }, Spec: api.PodSpec{ + Host: hostname, RestartPolicy: api.RestartPolicyAlways, DNSPolicy: api.DNSClusterFirst, Containers: []api.Container{{ @@ -204,6 +208,7 @@ func TestReadFromFile(t *testing.T) { SelfLink: "/api/v1beta2/pods/12345-" + hostname + "?namespace=default", }, Spec: api.PodSpec{ + Host: hostname, RestartPolicy: api.RestartPolicyAlways, DNSPolicy: api.DNSClusterFirst, Containers: []api.Container{{ @@ -235,6 +240,7 @@ func TestReadFromFile(t *testing.T) { SelfLink: "/api/v1beta2/pods/test-" + hostname + "?namespace=default", }, Spec: api.PodSpec{ + Host: hostname, RestartPolicy: api.RestartPolicyAlways, DNSPolicy: api.DNSClusterFirst, Containers: []api.Container{{ @@ -360,6 +366,7 @@ func ExampleManifestAndPod(id string) (v1beta1.ContainerManifest, api.Pod) { SelfLink: "/api/v1beta2/pods/" + id + "-" + hostname + "?namespace=default", }, Spec: api.PodSpec{ + Host: hostname, Containers: []api.Container{ { Name: "c" + id, diff --git a/pkg/kubelet/config/http_test.go b/pkg/kubelet/config/http_test.go index da096972c9..0032f14625 100644 --- a/pkg/kubelet/config/http_test.go +++ b/pkg/kubelet/config/http_test.go @@ -138,6 +138,7 @@ func TestExtractManifestFromHTTP(t *testing.T) { SelfLink: "/api/v1beta2/pods/foo-" + hostname + "?namespace=default", }, Spec: api.PodSpec{ + Host: hostname, RestartPolicy: api.RestartPolicyAlways, DNSPolicy: api.DNSClusterFirst, Containers: []api.Container{{ @@ -162,6 +163,7 @@ func TestExtractManifestFromHTTP(t *testing.T) { SelfLink: "/api/v1beta2/pods/111-" + hostname + "?namespace=default", }, Spec: api.PodSpec{ + Host: hostname, RestartPolicy: api.RestartPolicyAlways, DNSPolicy: api.DNSClusterFirst, Containers: []api.Container{{ @@ -186,6 +188,7 @@ func TestExtractManifestFromHTTP(t *testing.T) { SelfLink: "/api/v1beta2/pods/foo-" + hostname + "?namespace=default", }, Spec: api.PodSpec{ + Host: hostname, RestartPolicy: api.RestartPolicyAlways, DNSPolicy: api.DNSClusterFirst, Containers: []api.Container{{ @@ -214,6 +217,7 @@ func TestExtractManifestFromHTTP(t *testing.T) { SelfLink: "/api/v1beta2/pods/foo-" + hostname + "?namespace=default", }, Spec: api.PodSpec{ + Host: hostname, RestartPolicy: api.RestartPolicyAlways, DNSPolicy: api.DNSClusterFirst, Containers: []api.Container{{ @@ -231,6 +235,7 @@ func TestExtractManifestFromHTTP(t *testing.T) { SelfLink: "/api/v1beta2/pods/bar-" + hostname + "?namespace=default", }, Spec: api.PodSpec{ + Host: hostname, RestartPolicy: api.RestartPolicyAlways, DNSPolicy: api.DNSClusterFirst, Containers: []api.Container{{ @@ -320,6 +325,7 @@ func TestExtractPodsFromHTTP(t *testing.T) { SelfLink: "/api/v1beta2/pods/foo-" + hostname + "?namespace=mynamespace", }, Spec: api.PodSpec{ + Host: hostname, RestartPolicy: api.RestartPolicyAlways, DNSPolicy: api.DNSClusterFirst, Containers: []api.Container{{ @@ -343,6 +349,7 @@ func TestExtractPodsFromHTTP(t *testing.T) { Namespace: "mynamespace", }, Spec: v1beta3.PodSpec{ + Host: hostname, Containers: []v1beta3.Container{{Name: "1", Image: "foo", ImagePullPolicy: v1beta3.PullAlways}}, }, }, @@ -356,6 +363,7 @@ func TestExtractPodsFromHTTP(t *testing.T) { SelfLink: "/api/v1beta2/pods/foo-" + hostname + "?namespace=mynamespace", }, Spec: api.PodSpec{ + Host: hostname, RestartPolicy: api.RestartPolicyAlways, DNSPolicy: api.DNSClusterFirst, Containers: []api.Container{{ @@ -380,6 +388,7 @@ func TestExtractPodsFromHTTP(t *testing.T) { UID: "111", }, Spec: v1beta3.PodSpec{ + Host: hostname, Containers: []v1beta3.Container{{Name: "1", Image: "foo", ImagePullPolicy: v1beta3.PullAlways}}, }, }, @@ -389,6 +398,7 @@ func TestExtractPodsFromHTTP(t *testing.T) { UID: "222", }, Spec: v1beta3.PodSpec{ + Host: hostname, Containers: []v1beta3.Container{{Name: "2", Image: "bar", ImagePullPolicy: ""}}, }, }, @@ -404,6 +414,7 @@ func TestExtractPodsFromHTTP(t *testing.T) { SelfLink: "/api/v1beta2/pods/foo-" + hostname + "?namespace=default", }, Spec: api.PodSpec{ + Host: hostname, RestartPolicy: api.RestartPolicyAlways, DNSPolicy: api.DNSClusterFirst, Containers: []api.Container{{ @@ -421,6 +432,7 @@ func TestExtractPodsFromHTTP(t *testing.T) { SelfLink: "/api/v1beta2/pods/bar-" + hostname + "?namespace=default", }, Spec: api.PodSpec{ + Host: hostname, RestartPolicy: api.RestartPolicyAlways, DNSPolicy: api.DNSClusterFirst, Containers: []api.Container{{ diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 5f5de89755..741270bea2 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1332,18 +1332,26 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont kl.pullImageAndRunContainer(pod, &pod.Spec.Containers[container], &podVolumes, podInfraContainerID) } - if mirrorPod == nil && isStaticPod(pod) { - glog.V(4).Infof("Creating a mirror pod %q", podFullName) - // To make sure we will properly update static pod status we need to delete - // it from status manager. Otherwise it is possible that we will miss manual - // deletion of mirror pod in apiserver and will never reset its status to - // Running after recreating it. - kl.statusManager.DeletePodStatus(podFullName) - if err := kl.podManager.CreateMirrorPod(*pod, kl.hostname); err != nil { - glog.Errorf("Failed creating a mirror pod %q: %#v", podFullName, err) + if isStaticPod(pod) { + if mirrorPod != nil && !kl.podManager.IsMirrorPodOf(mirrorPod, pod) { + // The mirror pod is semantically different from the static pod. Remove + // it. The mirror pod will get recreated later. + glog.Errorf("Deleting mirror pod %q because it is outdated", podFullName) + if err := kl.podManager.DeleteMirrorPod(podFullName); err != nil { + glog.Errorf("Failed deleting mirror pod %q: %v", podFullName, err) + } + } + if mirrorPod == nil { + glog.V(3).Infof("Creating a mirror pod %q", podFullName) + if err := kl.podManager.CreateMirrorPod(*pod); err != nil { + glog.Errorf("Failed creating a mirror pod %q: %v", podFullName, err) + } + // Pod status update is edge-triggered. If there is any update of the + // mirror pod, we need to delete the existing status associated with + // the static pod to trigger an update. + kl.statusManager.DeletePodStatus(podFullName) } } - return nil } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 6bd5036e6a..407d78798b 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -3271,6 +3271,57 @@ func TestCreateMirrorPod(t *testing.T) { } } +func TestDeleteOutdatedMirrorPod(t *testing.T) { + testKubelet := newTestKubelet(t) + testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) + kl := testKubelet.kubelet + manager := testKubelet.fakeMirrorClient + pod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "12345678", + Name: "foo", + Namespace: "ns", + Annotations: map[string]string{ + ConfigSourceAnnotationKey: "file", + }, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + {Name: "1234", Image: "foo"}, + }, + }, + } + // Mirror pod has an outdated spec. + mirrorPod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "11111111", + Name: "foo", + Namespace: "ns", + Annotations: map[string]string{ + ConfigSourceAnnotationKey: "api", + ConfigMirrorAnnotationKey: "mirror", + }, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + {Name: "1234", Image: "bar"}, + }, + }, + } + + pods := []api.Pod{pod, mirrorPod} + kl.podManager.SetPods(pods) + err := kl.syncPod(&pod, &mirrorPod, container.Pod{}) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + name := kubecontainer.GetPodFullName(&pod) + creates, deletes := manager.GetCounts(name) + if creates != 0 || deletes != 1 { + t.Errorf("expected 0 creation and 1 deletion of %q, got %d, %d", name, creates, deletes) + } +} + func TestDeleteOrphanedMirrorPods(t *testing.T) { testKubelet := newTestKubelet(t) testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) diff --git a/pkg/kubelet/mirror_client.go b/pkg/kubelet/mirror_client.go index 9dd8927ec4..378eede726 100644 --- a/pkg/kubelet/mirror_client.go +++ b/pkg/kubelet/mirror_client.go @@ -28,7 +28,7 @@ import ( // Mirror client is used to create/delete a mirror pod. type mirrorClient interface { - CreateMirrorPod(api.Pod, string) error + CreateMirrorPod(api.Pod) error DeleteMirrorPod(string) error } @@ -43,12 +43,10 @@ func newBasicMirrorClient(apiserverClient client.Interface) *basicMirrorClient { } // Creates a mirror pod. -func (self *basicMirrorClient) CreateMirrorPod(pod api.Pod, hostname string) error { +func (self *basicMirrorClient) CreateMirrorPod(pod api.Pod) error { if self.apiserverClient == nil { return nil } - // Indicate that the pod should be scheduled to the current node. - pod.Spec.Host = hostname pod.Annotations[ConfigMirrorAnnotationKey] = MirrorType _, err := self.apiserverClient.Pods(NamespaceDefault).Create(&pod) diff --git a/pkg/kubelet/mirror_client_test.go b/pkg/kubelet/mirror_client_test.go index b6a6534a7b..e0a549b853 100644 --- a/pkg/kubelet/mirror_client_test.go +++ b/pkg/kubelet/mirror_client_test.go @@ -34,7 +34,7 @@ type fakeMirrorClient struct { deleteCounts map[string]int } -func (self *fakeMirrorClient) CreateMirrorPod(pod api.Pod, _ string) error { +func (self *fakeMirrorClient) CreateMirrorPod(pod api.Pod) error { self.mirrorPodLock.Lock() defer self.mirrorPodLock.Unlock() podFullName := kubecontainer.GetPodFullName(&pod) diff --git a/pkg/kubelet/pod_manager.go b/pkg/kubelet/pod_manager.go index c517ed9a4d..562a039c09 100644 --- a/pkg/kubelet/pod_manager.go +++ b/pkg/kubelet/pod_manager.go @@ -51,6 +51,7 @@ type podManager interface { UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) DeleteOrphanedMirrorPods() TranslatePodUID(uid types.UID) types.UID + IsMirrorPodOf(mirrorPod, pod *api.Pod) bool mirrorClient } @@ -111,7 +112,7 @@ func (self *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID] for i := range u.Pods { podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate } - allPods := applyUpdates(u.Pods, self.getPods()) + allPods := applyUpdates(u.Pods, self.getAllPods()) self.setPods(allPods) default: panic("syncLoop does not support incremental changes") @@ -178,9 +179,9 @@ func applyUpdates(changed []api.Pod, current []api.Pod) []api.Pod { return updated } -func (self *basicPodManager) getPods() []api.Pod { - pods := make([]api.Pod, 0, len(self.podByUID)) - for _, pod := range self.podByUID { +func (self *basicPodManager) convertMapToPods(UIDMap map[types.UID]*api.Pod) []api.Pod { + pods := make([]api.Pod, 0, len(UIDMap)) + for _, pod := range UIDMap { pods = append(pods, *pod) } return pods @@ -190,7 +191,12 @@ func (self *basicPodManager) getPods() []api.Pod { func (self *basicPodManager) GetPods() []api.Pod { self.lock.RLock() defer self.lock.RUnlock() - return self.getPods() + return self.convertMapToPods(self.podByUID) +} + +// Returns all pods (including mirror pods). +func (self *basicPodManager) getAllPods() []api.Pod { + return append(self.convertMapToPods(self.podByUID), self.convertMapToPods(self.mirrorPodByUID)...) } // GetPodsAndMirrorMap returns the a copy of the regular pods and the mirror @@ -202,7 +208,7 @@ func (self *basicPodManager) GetPodsAndMirrorMap() ([]api.Pod, map[string]api.Po for key, pod := range self.mirrorPodByFullName { mirrorPods[key] = *pod } - return self.getPods(), mirrorPods + return self.convertMapToPods(self.podByUID), mirrorPods } // GetPodByName provides the (non-mirror) pod that matches namespace and name, @@ -263,11 +269,20 @@ func (self *basicPodManager) DeleteOrphanedMirrorPods() { } // Creates a mirror pod for the given pod. -func (self *basicPodManager) CreateMirrorPod(pod api.Pod, hostname string) error { - return self.mirrorClient.CreateMirrorPod(pod, hostname) +func (self *basicPodManager) CreateMirrorPod(pod api.Pod) error { + return self.mirrorClient.CreateMirrorPod(pod) } // Delete a mirror pod by name. func (self *basicPodManager) DeleteMirrorPod(podFullName string) error { return self.mirrorClient.DeleteMirrorPod(podFullName) } + +// Returns true if mirrorPod is a correct representation of pod; false otherwise. +func (self *basicPodManager) IsMirrorPodOf(mirrorPod, pod *api.Pod) bool { + // Check name and namespace first. + if pod.Name != mirrorPod.Name || pod.Namespace != mirrorPod.Namespace { + return false + } + return api.Semantic.DeepEqual(&pod.Spec, &mirrorPod.Spec) +}