Merge pull request #17755 from timstclair/status-manager

Auto commit by PR queue bot
pull/6/head
k8s-merge-robot 2015-12-02 22:07:44 -08:00
commit 106cf2b6b5
3 changed files with 48 additions and 6 deletions

View File

@ -130,6 +130,9 @@ func (pm *basicManager) updatePodsInternal(pods ...*api.Pod) {
} else {
pm.podByUID[pod.UID] = pod
pm.podByFullName[podFullName] = pod
if mirror, ok := pm.mirrorPodByFullName[podFullName]; ok {
pm.translationByUID[mirror.UID] = pod.UID
}
}
}
}
@ -218,8 +221,8 @@ func (pm *basicManager) GetUIDTranslations() (podToMirror, mirrorToPod map[types
podToMirror = make(map[types.UID]types.UID, len(pm.translationByUID))
mirrorToPod = make(map[types.UID]types.UID, len(pm.translationByUID))
for k, v := range pm.translationByUID {
podToMirror[k] = v
mirrorToPod[v] = k
mirrorToPod[k] = v
podToMirror[v] = k
}
return podToMirror, mirrorToPod
}

View File

@ -60,7 +60,7 @@ type manager struct {
podStatuses map[types.UID]versionedPodStatus
podStatusesLock sync.RWMutex
podStatusChannel chan podStatusSyncRequest
// Map from pod UID to latest status version successfully sent to the API server.
// Map from (mirror) pod UID to latest status version successfully sent to the API server.
// apiStatusVersions must only be accessed from the sync thread.
apiStatusVersions map[types.UID]uint64
}
@ -296,7 +296,7 @@ func (m *manager) syncBatch() {
// Clean up orphaned versions.
for uid := range m.apiStatusVersions {
_, hasPod := m.podStatuses[uid]
_, hasMirror := podToMirror[uid]
_, hasMirror := mirrorToPod[uid]
if !hasPod && !hasMirror {
delete(m.apiStatusVersions, uid)
}
@ -304,8 +304,8 @@ func (m *manager) syncBatch() {
for uid, status := range m.podStatuses {
syncedUID := uid
if translated, ok := mirrorToPod[uid]; ok {
syncedUID = translated
if mirrorUID, ok := podToMirror[uid]; ok {
syncedUID = mirrorUID
}
if m.needsUpdate(syncedUID, status) {
updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})

View File

@ -538,3 +538,42 @@ func TestSetContainerReadiness(t *testing.T) {
m.SetContainerReadiness(testPod, kubecontainer.ContainerID{"test", "foo"}, true)
verifyUpdates(t, m, 0)
}
func TestSyncBatchCleanupVersions(t *testing.T) {
m := newTestManager(&testclient.Fake{})
mirrorPod := *testPod
mirrorPod.UID = "mirror-uid"
mirrorPod.Name = "mirror_pod"
mirrorPod.Annotations = map[string]string{
kubetypes.ConfigSourceAnnotationKey: "api",
kubetypes.ConfigMirrorAnnotationKey: "mirror",
}
// Orphaned pods should be removed.
m.apiStatusVersions[testPod.UID] = 100
m.apiStatusVersions[mirrorPod.UID] = 200
m.syncBatch()
if _, ok := m.apiStatusVersions[testPod.UID]; ok {
t.Errorf("Should have cleared status for testPod")
}
if _, ok := m.apiStatusVersions[mirrorPod.UID]; ok {
t.Errorf("Should have cleared status for mirrorPod")
}
// Non-orphaned pods should not be removed.
m.SetPodStatus(testPod, getRandomPodStatus())
m.podManager.AddPod(&mirrorPod)
staticPod := mirrorPod
staticPod.UID = "static-uid"
staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"}
m.podManager.AddPod(&staticPod)
m.apiStatusVersions[testPod.UID] = 100
m.apiStatusVersions[mirrorPod.UID] = 200
m.syncBatch()
if _, ok := m.apiStatusVersions[testPod.UID]; !ok {
t.Errorf("Should not have cleared status for testPod")
}
if _, ok := m.apiStatusVersions[mirrorPod.UID]; !ok {
t.Errorf("Should not have cleared status for mirrorPod")
}
}