diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go
index 7006dc4c42..81ff8b6c21 100644
--- a/pkg/kubelet/prober/worker_test.go
+++ b/pkg/kubelet/prober/worker_test.go
@@ -24,8 +24,10 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/client/record"
+	"k8s.io/kubernetes/pkg/client/unversioned/testclient"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/prober/results"
+	"k8s.io/kubernetes/pkg/kubelet/status"
 	"k8s.io/kubernetes/pkg/probe"
 	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/exec"
@@ -117,7 +119,7 @@ func TestDoProbe(t *testing.T) {
 		}
 
 		// Clean up.
-		m.statusManager.DeletePodStatus(podUID)
+		m.statusManager = status.NewManager(&testclient.Fake{})
 		resultsManager(m, probeType).Remove(containerID)
 	}
 }
diff --git a/pkg/kubelet/status/manager.go b/pkg/kubelet/status/manager.go
index 8a55307118..8fdd03c744 100644
--- a/pkg/kubelet/status/manager.go
+++ b/pkg/kubelet/status/manager.go
@@ -17,10 +17,10 @@ limitations under the License.
 package status
 
 import (
-	"fmt"
 	"reflect"
 	"sort"
 	"sync"
+	"time"
 
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
@@ -33,19 +33,33 @@ import (
 	"k8s.io/kubernetes/pkg/util"
 )
 
-type podStatusSyncRequest struct {
-	pod    *api.Pod
+// A wrapper around api.PodStatus that includes a version to enforce that stale pod statuses are
+// not sent to the API server.
+type versionedPodStatus struct {
 	status api.PodStatus
+	// Monotonically increasing version number (per pod).
+	version uint64
+	// Pod name & namespace, for sending updates to API server.
+	podName      string
+	podNamespace string
+}
+
+type podStatusSyncRequest struct {
+	podUID types.UID
+	status versionedPodStatus
 }
 
 // Updates pod statuses in apiserver. Writes only when new status has changed.
 // All methods are thread-safe.
 type manager struct {
 	kubeClient client.Interface
-	// Map from pod full name to sync status of the corresponding pod.
-	podStatuses map[types.UID]api.PodStatus
+	// Map from pod UID to sync status of the corresponding pod.
+	podStatuses      map[types.UID]versionedPodStatus
 	podStatusesLock  sync.RWMutex
 	podStatusChannel chan podStatusSyncRequest
+	// Map from pod UID to latest status version successfully sent to the API server.
+	// apiStatusVersions must only be accessed from the sync thread.
+	apiStatusVersions map[types.UID]uint64
 }
 
 // status.Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with
@@ -66,19 +80,19 @@ type Manager interface {
 	// return false
 	TerminatePods(pods []*api.Pod) bool
 
-	// DeletePodStatus simply removes the given pod from the status cache.
-	DeletePodStatus(uid types.UID)
-
 	// RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in
 	// the provided podUIDs.
 	RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
 }
 
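+// syncPeriod is how often the sync loop falls back to a full syncBatch; this is the retry path
+// for updates that were dropped because podStatusChannel was full, or that failed to send.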
+const syncPeriod = 10 * time.Second
+
 func NewManager(kubeClient client.Interface) Manager {
 	return &manager{
-		kubeClient:       kubeClient,
-		podStatuses:      make(map[types.UID]api.PodStatus),
-		podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
+		kubeClient:        kubeClient,
+		podStatuses:       make(map[types.UID]versionedPodStatus),
+		podStatusChannel:  make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
+		apiStatusVersions: make(map[types.UID]uint64),
 	}
 }
 
@@ -100,21 +114,25 @@ func (m *manager) Start() {
 		glog.Infof("Kubernetes client is nil, not starting status manager.")
 		return
 	}
-	// syncBatch blocks when no updates are available, we can run it in a tight loop.
+
 	glog.Info("Starting to sync pod status with apiserver")
-	go util.Until(func() {
-		err := m.syncBatch()
-		if err != nil {
-			glog.Warningf("Failed to updated pod status: %v", err)
+	syncTicker := time.Tick(syncPeriod)
+	// syncPod and syncBatch share the same goroutine to avoid sync races.
+	go util.Forever(func() {
+		select {
+		case syncRequest := <-m.podStatusChannel:
+			m.syncPod(syncRequest.podUID, syncRequest.status)
+		case <-syncTicker:
+			m.syncBatch()
 		}
-	}, 0, util.NeverStop)
+	}, 0)
 }
 
 func (m *manager) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
 	m.podStatusesLock.RLock()
 	defer m.podStatusesLock.RUnlock()
 	status, ok := m.podStatuses[uid]
-	return status, ok
+	return status.status, ok
 }
 
 func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
@@ -123,8 +141,8 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 	oldStatus, found := m.podStatuses[pod.UID]
 
 	// ensure that the start time does not change across updates.
-	if found && oldStatus.StartTime != nil {
-		status.StartTime = oldStatus.StartTime
+	if found && oldStatus.status.StartTime != nil {
+		status.StartTime = oldStatus.status.StartTime
 	}
 
 	// Set ReadyCondition.LastTransitionTime.
@@ -134,7 +152,7 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 			// Need to set LastTransitionTime.
 			lastTransitionTime := unversioned.Now()
 			if found {
-				oldReadyCondition := api.GetPodReadyCondition(oldStatus)
+				oldReadyCondition := api.GetPodReadyCondition(oldStatus.status)
 				if oldReadyCondition != nil && readyCondition.Status == oldReadyCondition.Status {
 					lastTransitionTime = oldReadyCondition.LastTransitionTime
 				}
@@ -158,17 +176,14 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 		}
 	}
 
-	// TODO: Holding a lock during blocking operations is dangerous. Refactor so this isn't necessary.
-	// The intent here is to prevent concurrent updates to a pod's status from
-	// clobbering each other so the phase of a pod progresses monotonically.
-	// Currently this routine is not called for the same pod from multiple
-	// workers and/or the kubelet but dropping the lock before sending the
-	// status down the channel feels like an easy way to get a bullet in foot.
-	if !found || !isStatusEqual(&oldStatus, &status) || pod.DeletionTimestamp != nil {
-		m.podStatuses[pod.UID] = status
-		m.podStatusChannel <- podStatusSyncRequest{pod, status}
-	} else {
-		glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", kubeletutil.FormatPodName(pod), status)
+	newStatus := m.updateStatusInternal(pod, status)
+	if newStatus != nil {
+		select {
+		case m.podStatusChannel <- podStatusSyncRequest{pod.UID, *newStatus}:
+		default:
+			// Let the periodic syncBatch handle the update if the channel is full.
+			// We can't block, since we hold the mutex lock.
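+			// The update is not lost: the bumped version stays in podStatuses, so the
+			// next periodic syncBatch will pick it up via needsUpdate.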
+		}
 	}
 }
 
@@ -182,17 +197,44 @@ func (m *manager) TerminatePods(pods []*api.Pod) bool {
 				Terminated: &api.ContainerStateTerminated{},
 			}
 		}
-		select {
-		case m.podStatusChannel <- podStatusSyncRequest{pod, pod.Status}:
-		default:
+		newStatus := m.updateStatusInternal(pod, pod.Status)
+		if newStatus != nil {
+			select {
+			case m.podStatusChannel <- podStatusSyncRequest{pod.UID, *newStatus}:
+			default:
+				sent = false
+				glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletutil.FormatPodName(pod))
+			}
+		} else {
 			sent = false
-			glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletutil.FormatPodName(pod))
 		}
 	}
 	return sent
 }
 
-func (m *manager) DeletePodStatus(uid types.UID) {
+// updateStatusInternal updates the internal status cache, and returns a versioned status if an
+// update is necessary. This method IS NOT THREAD SAFE and must be called from a locked function.
+func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) *versionedPodStatus {
+	// The intent here is to prevent concurrent updates to a pod's status from
+	// clobbering each other so the phase of a pod progresses monotonically.
+	oldStatus, found := m.podStatuses[pod.UID]
+	if !found || !isStatusEqual(&oldStatus.status, &status) || pod.DeletionTimestamp != nil {
+		newStatus := versionedPodStatus{
+			status:       status,
+			version:      oldStatus.version + 1,
+			podName:      pod.Name,
+			podNamespace: pod.Namespace,
+		}
+		m.podStatuses[pod.UID] = newStatus
+		return &newStatus
+	} else {
+		glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", kubeletutil.FormatPodName(pod), status)
+		return nil // No new status.
+	}
+}
+
+// deletePodStatus simply removes the given pod from the status cache.
+func (m *manager) deletePodStatus(uid types.UID) {
 	m.podStatusesLock.Lock()
 	defer m.podStatusesLock.Unlock()
 	delete(m.podStatuses, uid)
@@ -211,56 +253,83 @@ func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
 	}
 }
 
 // syncBatch syncs pods statuses with the apiserver.
-func (m *manager) syncBatch() error {
-	syncRequest := <-m.podStatusChannel
-	pod := syncRequest.pod
-	status := syncRequest.status
+func (m *manager) syncBatch() {
+	var updatedStatuses []podStatusSyncRequest
+	func() { // Critical section
+		m.podStatusesLock.RLock()
+		defer m.podStatusesLock.RUnlock()
 
-	var err error
-	statusPod := &api.Pod{
-		ObjectMeta: pod.ObjectMeta,
+		// Clean up orphaned versions.
+		for uid := range m.apiStatusVersions {
+			if _, ok := m.podStatuses[uid]; !ok {
+				delete(m.apiStatusVersions, uid)
+			}
+		}
+
+		for uid, status := range m.podStatuses {
+			if m.needsUpdate(uid, status) {
+				updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
+			}
+		}
+	}()
+
+	for _, update := range updatedStatuses {
+		m.syncPod(update.podUID, update.status)
 	}
+}
+
+// syncPod syncs the given status with the API server. The caller must not hold the lock.
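+// syncPod only ever runs on the sync goroutine started in Start, which is what makes its
+// unguarded access to apiStatusVersions safe.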
+func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
+	if !m.needsUpdate(uid, status) {
+		glog.Warningf("Status is up-to-date; skipping: %q %+v", uid, status)
+		return
+	}
+
 	// TODO: make me easier to express from client code
-	statusPod, err = m.kubeClient.Pods(statusPod.Namespace).Get(statusPod.Name)
+	pod, err := m.kubeClient.Pods(status.podNamespace).Get(status.podName)
 	if errors.IsNotFound(err) {
-		glog.V(3).Infof("Pod %q was deleted on the server", pod.Name)
-		return nil
+		glog.V(3).Infof("Pod %q was deleted on the server", status.podName)
+		m.deletePodStatus(uid)
+		return
 	}
 	if err == nil {
-		if len(pod.UID) > 0 && statusPod.UID != pod.UID {
-			glog.V(3).Infof("Pod %q was deleted and then recreated, skipping status update", kubeletutil.FormatPodName(pod))
-			return nil
+		if len(pod.UID) > 0 && pod.UID != uid {
+			glog.V(3).Infof("Pod %q was deleted and then recreated, skipping status update",
+				kubeletutil.FormatPodName(pod))
+			m.deletePodStatus(uid)
+			return
 		}
-		statusPod.Status = status
+		pod.Status = status.status
 		// TODO: handle conflict as a retry, make that easier too.
-		statusPod, err = m.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
+		pod, err = m.kubeClient.Pods(pod.Namespace).UpdateStatus(pod)
 		if err == nil {
 			glog.V(3).Infof("Status for pod %q updated successfully", kubeletutil.FormatPodName(pod))
+			m.apiStatusVersions[uid] = status.version
 
 			if pod.DeletionTimestamp == nil {
-				return nil
+				return
 			}
 			if !notRunning(pod.Status.ContainerStatuses) {
-				glog.V(3).Infof("Pod %q is terminated, but some pods are still running", pod.Name)
-				return nil
+				glog.V(3).Infof("Pod %q is terminated, but some containers are still running", pod.Name)
+				return
 			}
-			if err := m.kubeClient.Pods(statusPod.Namespace).Delete(statusPod.Name, api.NewDeleteOptions(0)); err == nil {
-				glog.V(3).Infof("Pod %q fully terminated and removed from etcd", statusPod.Name)
-				m.DeletePodStatus(pod.UID)
-				return nil
+			if err := m.kubeClient.Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err == nil {
+				glog.V(3).Infof("Pod %q fully terminated and removed from etcd", pod.Name)
+				m.deletePodStatus(pod.UID)
+				return
 			}
 		}
 	}
 
-	// We failed to update status. In order to make sure we retry next time
-	// we delete cached value. This may result in an additional update, but
-	// this is ok.
-	// Doing this synchronously will lead to a deadlock if the podStatusChannel
-	// is full, and the pod worker holding the lock is waiting on this method
-	// to clear the channel. Even if this delete never runs subsequent container
-	// changes on the node should trigger updates.
-	go m.DeletePodStatus(pod.UID)
-	return fmt.Errorf("error updating status for pod %q: %v", kubeletutil.FormatPodName(pod), err)
+	// We failed to update status, wait for periodic sync to retry.
+	glog.Warningf("Failed to update status for pod %q: %v", kubeletutil.FormatPodName(pod), err)
+}
+
+// needsUpdate returns whether the status is stale for the given pod UID.
+// This method is not thread safe, and must only be accessed by the sync thread.
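+// A missing entry in apiStatusVersions means the status has never been sent successfully, so it
+// is treated as needing an update; a failed sync leaves the entry untouched, so the periodic
+// syncBatch retries it.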
+func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {
+	latest, ok := m.apiStatusVersions[uid]
+	return !ok || latest < status.version
 }
 
 // notRunning returns true if every status is terminated or waiting, or the status list
diff --git a/pkg/kubelet/status/manager_test.go b/pkg/kubelet/status/manager_test.go
index c1b75d866a..d6f6286f0e 100644
--- a/pkg/kubelet/status/manager_test.go
+++ b/pkg/kubelet/status/manager_test.go
@@ -24,9 +24,11 @@ import (
 	"time"
 
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/errors"
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/client/unversioned/testclient"
+	"k8s.io/kubernetes/pkg/runtime"
 )
 
 var testPod *api.Pod = &api.Pod{
@@ -244,12 +246,15 @@ func TestUnchangedStatusPreservesLastTransitionTime(t *testing.T) {
 }
 
 func TestSyncBatchIgnoresNotFound(t *testing.T) {
-	syncer := newTestManager()
+	client := testclient.Fake{}
+	syncer := NewManager(&client).(*manager)
+	client.AddReactor("get", "pods", func(action testclient.Action) (bool, runtime.Object, error) {
+		return true, nil, errors.NewNotFound("pods", "test-pod")
+	})
+
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
-	err := syncer.syncBatch()
-	if err != nil {
-		t.Errorf("unexpected syncing error: %v", err)
-	}
+	syncer.syncBatch()
+
 	verifyActions(t, syncer.kubeClient, []testclient.Action{
 		testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
 	})
@@ -259,10 +264,7 @@ func TestSyncBatch(t *testing.T) {
 	syncer := newTestManager()
 	syncer.kubeClient = testclient.NewSimpleFake(testPod)
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
-	err := syncer.syncBatch()
-	if err != nil {
-		t.Errorf("unexpected syncing error: %v", err)
-	}
+	syncer.syncBatch()
 	verifyActions(t, syncer.kubeClient, []testclient.Action{
 		testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
 		testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
@@ -277,15 +279,121 @@ func TestSyncBatchChecksMismatchedUID(t *testing.T) {
 	differentPod.UID = "second"
 	syncer.kubeClient = testclient.NewSimpleFake(testPod)
 	syncer.SetPodStatus(&differentPod, getRandomPodStatus())
-	err := syncer.syncBatch()
-	if err != nil {
-		t.Errorf("unexpected syncing error: %v", err)
-	}
+	syncer.syncBatch()
 	verifyActions(t, syncer.kubeClient, []testclient.Action{
 		testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
 	})
 }
 
+func TestSyncBatchNoDeadlock(t *testing.T) {
+	client := &testclient.Fake{}
+	m := NewManager(client).(*manager)
+
+	// Setup fake client.
+	var ret api.Pod
+	var err error
+	client.AddReactor("*", "pods", func(action testclient.Action) (bool, runtime.Object, error) {
+		return true, &ret, err
+	})
+
+	pod := new(api.Pod)
+	*pod = *testPod
+	pod.Status.ContainerStatuses = []api.ContainerStatus{{State: api.ContainerState{Running: &api.ContainerStateRunning{}}}}
+
+	getAction := testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}}
+	updateAction := testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}}
+
+	// Pod not found.
+	ret = *pod
+	err = errors.NewNotFound("pods", pod.Name)
+	m.SetPodStatus(pod, getRandomPodStatus())
+	m.syncBatch()
+	verifyActions(t, client, []testclient.Action{getAction})
+	client.ClearActions()
+
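+	// The cases below re-point ret and err, which the reactor closure above captured, to steer
+	// the fake client's response for each scenario.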
+	// Pod was recreated.
+	ret.UID = "other_pod"
+	err = nil
+	m.SetPodStatus(pod, getRandomPodStatus())
+	m.syncBatch()
+	verifyActions(t, client, []testclient.Action{getAction})
+	client.ClearActions()
+
+	// Pod not deleted (success case).
+	ret = *pod
+	m.SetPodStatus(pod, getRandomPodStatus())
+	m.syncBatch()
+	verifyActions(t, client, []testclient.Action{getAction, updateAction})
+	client.ClearActions()
+
+	// Pod is terminated, but still running.
+	pod.DeletionTimestamp = new(unversioned.Time)
+	m.SetPodStatus(pod, getRandomPodStatus())
+	m.syncBatch()
+	verifyActions(t, client, []testclient.Action{getAction, updateAction})
+	client.ClearActions()
+
+	// Pod is terminated successfully.
+	pod.Status.ContainerStatuses[0].State.Running = nil
+	pod.Status.ContainerStatuses[0].State.Terminated = &api.ContainerStateTerminated{}
+	m.SetPodStatus(pod, getRandomPodStatus())
+	m.syncBatch()
+	verifyActions(t, client, []testclient.Action{getAction, updateAction})
+	client.ClearActions()
+
+	// Error case.
+	err = fmt.Errorf("intentional test error")
+	m.SetPodStatus(pod, getRandomPodStatus())
+	m.syncBatch()
+	verifyActions(t, client, []testclient.Action{getAction})
+	client.ClearActions()
+}
+
+func TestStaleUpdates(t *testing.T) {
+	pod := *testPod
+	client := testclient.NewSimpleFake(&pod)
+	m := NewManager(client).(*manager)
+
+	status := api.PodStatus{Message: "initial status"}
+	m.SetPodStatus(&pod, status)
+	status.Message = "first version bump"
+	m.SetPodStatus(&pod, status)
+	status.Message = "second version bump"
+	m.SetPodStatus(&pod, status)
+	verifyUpdates(t, m, 3)
+
+	t.Logf("First sync pushes latest status.")
+	m.syncBatch()
+	verifyActions(t, m.kubeClient, []testclient.Action{
+		testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
+		testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
+	})
+	client.ClearActions()
+
+	for i := 0; i < 2; i++ {
+		t.Logf("Next 2 syncs should be ignored (%d).", i)
+		m.syncBatch()
+		verifyActions(t, m.kubeClient, []testclient.Action{})
+	}
+
+	t.Log("Unchanged status should not send an update.")
+	m.SetPodStatus(&pod, status)
+	verifyUpdates(t, m, 0)
+
+	t.Log("... unless it's stale.")
+	m.apiStatusVersions[pod.UID] = m.apiStatusVersions[pod.UID] - 1
+
+	m.SetPodStatus(&pod, status)
+	m.syncBatch()
+	verifyActions(t, m.kubeClient, []testclient.Action{
+		testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
+		testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
+	})
+
+	// Nothing stuck in the pipe.
+	verifyUpdates(t, m, 0)
+}
+
 // shuffle returns a new shuffled list of container statuses.
 func shuffle(statuses []api.ContainerStatus) []api.ContainerStatus {
 	numStatuses := len(statuses)