From 5b4a210d49c58e4e517738912330c294a32377b4 Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Tue, 8 Dec 2015 19:13:09 -0800 Subject: [PATCH] Add reconcile support in kubelet --- pkg/kubelet/config/config.go | 54 +++++++++----- pkg/kubelet/config/config_test.go | 52 +++++++++++++- pkg/kubelet/kubelet.go | 12 ++++ pkg/kubelet/status/manager.go | 95 ++++++++++++++++++++++-- pkg/kubelet/status/manager_test.go | 111 ++++++++++++++++++++++++----- pkg/kubelet/types/pod_update.go | 3 + 6 files changed, 282 insertions(+), 45 deletions(-) diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 86931ae331..fe0ffb412b 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -46,7 +46,7 @@ const ( // PodConfigNotificationSnapshotAndUpdates delivers an UPDATE message whenever pods are // changed, and a SET message if there are any additions or removals. PodConfigNotificationSnapshotAndUpdates - // PodConfigNotificationIncremental delivers ADD, UPDATE, and REMOVE to the update channel. + // PodConfigNotificationIncremental delivers ADD, UPDATE, REMOVE, RECONCILE to the update channel. PodConfigNotificationIncremental ) @@ -152,7 +152,7 @@ func (s *podStorage) Merge(source string, change interface{}) error { defer s.updateLock.Unlock() seenBefore := s.sourcesSeen.Has(source) - adds, updates, deletes := s.merge(source, change) + adds, updates, deletes, reconciles := s.merge(source, change) firstSet := !seenBefore && s.sourcesSeen.Has(source) // deliver update notifications @@ -167,6 +167,10 @@ func (s *podStorage) Merge(source string, change interface{}) error { if len(updates.Pods) > 0 { s.updates <- *updates } + // Only add reconcile support here, because kubelet doesn't support Snapshot update now. + if len(reconciles.Pods) > 0 { + s.updates <- *reconciles + } case PodConfigNotificationSnapshotAndUpdates: if len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet { @@ -190,13 +194,14 @@ func (s *podStorage) Merge(source string, change interface{}) error { return nil } -func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes *kubetypes.PodUpdate) { +func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, reconciles *kubetypes.PodUpdate) { s.podLock.Lock() defer s.podLock.Unlock() addPods := []*api.Pod{} updatePods := []*api.Pod{} deletePods := []*api.Pod{} + reconcilePods := []*api.Pod{} pods := s.pods[source] if pods == nil { @@ -221,12 +226,12 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de } ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source if existing, found := pods[name]; found { - if checkAndUpdatePod(existing, ref) { - // this is an update + needUpdate, needReconcile := checkAndUpdatePod(existing, ref) + if needUpdate { updatePods = append(updatePods, existing) - continue + } else if needReconcile { + reconcilePods = append(reconcilePods, existing) } - // this is a no-op continue } // this is an add @@ -265,12 +270,12 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source if existing, found := oldPods[name]; found { pods[name] = existing - if checkAndUpdatePod(existing, ref) { - // this is an update + needUpdate, needReconcile := checkAndUpdatePod(existing, ref) + if needUpdate { updatePods = append(updatePods, existing) - continue + } else if needReconcile { + reconcilePods = append(reconcilePods, existing) } - // this is a no-op continue } recordFirstSeenTime(ref) @@ -295,8 +300,9 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source} updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source} deletes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(deletePods), Source: source} + reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source} - return adds, updates, deletes + return adds, updates, deletes, reconciles } func (s *podStorage) markSourceSet(source string) { @@ -416,13 +422,25 @@ func podsDifferSemantically(existing, ref *api.Pod) bool { return true } -// checkAndUpdatePod updates existing if ref makes a meaningful change and returns true, or -// returns false if there was no update. -func checkAndUpdatePod(existing, ref *api.Pod) bool { +// checkAndUpdatePod updates existing, and: +// * if ref makes a meaningful change, returns needUpdate=true +// * if ref makes no meaningful change, but changes the pod status, returns needReconcile=true +// * else return both false +// Now, needUpdate and needReconcile should never be both true +func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile bool) { // TODO: it would be better to update the whole object and only preserve certain things // like the source annotation or the UID (to ensure safety) if !podsDifferSemantically(existing, ref) { - return false + // this is not an update + // Only check reconcile when it is not an update, because if the pod is going to + // be updated, an extra reconcile is unnecessary + if !reflect.DeepEqual(existing.Status, ref.Status) { + // Pod with changed pod status needs reconcile, because kubelet should + // be the source of truth of pod status. + existing.Status = ref.Status + needReconcile = true + } + return } // this is an update @@ -434,8 +452,10 @@ func checkAndUpdatePod(existing, ref *api.Pod) bool { existing.Labels = ref.Labels existing.DeletionTimestamp = ref.DeletionTimestamp existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds + existing.Status = ref.Status updateAnnotations(existing, ref) - return true + needUpdate = true + return } // Sync sends a copy of the current state through the update channel. diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go index 362a488aca..5c149fadc9 100644 --- a/pkg/kubelet/config/config_test.go +++ b/pkg/kubelet/config/config_test.go @@ -17,7 +17,10 @@ limitations under the License. package config import ( + "math/rand" + "reflect" "sort" + "strconv" "testing" "k8s.io/kubernetes/pkg/api" @@ -105,7 +108,7 @@ func expectPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate, expected ...ku // Compare pods one by one. This is necessary beacuse we don't want to // compare local annotations. for j := range expected[i].Pods { - if podsDifferSemantically(expected[i].Pods[j], update.Pods[j]) { + if podsDifferSemantically(expected[i].Pods[j], update.Pods[j]) || !reflect.DeepEqual(expected[i].Pods[j].Status, update.Pods[j].Status) { t.Fatalf("Expected %#v, Got %#v", expected[i].Pods[j], update.Pods[j]) } } @@ -267,6 +270,51 @@ func TestNewPodAddedUpdatedSet(t *testing.T) { CreatePodUpdate(kubetypes.UPDATE, TestSource, pod)) } +func TestNewPodAddedSetReconciled(t *testing.T) { + // Create and touch new test pods, return the new pods and touched pod. We should create new pod list + // before touching to avoid data race. + newTestPods := func(touchStatus, touchSpec bool) ([]*api.Pod, *api.Pod) { + pods := []*api.Pod{ + CreateValidPod("changable-pod-0", "new"), + CreateValidPod("constant-pod-1", "new"), + CreateValidPod("constant-pod-2", "new"), + } + if touchStatus { + pods[0].Status = api.PodStatus{Message: strconv.Itoa(rand.Int())} + } + if touchSpec { + pods[0].Spec.Containers[0].Name = strconv.Itoa(rand.Int()) + } + return pods, pods[0] + } + for _, op := range []kubetypes.PodOperation{ + kubetypes.ADD, + kubetypes.SET, + } { + var podWithStatusChange *api.Pod + pods, _ := newTestPods(false, false) + channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) + + // Use SET to initialize the config, especially initialize the source set + channel <- CreatePodUpdate(kubetypes.SET, TestSource, pods...) + expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, pods...)) + + // If status is not changed, no reconcile should be triggered + channel <- CreatePodUpdate(op, TestSource, pods...) + expectNoPodUpdate(t, ch) + + // If the pod status is changed and not updated, a reconcile should be triggered + pods, podWithStatusChange = newTestPods(true, false) + channel <- CreatePodUpdate(op, TestSource, pods...) + expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.RECONCILE, TestSource, podWithStatusChange)) + + // If the pod status is changed, but the pod is also updated, no reconcile should be triggered + pods, podWithStatusChange = newTestPods(true, true) + channel <- CreatePodUpdate(op, TestSource, pods...) + expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, podWithStatusChange)) + } +} + func TestInitialEmptySet(t *testing.T) { for _, test := range []struct { mode PodConfigNotificationMode @@ -324,7 +372,7 @@ func TestPodUpdateAnnotations(t *testing.T) { expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod)) } -func TestPodUpdateLables(t *testing.T) { +func TestPodUpdateLabels(t *testing.T) { channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) pod := CreateValidPod("foo2", "new") diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 7bc7d5c9a9..7f6ef1c89e 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -133,6 +133,7 @@ type SyncHandler interface { HandlePodAdditions(pods []*api.Pod) HandlePodUpdates(pods []*api.Pod) HandlePodDeletions(pods []*api.Pod) + HandlePodReconcile(pods []*api.Pod) HandlePodSyncs(pods []*api.Pod) HandlePodCleanups() error } @@ -2334,6 +2335,9 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler case kubetypes.REMOVE: glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods)) handler.HandlePodDeletions(u.Pods) + case kubetypes.RECONCILE: + glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods)) + handler.HandlePodReconcile(u.Pods) case kubetypes.SET: // TODO: Do we want to support this? glog.Errorf("Kubelet does not support snapshot update") @@ -2468,6 +2472,14 @@ func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) { } } +func (kl *Kubelet) HandlePodReconcile(pods []*api.Pod) { + for _, pod := range pods { + // Update the pod in pod manager, status manager will do periodically reconcile according + // to the pod manager. + kl.podManager.UpdatePod(pod) + } +} + func (kl *Kubelet) HandlePodSyncs(pods []*api.Pod) { start := time.Now() for _, pod := range pods { diff --git a/pkg/kubelet/status/manager.go b/pkg/kubelet/status/manager.go index af9d3b0519..34fd8b43c7 100644 --- a/pkg/kubelet/status/manager.go +++ b/pkg/kubelet/status/manager.go @@ -17,7 +17,6 @@ limitations under the License. package status import ( - "reflect" "sort" "sync" "time" @@ -105,13 +104,12 @@ func NewManager(kubeClient client.Interface, podManager kubepod.Manager) Manager } // isStatusEqual returns true if the given pod statuses are equal, false otherwise. -// This method sorts container statuses so order does not affect equality. +// This method normalizes the status before comparing so as to make sure that meaningless +// changes will be ignored. func isStatusEqual(oldStatus, status *api.PodStatus) bool { - sort.Sort(kubetypes.SortedContainerStatuses(status.ContainerStatuses)) - sort.Sort(kubetypes.SortedContainerStatuses(oldStatus.ContainerStatuses)) - - // TODO: More sophisticated equality checking. - return reflect.DeepEqual(status, oldStatus) + normalizeStatus(oldStatus) + normalizeStatus(status) + return api.Semantic.DeepEqual(status, oldStatus) } func (m *manager) Start() { @@ -329,6 +327,13 @@ func (m *manager) syncBatch() { } if m.needsUpdate(syncedUID, status) { updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status}) + } else if m.needsReconcile(uid, status.status) { + // Delete the apiStatusVersions here to force an update on the pod status + // In most cases the deleted apiStatusVersions here should be filled + // soon after the following syncPod() [If the syncPod() sync an update + // successfully]. + delete(m.apiStatusVersions, syncedUID) + updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status}) } } }() @@ -392,6 +397,82 @@ func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool { return !ok || latest < status.version } +// needsReconcile compares the given status with the status in the pod manager (which +// in fact comes from apiserver), returns whether the status needs to be reconciled with +// the apiserver. Now when pod status is inconsistent between apiserver and kubelet, +// kubelet should forcibly send an update to reconclie the inconsistence, because kubelet +// should be the source of truth of pod status. +// NOTE(random-liu): It's simpler to pass in mirror pod uid and get mirror pod by uid, but +// now the pod manager only supports getting mirror pod by static pod, so we have to pass +// static pod uid here. +// TODO(random-liu): Simplify the logic when mirror pod manager is added. +func (m *manager) needsReconcile(uid types.UID, status api.PodStatus) bool { + // The pod could be a static pod, so we should translate first. + pod, ok := m.podManager.GetPodByUID(uid) + if !ok { + // Although we get uid from pod manager in syncBatch, it still could be deleted before here. + glog.V(4).Infof("Pod %q has been deleted, no need to reconcile", format.Pod(pod)) + return false + } + // If the pod is a static pod, we should check its mirror pod, because only status in mirror pod is meaningful to us. + if kubepod.IsStaticPod(pod) { + mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod) + if !ok { + glog.V(4).Infof("Static pod %q has no corresponding mirror pod, no need to reconcile", format.Pod(pod)) + return false + } + pod = mirrorPod + } + + if isStatusEqual(&pod.Status, &status) { + // If the status from the source is the same with the cached status, + // reconcile is not needed. Just return. + return false + } + glog.V(3).Infof("Pod status is inconsistent with cached status, a reconciliation should be triggered:\n %+v", util.ObjectDiff(pod.Status, status)) + + return true +} + +// We add this function, because apiserver only supports *RFC3339* now, which means that the timestamp returned by +// apiserver has no nanosecond infromation. However, the timestamp returned by unversioned.Now() contains nanosecond, +// so when we do comparison between status from apiserver and cached status, isStatusEqual() will always return false. +// There is related issue #15262 and PR #15263 about this. +// In fact, the best way to solve this is to do it on api side. However for now, we normalize the status locally in +// kubelet temporarily. +// TODO(random-liu): Remove timestamp related logic after apiserver supports nanosecond or makes it consistent. +func normalizeStatus(status *api.PodStatus) *api.PodStatus { + normalizeTimeStamp := func(t *unversioned.Time) { + *t = t.Rfc3339Copy() + } + normalizeContainerState := func(c *api.ContainerState) { + if c.Running != nil { + normalizeTimeStamp(&c.Running.StartedAt) + } + if c.Terminated != nil { + normalizeTimeStamp(&c.Terminated.StartedAt) + normalizeTimeStamp(&c.Terminated.FinishedAt) + } + } + + if status.StartTime != nil { + normalizeTimeStamp(status.StartTime) + } + for i := range status.Conditions { + condition := &status.Conditions[i] + normalizeTimeStamp(&condition.LastProbeTime) + normalizeTimeStamp(&condition.LastTransitionTime) + } + for i := range status.ContainerStatuses { + cstatus := &status.ContainerStatuses[i] + normalizeContainerState(&cstatus.State) + normalizeContainerState(&cstatus.LastTerminationState) + } + // Sort the container statuses, so that the order won't affect the result of comparison + sort.Sort(kubetypes.SortedContainerStatuses(status.ContainerStatuses)) + return status +} + // notRunning returns true if every status is terminated or waiting, or the status list // is empty. func notRunning(statuses []api.ContainerStatus) bool { diff --git a/pkg/kubelet/status/manager_test.go b/pkg/kubelet/status/manager_test.go index 3a4fa4dfb3..7fbfea4da1 100644 --- a/pkg/kubelet/status/manager_test.go +++ b/pkg/kubelet/status/manager_test.go @@ -44,6 +44,24 @@ var testPod *api.Pod = &api.Pod{ }, } +// After adding reconciliation, if status in pod manager is different from the cached status, a reconciliation +// will be triggered, which will mess up all the old unit test. +// To simplify the implementation of unit test, we add testSyncBatch() here, it will make sure the statuses in +// pod manager the same with cached ones before syncBatch() so as to avoid reconciling. +func (m *manager) testSyncBatch() { + for uid, status := range m.podStatuses { + pod, ok := m.podManager.GetPodByUID(uid) + if ok { + pod.Status = status.status + } + pod, ok = m.podManager.GetMirrorPodByPod(pod) + if ok { + pod.Status = status.status + } + } + m.syncBatch() +} + func newTestManager(kubeClient client.Interface) *manager { podManager := kubepod.NewBasicPodManager(kubepod.NewFakeMirrorClient()) podManager.AddPod(testPod) @@ -209,8 +227,8 @@ func TestChangedStatusUpdatesLastTransitionTime(t *testing.T) { if newReadyCondition.LastTransitionTime.IsZero() { t.Errorf("Unexpected: last transition time not set") } - if !oldReadyCondition.LastTransitionTime.Before(newReadyCondition.LastTransitionTime) { - t.Errorf("Unexpected: new transition time %s, is not after old transition time %s", newReadyCondition.LastTransitionTime, oldReadyCondition.LastTransitionTime) + if newReadyCondition.LastTransitionTime.Before(oldReadyCondition.LastTransitionTime) { + t.Errorf("Unexpected: new transition time %s, is before old transition time %s", newReadyCondition.LastTransitionTime, oldReadyCondition.LastTransitionTime) } } @@ -259,7 +277,7 @@ func TestSyncBatchIgnoresNotFound(t *testing.T) { return true, nil, errors.NewNotFound(api.Resource("pods"), "test-pod") }) syncer.SetPodStatus(testPod, getRandomPodStatus()) - syncer.syncBatch() + syncer.testSyncBatch() verifyActions(t, syncer.kubeClient, []testclient.Action{ testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}}, @@ -270,7 +288,7 @@ func TestSyncBatch(t *testing.T) { syncer := newTestManager(&testclient.Fake{}) syncer.kubeClient = testclient.NewSimpleFake(testPod) syncer.SetPodStatus(testPod, getRandomPodStatus()) - syncer.syncBatch() + syncer.testSyncBatch() verifyActions(t, syncer.kubeClient, []testclient.Action{ testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}}, testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}}, @@ -288,7 +306,7 @@ func TestSyncBatchChecksMismatchedUID(t *testing.T) { syncer.podManager.AddPod(&differentPod) syncer.kubeClient = testclient.NewSimpleFake(&pod) syncer.SetPodStatus(&differentPod, getRandomPodStatus()) - syncer.syncBatch() + syncer.testSyncBatch() verifyActions(t, syncer.kubeClient, []testclient.Action{ testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}}, }) @@ -324,7 +342,7 @@ func TestSyncBatchNoDeadlock(t *testing.T) { ret = *pod err = errors.NewNotFound(api.Resource("pods"), pod.Name) m.SetPodStatus(pod, getRandomPodStatus()) - m.syncBatch() + m.testSyncBatch() verifyActions(t, client, []testclient.Action{getAction}) client.ClearActions() @@ -332,21 +350,21 @@ func TestSyncBatchNoDeadlock(t *testing.T) { ret.UID = "other_pod" err = nil m.SetPodStatus(pod, getRandomPodStatus()) - m.syncBatch() + m.testSyncBatch() verifyActions(t, client, []testclient.Action{getAction}) client.ClearActions() // Pod not deleted (success case). ret = *pod m.SetPodStatus(pod, getRandomPodStatus()) - m.syncBatch() + m.testSyncBatch() verifyActions(t, client, []testclient.Action{getAction, updateAction}) client.ClearActions() // Pod is terminated, but still running. pod.DeletionTimestamp = new(unversioned.Time) m.SetPodStatus(pod, getRandomPodStatus()) - m.syncBatch() + m.testSyncBatch() verifyActions(t, client, []testclient.Action{getAction, updateAction}) client.ClearActions() @@ -354,14 +372,14 @@ func TestSyncBatchNoDeadlock(t *testing.T) { pod.Status.ContainerStatuses[0].State.Running = nil pod.Status.ContainerStatuses[0].State.Terminated = &api.ContainerStateTerminated{} m.SetPodStatus(pod, getRandomPodStatus()) - m.syncBatch() + m.testSyncBatch() verifyActions(t, client, []testclient.Action{getAction, updateAction}) client.ClearActions() // Error case. err = fmt.Errorf("intentional test error") m.SetPodStatus(pod, getRandomPodStatus()) - m.syncBatch() + m.testSyncBatch() verifyActions(t, client, []testclient.Action{getAction}) client.ClearActions() } @@ -380,7 +398,7 @@ func TestStaleUpdates(t *testing.T) { verifyUpdates(t, m, 3) t.Logf("First sync pushes latest status.") - m.syncBatch() + m.testSyncBatch() verifyActions(t, m.kubeClient, []testclient.Action{ testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}}, testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}}, @@ -389,7 +407,7 @@ func TestStaleUpdates(t *testing.T) { for i := 0; i < 2; i++ { t.Logf("Next 2 syncs should be ignored (%d).", i) - m.syncBatch() + m.testSyncBatch() verifyActions(t, m.kubeClient, []testclient.Action{}) } @@ -401,7 +419,7 @@ func TestStaleUpdates(t *testing.T) { m.apiStatusVersions[pod.UID] = m.apiStatusVersions[pod.UID] - 1 m.SetPodStatus(&pod, status) - m.syncBatch() + m.testSyncBatch() verifyActions(t, m.kubeClient, []testclient.Action{ testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}}, testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}}, @@ -471,7 +489,7 @@ func TestStaticPodStatus(t *testing.T) { retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID) assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) // Should translate mirrorPod / staticPod UID. - m.syncBatch() + m.testSyncBatch() verifyActions(t, m.kubeClient, []testclient.Action{ testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}}, testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}}, @@ -483,7 +501,7 @@ func TestStaticPodStatus(t *testing.T) { client.ClearActions() // No changes. - m.syncBatch() + m.testSyncBatch() verifyActions(t, m.kubeClient, []testclient.Action{}) // Mirror pod identity changes. @@ -492,7 +510,7 @@ func TestStaticPodStatus(t *testing.T) { mirrorPod.Status = api.PodStatus{} m.podManager.AddPod(&mirrorPod) // Expect update to new mirrorPod. - m.syncBatch() + m.testSyncBatch() verifyActions(t, m.kubeClient, []testclient.Action{ testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}}, testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}}, @@ -604,7 +622,7 @@ func TestSyncBatchCleanupVersions(t *testing.T) { // Orphaned pods should be removed. m.apiStatusVersions[testPod.UID] = 100 m.apiStatusVersions[mirrorPod.UID] = 200 - m.syncBatch() + m.testSyncBatch() if _, ok := m.apiStatusVersions[testPod.UID]; ok { t.Errorf("Should have cleared status for testPod") } @@ -621,7 +639,7 @@ func TestSyncBatchCleanupVersions(t *testing.T) { m.podManager.AddPod(&staticPod) m.apiStatusVersions[testPod.UID] = 100 m.apiStatusVersions[mirrorPod.UID] = 200 - m.syncBatch() + m.testSyncBatch() if _, ok := m.apiStatusVersions[testPod.UID]; !ok { t.Errorf("Should not have cleared status for testPod") } @@ -630,6 +648,61 @@ func TestSyncBatchCleanupVersions(t *testing.T) { } } +func TestReconcilePodStatus(t *testing.T) { + client := testclient.NewSimpleFake(testPod) + syncer := newTestManager(client) + syncer.SetPodStatus(testPod, getRandomPodStatus()) + // Call syncBatch directly to test reconcile + syncer.syncBatch() // The apiStatusVersions should be set now + + originalStatus := testPod.Status + podStatus, ok := syncer.GetPodStatus(testPod.UID) + if !ok { + t.Fatal("Should find pod status for pod: %+v", testPod) + } + testPod.Status = podStatus + + // If the pod status is the same, a reconciliation is not needed, + // syncBatch should do nothing + syncer.podManager.UpdatePod(testPod) + if syncer.needsReconcile(testPod.UID, podStatus) { + t.Errorf("Pod status is the same, a reconciliation is not needed") + } + client.ClearActions() + syncer.syncBatch() + verifyActions(t, client, []testclient.Action{}) + + // If the pod status is the same, only the timestamp is in Rfc3339 format (lower precision without nanosecond), + // a reconciliation is not needed, syncBatch should do nothing. + // The StartTime should have been set in SetPodStatus(). + // TODO(random-liu): Remove this later when api becomes consistent for timestamp. + normalizedStartTime := testPod.Status.StartTime.Rfc3339Copy() + testPod.Status.StartTime = &normalizedStartTime + syncer.podManager.UpdatePod(testPod) + if syncer.needsReconcile(testPod.UID, podStatus) { + t.Errorf("Pod status only differs for timestamp format, a reconciliation is not needed") + } + client.ClearActions() + syncer.syncBatch() + verifyActions(t, client, []testclient.Action{}) + + // If the pod status is different, a reconciliation is needed, syncBatch should trigger an update + testPod.Status = getRandomPodStatus() + syncer.podManager.UpdatePod(testPod) + if !syncer.needsReconcile(testPod.UID, podStatus) { + t.Errorf("Pod status is different, a reconciliation is needed") + } + client.ClearActions() + syncer.syncBatch() + verifyActions(t, client, []testclient.Action{ + testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}}, + testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}}, + }) + + // Just in case that testPod is shared among different test functions, set it back. + testPod.Status = originalStatus +} + func expectPodStatus(t *testing.T, m *manager, pod *api.Pod) api.PodStatus { status, ok := m.GetPodStatus(pod.UID) if !ok { diff --git a/pkg/kubelet/types/pod_update.go b/pkg/kubelet/types/pod_update.go index ca0d9e46fa..f93576445f 100644 --- a/pkg/kubelet/types/pod_update.go +++ b/pkg/kubelet/types/pod_update.go @@ -39,6 +39,9 @@ const ( REMOVE // Pods with the given ids have been updated in this source UPDATE + // Pods with the given ids have unexpected status in this source, + // kubelet should reconcile status with this source + RECONCILE // These constants identify the sources of pods // Updates from a file