Kubelet: recreate mirror pod if the static pod changes

If a static pod changes, delete the corresponding mirror pod. When kubelet
could not see mirror pod from the API server update, it'd attemp to create a
new mirror pod with up-to-date specs.
pull/6/head
Yu-Ju Hong 2015-04-08 13:28:33 -07:00
parent 43ec88fda5
commit d7cf294c99
8 changed files with 117 additions and 23 deletions

View File

@ -74,6 +74,9 @@ func applyDefaults(pod *api.Pod, source string, isFile bool, hostname string) er
}
glog.V(5).Infof("Using namespace %q for pod %q from %s", pod.Namespace, pod.Name, source)
// Set the Host field to indicate this pod is scheduled on the current node.
pod.Spec.Host = hostname
// Currently just simply follow the same format in resthandler.go
pod.ObjectMeta.SelfLink =
fmt.Sprintf("/api/v1beta2/pods/%s?namespace=%s", pod.Name, pod.Namespace)

View File

@ -91,6 +91,7 @@ func TestReadFromFile(t *testing.T) {
SelfLink: "/api/v1beta2/pods/test-" + hostname + "?namespace=default",
},
Spec: api.PodSpec{
Host: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
@ -116,6 +117,7 @@ func TestReadFromFile(t *testing.T) {
SelfLink: "/api/v1beta2/pods/12345-" + hostname + "?namespace=default",
},
Spec: api.PodSpec{
Host: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
@ -142,6 +144,7 @@ func TestReadFromFile(t *testing.T) {
SelfLink: "/api/v1beta2/pods/test-" + hostname + "?namespace=default",
},
Spec: api.PodSpec{
Host: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
@ -174,6 +177,7 @@ func TestReadFromFile(t *testing.T) {
SelfLink: "/api/v1beta2/pods/test-" + hostname + "?namespace=mynamespace",
},
Spec: api.PodSpec{
Host: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
@ -204,6 +208,7 @@ func TestReadFromFile(t *testing.T) {
SelfLink: "/api/v1beta2/pods/12345-" + hostname + "?namespace=default",
},
Spec: api.PodSpec{
Host: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
@ -235,6 +240,7 @@ func TestReadFromFile(t *testing.T) {
SelfLink: "/api/v1beta2/pods/test-" + hostname + "?namespace=default",
},
Spec: api.PodSpec{
Host: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
@ -360,6 +366,7 @@ func ExampleManifestAndPod(id string) (v1beta1.ContainerManifest, api.Pod) {
SelfLink: "/api/v1beta2/pods/" + id + "-" + hostname + "?namespace=default",
},
Spec: api.PodSpec{
Host: hostname,
Containers: []api.Container{
{
Name: "c" + id,

View File

@ -138,6 +138,7 @@ func TestExtractManifestFromHTTP(t *testing.T) {
SelfLink: "/api/v1beta2/pods/foo-" + hostname + "?namespace=default",
},
Spec: api.PodSpec{
Host: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
@ -162,6 +163,7 @@ func TestExtractManifestFromHTTP(t *testing.T) {
SelfLink: "/api/v1beta2/pods/111-" + hostname + "?namespace=default",
},
Spec: api.PodSpec{
Host: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
@ -186,6 +188,7 @@ func TestExtractManifestFromHTTP(t *testing.T) {
SelfLink: "/api/v1beta2/pods/foo-" + hostname + "?namespace=default",
},
Spec: api.PodSpec{
Host: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
@ -214,6 +217,7 @@ func TestExtractManifestFromHTTP(t *testing.T) {
SelfLink: "/api/v1beta2/pods/foo-" + hostname + "?namespace=default",
},
Spec: api.PodSpec{
Host: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
@ -231,6 +235,7 @@ func TestExtractManifestFromHTTP(t *testing.T) {
SelfLink: "/api/v1beta2/pods/bar-" + hostname + "?namespace=default",
},
Spec: api.PodSpec{
Host: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
@ -320,6 +325,7 @@ func TestExtractPodsFromHTTP(t *testing.T) {
SelfLink: "/api/v1beta2/pods/foo-" + hostname + "?namespace=mynamespace",
},
Spec: api.PodSpec{
Host: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
@ -343,6 +349,7 @@ func TestExtractPodsFromHTTP(t *testing.T) {
Namespace: "mynamespace",
},
Spec: v1beta3.PodSpec{
Host: hostname,
Containers: []v1beta3.Container{{Name: "1", Image: "foo", ImagePullPolicy: v1beta3.PullAlways}},
},
},
@ -356,6 +363,7 @@ func TestExtractPodsFromHTTP(t *testing.T) {
SelfLink: "/api/v1beta2/pods/foo-" + hostname + "?namespace=mynamespace",
},
Spec: api.PodSpec{
Host: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
@ -380,6 +388,7 @@ func TestExtractPodsFromHTTP(t *testing.T) {
UID: "111",
},
Spec: v1beta3.PodSpec{
Host: hostname,
Containers: []v1beta3.Container{{Name: "1", Image: "foo", ImagePullPolicy: v1beta3.PullAlways}},
},
},
@ -389,6 +398,7 @@ func TestExtractPodsFromHTTP(t *testing.T) {
UID: "222",
},
Spec: v1beta3.PodSpec{
Host: hostname,
Containers: []v1beta3.Container{{Name: "2", Image: "bar", ImagePullPolicy: ""}},
},
},
@ -404,6 +414,7 @@ func TestExtractPodsFromHTTP(t *testing.T) {
SelfLink: "/api/v1beta2/pods/foo-" + hostname + "?namespace=default",
},
Spec: api.PodSpec{
Host: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{
@ -421,6 +432,7 @@ func TestExtractPodsFromHTTP(t *testing.T) {
SelfLink: "/api/v1beta2/pods/bar-" + hostname + "?namespace=default",
},
Spec: api.PodSpec{
Host: hostname,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{{

View File

@ -1332,18 +1332,26 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
kl.pullImageAndRunContainer(pod, &pod.Spec.Containers[container], &podVolumes, podInfraContainerID)
}
if mirrorPod == nil && isStaticPod(pod) {
glog.V(4).Infof("Creating a mirror pod %q", podFullName)
// To make sure we will properly update static pod status we need to delete
// it from status manager. Otherwise it is possible that we will miss manual
// deletion of mirror pod in apiserver and will never reset its status to
// Running after recreating it.
kl.statusManager.DeletePodStatus(podFullName)
if err := kl.podManager.CreateMirrorPod(*pod, kl.hostname); err != nil {
glog.Errorf("Failed creating a mirror pod %q: %#v", podFullName, err)
if isStaticPod(pod) {
if mirrorPod != nil && !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
// The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later.
glog.Errorf("Deleting mirror pod %q because it is outdated", podFullName)
if err := kl.podManager.DeleteMirrorPod(podFullName); err != nil {
glog.Errorf("Failed deleting mirror pod %q: %v", podFullName, err)
}
}
if mirrorPod == nil {
glog.V(3).Infof("Creating a mirror pod %q", podFullName)
if err := kl.podManager.CreateMirrorPod(*pod); err != nil {
glog.Errorf("Failed creating a mirror pod %q: %v", podFullName, err)
}
// Pod status update is edge-triggered. If there is any update of the
// mirror pod, we need to delete the existing status associated with
// the static pod to trigger an update.
kl.statusManager.DeletePodStatus(podFullName)
}
}
return nil
}

View File

@ -3271,6 +3271,57 @@ func TestCreateMirrorPod(t *testing.T) {
}
}
func TestDeleteOutdatedMirrorPod(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kl := testKubelet.kubelet
manager := testKubelet.fakeMirrorClient
pod := api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
Name: "foo",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "file",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "1234", Image: "foo"},
},
},
}
// Mirror pod has an outdated spec.
mirrorPod := api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "11111111",
Name: "foo",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "api",
ConfigMirrorAnnotationKey: "mirror",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "1234", Image: "bar"},
},
},
}
pods := []api.Pod{pod, mirrorPod}
kl.podManager.SetPods(pods)
err := kl.syncPod(&pod, &mirrorPod, container.Pod{})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
name := kubecontainer.GetPodFullName(&pod)
creates, deletes := manager.GetCounts(name)
if creates != 0 || deletes != 1 {
t.Errorf("expected 0 creation and 1 deletion of %q, got %d, %d", name, creates, deletes)
}
}
func TestDeleteOrphanedMirrorPods(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)

View File

@ -28,7 +28,7 @@ import (
// Mirror client is used to create/delete a mirror pod.
type mirrorClient interface {
CreateMirrorPod(api.Pod, string) error
CreateMirrorPod(api.Pod) error
DeleteMirrorPod(string) error
}
@ -43,12 +43,10 @@ func newBasicMirrorClient(apiserverClient client.Interface) *basicMirrorClient {
}
// Creates a mirror pod.
func (self *basicMirrorClient) CreateMirrorPod(pod api.Pod, hostname string) error {
func (self *basicMirrorClient) CreateMirrorPod(pod api.Pod) error {
if self.apiserverClient == nil {
return nil
}
// Indicate that the pod should be scheduled to the current node.
pod.Spec.Host = hostname
pod.Annotations[ConfigMirrorAnnotationKey] = MirrorType
_, err := self.apiserverClient.Pods(NamespaceDefault).Create(&pod)

View File

@ -34,7 +34,7 @@ type fakeMirrorClient struct {
deleteCounts map[string]int
}
func (self *fakeMirrorClient) CreateMirrorPod(pod api.Pod, _ string) error {
func (self *fakeMirrorClient) CreateMirrorPod(pod api.Pod) error {
self.mirrorPodLock.Lock()
defer self.mirrorPodLock.Unlock()
podFullName := kubecontainer.GetPodFullName(&pod)

View File

@ -51,6 +51,7 @@ type podManager interface {
UpdatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType)
DeleteOrphanedMirrorPods()
TranslatePodUID(uid types.UID) types.UID
IsMirrorPodOf(mirrorPod, pod *api.Pod) bool
mirrorClient
}
@ -111,7 +112,7 @@ func (self *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]
for i := range u.Pods {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
}
allPods := applyUpdates(u.Pods, self.getPods())
allPods := applyUpdates(u.Pods, self.getAllPods())
self.setPods(allPods)
default:
panic("syncLoop does not support incremental changes")
@ -178,9 +179,9 @@ func applyUpdates(changed []api.Pod, current []api.Pod) []api.Pod {
return updated
}
func (self *basicPodManager) getPods() []api.Pod {
pods := make([]api.Pod, 0, len(self.podByUID))
for _, pod := range self.podByUID {
func (self *basicPodManager) convertMapToPods(UIDMap map[types.UID]*api.Pod) []api.Pod {
pods := make([]api.Pod, 0, len(UIDMap))
for _, pod := range UIDMap {
pods = append(pods, *pod)
}
return pods
@ -190,7 +191,12 @@ func (self *basicPodManager) getPods() []api.Pod {
func (self *basicPodManager) GetPods() []api.Pod {
self.lock.RLock()
defer self.lock.RUnlock()
return self.getPods()
return self.convertMapToPods(self.podByUID)
}
// Returns all pods (including mirror pods).
func (self *basicPodManager) getAllPods() []api.Pod {
return append(self.convertMapToPods(self.podByUID), self.convertMapToPods(self.mirrorPodByUID)...)
}
// GetPodsAndMirrorMap returns the a copy of the regular pods and the mirror
@ -202,7 +208,7 @@ func (self *basicPodManager) GetPodsAndMirrorMap() ([]api.Pod, map[string]api.Po
for key, pod := range self.mirrorPodByFullName {
mirrorPods[key] = *pod
}
return self.getPods(), mirrorPods
return self.convertMapToPods(self.podByUID), mirrorPods
}
// GetPodByName provides the (non-mirror) pod that matches namespace and name,
@ -263,11 +269,20 @@ func (self *basicPodManager) DeleteOrphanedMirrorPods() {
}
// Creates a mirror pod for the given pod.
func (self *basicPodManager) CreateMirrorPod(pod api.Pod, hostname string) error {
return self.mirrorClient.CreateMirrorPod(pod, hostname)
func (self *basicPodManager) CreateMirrorPod(pod api.Pod) error {
return self.mirrorClient.CreateMirrorPod(pod)
}
// Delete a mirror pod by name.
func (self *basicPodManager) DeleteMirrorPod(podFullName string) error {
return self.mirrorClient.DeleteMirrorPod(podFullName)
}
// Returns true if mirrorPod is a correct representation of pod; false otherwise.
func (self *basicPodManager) IsMirrorPodOf(mirrorPod, pod *api.Pod) bool {
// Check name and namespace first.
if pod.Name != mirrorPod.Name || pod.Namespace != mirrorPod.Namespace {
return false
}
return api.Semantic.DeepEqual(&pod.Spec, &mirrorPod.Spec)
}