Merge pull request #16223 from timstclair/status-deadlock

Auto commit by PR queue bot
k8s-merge-robot 2015-10-29 13:01:31 -07:00
commit c4b32cc8d8
3 changed files with 261 additions and 82 deletions

View File

@@ -24,8 +24,10 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/exec"
@@ -117,7 +119,7 @@ func TestDoProbe(t *testing.T) {
}
// Clean up.
m.statusManager.DeletePodStatus(podUID)
m.statusManager = status.NewManager(&testclient.Fake{})
resultsManager(m, probeType).Remove(containerID)
}
}

View File

@@ -17,10 +17,10 @@ limitations under the License.
package status
import (
"fmt"
"reflect"
"sort"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
@@ -33,19 +33,33 @@ import (
"k8s.io/kubernetes/pkg/util"
)
type podStatusSyncRequest struct {
pod *api.Pod
// A wrapper around api.PodStatus that includes a version to enforce that stale pod statuses are
// not sent to the API server.
type versionedPodStatus struct {
status api.PodStatus
// Monotonically increasing version number (per pod).
version uint64
// Pod name & namespace, for sending updates to API server.
podName string
podNamespace string
}
type podStatusSyncRequest struct {
podUID types.UID
status versionedPodStatus
}
// Updates pod statuses in apiserver. Writes only when new status has changed.
// All methods are thread-safe.
type manager struct {
kubeClient client.Interface
// Map from pod full name to sync status of the corresponding pod.
podStatuses map[types.UID]api.PodStatus
// Map from pod UID to sync status of the corresponding pod.
podStatuses map[types.UID]versionedPodStatus
podStatusesLock sync.RWMutex
podStatusChannel chan podStatusSyncRequest
// Map from pod UID to latest status version successfully sent to the API server.
// apiStatusVersions must only be accessed from the sync thread.
apiStatusVersions map[types.UID]uint64
}
// status.Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with
@@ -66,19 +80,19 @@ type Manager interface {
// return false
TerminatePods(pods []*api.Pod) bool
// DeletePodStatus simply removes the given pod from the status cache.
DeletePodStatus(uid types.UID)
// RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in
// the provided podUIDs.
RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
}
const syncPeriod = 10 * time.Second
func NewManager(kubeClient client.Interface) Manager {
return &manager{
kubeClient: kubeClient,
podStatuses: make(map[types.UID]api.PodStatus),
podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
kubeClient: kubeClient,
podStatuses: make(map[types.UID]versionedPodStatus),
podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
apiStatusVersions: make(map[types.UID]uint64),
}
}
@@ -100,21 +114,25 @@ func (m *manager) Start() {
glog.Infof("Kubernetes client is nil, not starting status manager.")
return
}
// syncBatch blocks when no updates are available, so we can run it in a tight loop.
glog.Info("Starting to sync pod status with apiserver")
go util.Until(func() {
err := m.syncBatch()
if err != nil {
glog.Warningf("Failed to updated pod status: %v", err)
syncTicker := time.Tick(syncPeriod)
// syncPod and syncBatch share the same goroutine to avoid sync races.
go util.Forever(func() {
select {
case syncRequest := <-m.podStatusChannel:
m.syncPod(syncRequest.podUID, syncRequest.status)
case <-syncTicker:
m.syncBatch()
}
}, 0, util.NeverStop)
}, 0)
}
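The Start() loop above multiplexes two event sources in a single goroutine, so an on-demand syncPod and the periodic syncBatch can never run concurrently. A minimal stand-alone sketch of that pattern (the names request, syncOne, and syncAll are illustrative, not the kubelet's):

package main

import (
	"fmt"
	"time"
)

type request struct{ uid string }

func main() {
	// Buffered like podStatusChannel, so senders rarely block.
	requests := make(chan request, 1000)
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	syncOne := func(r request) { fmt.Println("on-demand sync:", r.uid) }
	syncAll := func() { fmt.Println("periodic full sync") }

	go func() {
		for {
			select {
			case r := <-requests:
				syncOne(r) // immediate update for one pod
			case <-ticker.C:
				syncAll() // safety net for anything dropped or failed earlier
			}
		}
	}()

	requests <- request{uid: "pod-1"}
	time.Sleep(250 * time.Millisecond) // let both cases fire at least once
}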
func (m *manager) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
status, ok := m.podStatuses[uid]
return status, ok
return status.status, ok
}
func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
@@ -123,8 +141,8 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
oldStatus, found := m.podStatuses[pod.UID]
// ensure that the start time does not change across updates.
if found && oldStatus.StartTime != nil {
status.StartTime = oldStatus.StartTime
if found && oldStatus.status.StartTime != nil {
status.StartTime = oldStatus.status.StartTime
}
// Set ReadyCondition.LastTransitionTime.
@@ -134,7 +152,7 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
// Need to set LastTransitionTime.
lastTransitionTime := unversioned.Now()
if found {
oldReadyCondition := api.GetPodReadyCondition(oldStatus)
oldReadyCondition := api.GetPodReadyCondition(oldStatus.status)
if oldReadyCondition != nil && readyCondition.Status == oldReadyCondition.Status {
lastTransitionTime = oldReadyCondition.LastTransitionTime
}
@@ -158,17 +176,14 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
}
}
// TODO: Holding a lock during blocking operations is dangerous. Refactor so this isn't necessary.
// The intent here is to prevent concurrent updates to a pod's status from
// clobbering each other so the phase of a pod progresses monotonically.
// Currently this routine is not called for the same pod from multiple
// workers and/or the kubelet, but dropping the lock before sending the
// status down the channel feels like an easy way to shoot ourselves in the foot.
if !found || !isStatusEqual(&oldStatus, &status) || pod.DeletionTimestamp != nil {
m.podStatuses[pod.UID] = status
m.podStatusChannel <- podStatusSyncRequest{pod, status}
} else {
glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", kubeletutil.FormatPodName(pod), status)
newStatus := m.updateStatusInternal(pod, status)
if newStatus != nil {
select {
case m.podStatusChannel <- podStatusSyncRequest{pod.UID, *newStatus}:
default:
// Let the periodic syncBatch handle the update if the channel is full.
// We can't block, since we hold the mutex lock.
}
}
}
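The select with an empty default above is a non-blocking send: SetPodStatus holds podStatusesLock, so blocking on a full channel could deadlock against the sync goroutine. A small sketch of the same pattern, using generic names (notifier, notify) rather than the manager's:

package main

import (
	"fmt"
	"sync"
)

type notifier struct {
	mu sync.Mutex
	ch chan string
}

// notify records a wake-up for the sync loop without ever blocking while the
// mutex is held; if the channel is full the notification is simply dropped
// here and left for the periodic resync to pick up.
func (n *notifier) notify(msg string) {
	n.mu.Lock()
	defer n.mu.Unlock()
	select {
	case n.ch <- msg:
	default:
		// Channel full: drop the notification rather than risk a deadlock.
	}
}

func main() {
	n := &notifier{ch: make(chan string, 1)}
	n.notify("first")  // fits in the buffer
	n.notify("second") // buffer full, dropped
	fmt.Println(<-n.ch, "| still queued:", len(n.ch))
}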
@@ -182,17 +197,44 @@ func (m *manager) TerminatePods(pods []*api.Pod) bool {
Terminated: &api.ContainerStateTerminated{},
}
}
select {
case m.podStatusChannel <- podStatusSyncRequest{pod, pod.Status}:
default:
newStatus := m.updateStatusInternal(pod, pod.Status)
if newStatus != nil {
select {
case m.podStatusChannel <- podStatusSyncRequest{pod.UID, *newStatus}:
default:
sent = false
glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletutil.FormatPodName(pod))
}
} else {
sent = false
glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletutil.FormatPodName(pod))
}
}
return sent
}
func (m *manager) DeletePodStatus(uid types.UID) {
// updateStatusInternal updates the internal status cache, and returns a versioned status if an
// update is necessary. This method IS NOT THREAD SAFE and must be called from a locked function.
func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) *versionedPodStatus {
// The intent here is to prevent concurrent updates to a pod's status from
// clobbering each other so the phase of a pod progresses monotonically.
oldStatus, found := m.podStatuses[pod.UID]
if !found || !isStatusEqual(&oldStatus.status, &status) || pod.DeletionTimestamp != nil {
newStatus := versionedPodStatus{
status: status,
version: oldStatus.version + 1,
podName: pod.Name,
podNamespace: pod.Namespace,
}
m.podStatuses[pod.UID] = newStatus
return &newStatus
} else {
glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", kubeletutil.FormatPodName(pod), status)
return nil // No new status.
}
}
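updateStatusInternal boils down to a compare-then-bump cache: identical statuses are ignored, and anything new gets the next version number so later stages can tell fresh snapshots from stale ones. A stripped-down illustration with placeholder names (cache, versioned, set); it omits the real method's extra rule that a pod with a DeletionTimestamp always forces an update:

package main

import "fmt"

type versioned struct {
	value   string
	version uint64
}

type cache struct {
	entries map[string]versioned
}

// set stores value only if it differs from what is cached, bumping the
// per-key version. It returns nil when nothing changed, mirroring the
// "no new status" path above.
func (c *cache) set(key, value string) *versioned {
	old, found := c.entries[key]
	if found && old.value == value {
		return nil
	}
	v := versioned{value: value, version: old.version + 1}
	c.entries[key] = v
	return &v
}

func main() {
	c := &cache{entries: map[string]versioned{}}
	fmt.Println(c.set("pod-1", "Pending")) // &{Pending 1}
	fmt.Println(c.set("pod-1", "Pending")) // <nil>: duplicate, ignored
	fmt.Println(c.set("pod-1", "Running")) // &{Running 2}
}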
// deletePodStatus simply removes the given pod from the status cache.
func (m *manager) deletePodStatus(uid types.UID) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
delete(m.podStatuses, uid)
@@ -211,56 +253,83 @@ func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
}
// syncBatch syncs pods statuses with the apiserver.
func (m *manager) syncBatch() error {
syncRequest := <-m.podStatusChannel
pod := syncRequest.pod
status := syncRequest.status
func (m *manager) syncBatch() {
var updatedStatuses []podStatusSyncRequest
func() { // Critical section
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
var err error
statusPod := &api.Pod{
ObjectMeta: pod.ObjectMeta,
// Clean up orphaned versions.
for uid := range m.apiStatusVersions {
if _, ok := m.podStatuses[uid]; !ok {
delete(m.apiStatusVersions, uid)
}
}
for uid, status := range m.podStatuses {
if m.needsUpdate(uid, status) {
updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
}
}
}()
for _, update := range updatedStatuses {
m.syncPod(update.podUID, update.status)
}
}
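syncBatch snapshots the out-of-date statuses under the read lock and only then makes the (potentially slow) API calls, so writers are never blocked behind network round trips. The same shape in miniature, with illustrative names (store, snapshot):

package main

import (
	"fmt"
	"sync"
)

type store struct {
	mu   sync.RWMutex
	data map[string]string
}

// snapshot copies the keys that need work while holding only the read lock.
func (s *store) snapshot() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	keys := make([]string, 0, len(s.data))
	for k := range s.data {
		keys = append(keys, k)
	}
	return keys
}

func main() {
	s := &store{data: map[string]string{"pod-1": "Running", "pod-2": "Pending"}}
	for _, k := range s.snapshot() {
		// The slow part (an API call in the real manager) runs here,
		// outside the critical section.
		fmt.Println("syncing", k)
	}
}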
// syncPod syncs the given status with the API server. The caller must not hold the lock.
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
if !m.needsUpdate(uid, status) {
glog.Warningf("Status is up-to-date; skipping: %q %+v", uid, status)
return
}
// TODO: make me easier to express from client code
statusPod, err = m.kubeClient.Pods(statusPod.Namespace).Get(statusPod.Name)
pod, err := m.kubeClient.Pods(status.podNamespace).Get(status.podName)
if errors.IsNotFound(err) {
glog.V(3).Infof("Pod %q was deleted on the server", pod.Name)
return nil
glog.V(3).Infof("Pod %q was deleted on the server", status.podName)
m.deletePodStatus(uid)
return
}
if err == nil {
if len(pod.UID) > 0 && statusPod.UID != pod.UID {
glog.V(3).Infof("Pod %q was deleted and then recreated, skipping status update", kubeletutil.FormatPodName(pod))
return nil
if len(pod.UID) > 0 && pod.UID != uid {
glog.V(3).Infof("Pod %q was deleted and then recreated, skipping status update",
kubeletutil.FormatPodName(pod))
m.deletePodStatus(uid)
return
}
statusPod.Status = status
pod.Status = status.status
// TODO: handle conflict as a retry, make that easier too.
statusPod, err = m.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
pod, err = m.kubeClient.Pods(pod.Namespace).UpdateStatus(pod)
if err == nil {
glog.V(3).Infof("Status for pod %q updated successfully", kubeletutil.FormatPodName(pod))
m.apiStatusVersions[uid] = status.version
if pod.DeletionTimestamp == nil {
return nil
return
}
if !notRunning(pod.Status.ContainerStatuses) {
glog.V(3).Infof("Pod %q is terminated, but some pods are still running", pod.Name)
return nil
glog.V(3).Infof("Pod %q is terminated, but some containers are still running", pod.Name)
return
}
if err := m.kubeClient.Pods(statusPod.Namespace).Delete(statusPod.Name, api.NewDeleteOptions(0)); err == nil {
glog.V(3).Infof("Pod %q fully terminated and removed from etcd", statusPod.Name)
m.DeletePodStatus(pod.UID)
return nil
if err := m.kubeClient.Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err == nil {
glog.V(3).Infof("Pod %q fully terminated and removed from etcd", pod.Name)
m.deletePodStatus(pod.UID)
return
}
}
}
// We failed to update status. In order to make sure we retry next time,
// we delete the cached value. This may result in an additional update, but
// this is ok.
// Doing this synchronously will lead to a deadlock if the podStatusChannel
// is full, and the pod worker holding the lock is waiting on this method
// to clear the channel. Even if this delete never runs, subsequent container
// changes on the node should trigger updates.
go m.DeletePodStatus(pod.UID)
return fmt.Errorf("error updating status for pod %q: %v", kubeletutil.FormatPodName(pod), err)
// We failed to update status; wait for the periodic sync to retry.
glog.Warningf("Failed to update status for pod %q: %v", kubeletutil.FormatPodName(pod), err)
}
// needsUpdate returns whether the status is stale for the given pod UID.
// This method is not thread safe, and must only be accessed by the sync thread.
func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {
latest, ok := m.apiStatusVersions[uid]
return !ok || latest < status.version
}
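needsUpdate gates every send on the version last acknowledged by the API server, and because apiStatusVersions is touched only from the sync goroutine it needs no lock. A self-contained illustration of that check (needsUpdate here mirrors the method's logic but is not the kubelet's code):

package main

import "fmt"

// needsUpdate reports whether a pending status version is newer than the
// last version acknowledged by the server for that key.
func needsUpdate(acked map[string]uint64, key string, pending uint64) bool {
	latest, ok := acked[key]
	return !ok || latest < pending
}

func main() {
	acked := map[string]uint64{} // owned exclusively by the sync goroutine

	fmt.Println(needsUpdate(acked, "pod-1", 1)) // true: nothing sent yet
	acked["pod-1"] = 1                          // record a successful send
	fmt.Println(needsUpdate(acked, "pod-1", 1)) // false: already up to date
	fmt.Println(needsUpdate(acked, "pod-1", 2)) // true: a newer status is pending
}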
// notRunning returns true if every status is terminated or waiting, or the status list

View File

@@ -24,9 +24,11 @@ import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/runtime"
)
var testPod *api.Pod = &api.Pod{
@@ -244,12 +246,15 @@ func TestUnchangedStatusPreservesLastTransitionTime(t *testing.T) {
}
func TestSyncBatchIgnoresNotFound(t *testing.T) {
syncer := newTestManager()
client := testclient.Fake{}
syncer := NewManager(&client).(*manager)
client.AddReactor("get", "pods", func(action testclient.Action) (bool, runtime.Object, error) {
return true, nil, errors.NewNotFound("pods", "test-pod")
})
syncer.SetPodStatus(testPod, getRandomPodStatus())
err := syncer.syncBatch()
if err != nil {
t.Errorf("unexpected syncing error: %v", err)
}
syncer.syncBatch()
verifyActions(t, syncer.kubeClient, []testclient.Action{
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
})
@@ -259,10 +264,7 @@ func TestSyncBatch(t *testing.T) {
syncer := newTestManager()
syncer.kubeClient = testclient.NewSimpleFake(testPod)
syncer.SetPodStatus(testPod, getRandomPodStatus())
err := syncer.syncBatch()
if err != nil {
t.Errorf("unexpected syncing error: %v", err)
}
syncer.syncBatch()
verifyActions(t, syncer.kubeClient, []testclient.Action{
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
@@ -277,15 +279,121 @@ func TestSyncBatchChecksMismatchedUID(t *testing.T) {
differentPod.UID = "second"
syncer.kubeClient = testclient.NewSimpleFake(testPod)
syncer.SetPodStatus(&differentPod, getRandomPodStatus())
err := syncer.syncBatch()
if err != nil {
t.Errorf("unexpected syncing error: %v", err)
}
syncer.syncBatch()
verifyActions(t, syncer.kubeClient, []testclient.Action{
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
})
}
func TestSyncBatchNoDeadlock(t *testing.T) {
client := &testclient.Fake{}
m := NewManager(client).(*manager)
// Setup fake client.
var ret api.Pod
var err error
client.AddReactor("*", "pods", func(action testclient.Action) (bool, runtime.Object, error) {
return true, &ret, err
})
pod := new(api.Pod)
*pod = *testPod
pod.Status.ContainerStatuses = []api.ContainerStatus{{State: api.ContainerState{Running: &api.ContainerStateRunning{}}}}
getAction := testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}}
updateAction := testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}}
// Pod not found.
ret = *pod
err = errors.NewNotFound("pods", pod.Name)
m.SetPodStatus(pod, getRandomPodStatus())
m.syncBatch()
verifyActions(t, client, []testclient.Action{getAction})
client.ClearActions()
// Pod was recreated.
ret.UID = "other_pod"
err = nil
m.SetPodStatus(pod, getRandomPodStatus())
m.syncBatch()
verifyActions(t, client, []testclient.Action{getAction})
client.ClearActions()
// Pod not deleted (success case).
ret = *pod
m.SetPodStatus(pod, getRandomPodStatus())
m.syncBatch()
verifyActions(t, client, []testclient.Action{getAction, updateAction})
client.ClearActions()
// Pod is terminated, but still running.
pod.DeletionTimestamp = new(unversioned.Time)
m.SetPodStatus(pod, getRandomPodStatus())
m.syncBatch()
verifyActions(t, client, []testclient.Action{getAction, updateAction})
client.ClearActions()
// Pod is terminated successfully.
pod.Status.ContainerStatuses[0].State.Running = nil
pod.Status.ContainerStatuses[0].State.Terminated = &api.ContainerStateTerminated{}
m.SetPodStatus(pod, getRandomPodStatus())
m.syncBatch()
verifyActions(t, client, []testclient.Action{getAction, updateAction})
client.ClearActions()
// Error case.
err = fmt.Errorf("intentional test error")
m.SetPodStatus(pod, getRandomPodStatus())
m.syncBatch()
verifyActions(t, client, []testclient.Action{getAction})
client.ClearActions()
}
func TestStaleUpdates(t *testing.T) {
pod := *testPod
client := testclient.NewSimpleFake(&pod)
m := NewManager(client).(*manager)
status := api.PodStatus{Message: "initial status"}
m.SetPodStatus(&pod, status)
status.Message = "first version bump"
m.SetPodStatus(&pod, status)
status.Message = "second version bump"
m.SetPodStatus(&pod, status)
verifyUpdates(t, m, 3)
t.Logf("First sync pushes latest status.")
m.syncBatch()
verifyActions(t, m.kubeClient, []testclient.Action{
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
})
client.ClearActions()
for i := 0; i < 2; i++ {
t.Logf("Next 2 syncs should be ignored (%d).", i)
m.syncBatch()
verifyActions(t, m.kubeClient, []testclient.Action{})
}
t.Log("Unchanged status should not send an update.")
m.SetPodStatus(&pod, status)
verifyUpdates(t, m, 0)
t.Log("... unless it's stale.")
m.apiStatusVersions[pod.UID] = m.apiStatusVersions[pod.UID] - 1
m.SetPodStatus(&pod, status)
m.syncBatch()
verifyActions(t, m.kubeClient, []testclient.Action{
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
})
// Nothing stuck in the pipe.
verifyUpdates(t, m, 0)
}
// shuffle returns a new shuffled list of container statuses.
func shuffle(statuses []api.ContainerStatus) []api.ContainerStatus {
numStatuses := len(statuses)