mirror of https://github.com/k3s-io/k3s
Merge pull request #18410 from Random-Liu/reconcile-pod-status
Auto commit by PR queue botpull/6/head
commit
67ac4e3838
|
@ -46,7 +46,7 @@ const (
|
||||||
// PodConfigNotificationSnapshotAndUpdates delivers an UPDATE message whenever pods are
|
// PodConfigNotificationSnapshotAndUpdates delivers an UPDATE message whenever pods are
|
||||||
// changed, and a SET message if there are any additions or removals.
|
// changed, and a SET message if there are any additions or removals.
|
||||||
PodConfigNotificationSnapshotAndUpdates
|
PodConfigNotificationSnapshotAndUpdates
|
||||||
// PodConfigNotificationIncremental delivers ADD, UPDATE, and REMOVE to the update channel.
|
// PodConfigNotificationIncremental delivers ADD, UPDATE, REMOVE, RECONCILE to the update channel.
|
||||||
PodConfigNotificationIncremental
|
PodConfigNotificationIncremental
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -152,7 +152,7 @@ func (s *podStorage) Merge(source string, change interface{}) error {
|
||||||
defer s.updateLock.Unlock()
|
defer s.updateLock.Unlock()
|
||||||
|
|
||||||
seenBefore := s.sourcesSeen.Has(source)
|
seenBefore := s.sourcesSeen.Has(source)
|
||||||
adds, updates, deletes := s.merge(source, change)
|
adds, updates, deletes, reconciles := s.merge(source, change)
|
||||||
firstSet := !seenBefore && s.sourcesSeen.Has(source)
|
firstSet := !seenBefore && s.sourcesSeen.Has(source)
|
||||||
|
|
||||||
// deliver update notifications
|
// deliver update notifications
|
||||||
|
@ -167,6 +167,10 @@ func (s *podStorage) Merge(source string, change interface{}) error {
|
||||||
if len(updates.Pods) > 0 {
|
if len(updates.Pods) > 0 {
|
||||||
s.updates <- *updates
|
s.updates <- *updates
|
||||||
}
|
}
|
||||||
|
// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
|
||||||
|
if len(reconciles.Pods) > 0 {
|
||||||
|
s.updates <- *reconciles
|
||||||
|
}
|
||||||
|
|
||||||
case PodConfigNotificationSnapshotAndUpdates:
|
case PodConfigNotificationSnapshotAndUpdates:
|
||||||
if len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
|
if len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
|
||||||
|
@ -190,13 +194,14 @@ func (s *podStorage) Merge(source string, change interface{}) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes *kubetypes.PodUpdate) {
|
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, reconciles *kubetypes.PodUpdate) {
|
||||||
s.podLock.Lock()
|
s.podLock.Lock()
|
||||||
defer s.podLock.Unlock()
|
defer s.podLock.Unlock()
|
||||||
|
|
||||||
addPods := []*api.Pod{}
|
addPods := []*api.Pod{}
|
||||||
updatePods := []*api.Pod{}
|
updatePods := []*api.Pod{}
|
||||||
deletePods := []*api.Pod{}
|
deletePods := []*api.Pod{}
|
||||||
|
reconcilePods := []*api.Pod{}
|
||||||
|
|
||||||
pods := s.pods[source]
|
pods := s.pods[source]
|
||||||
if pods == nil {
|
if pods == nil {
|
||||||
|
@ -221,12 +226,12 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
|
||||||
}
|
}
|
||||||
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
|
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
|
||||||
if existing, found := pods[name]; found {
|
if existing, found := pods[name]; found {
|
||||||
if checkAndUpdatePod(existing, ref) {
|
needUpdate, needReconcile := checkAndUpdatePod(existing, ref)
|
||||||
// this is an update
|
if needUpdate {
|
||||||
updatePods = append(updatePods, existing)
|
updatePods = append(updatePods, existing)
|
||||||
continue
|
} else if needReconcile {
|
||||||
|
reconcilePods = append(reconcilePods, existing)
|
||||||
}
|
}
|
||||||
// this is a no-op
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
recordFirstSeenTime(ref)
|
recordFirstSeenTime(ref)
|
||||||
|
@ -273,12 +278,12 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
|
||||||
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
|
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
|
||||||
if existing, found := oldPods[name]; found {
|
if existing, found := oldPods[name]; found {
|
||||||
pods[name] = existing
|
pods[name] = existing
|
||||||
if checkAndUpdatePod(existing, ref) {
|
needUpdate, needReconcile := checkAndUpdatePod(existing, ref)
|
||||||
// this is an update
|
if needUpdate {
|
||||||
updatePods = append(updatePods, existing)
|
updatePods = append(updatePods, existing)
|
||||||
continue
|
} else if needReconcile {
|
||||||
|
reconcilePods = append(reconcilePods, existing)
|
||||||
}
|
}
|
||||||
// this is a no-op
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
recordFirstSeenTime(ref)
|
recordFirstSeenTime(ref)
|
||||||
|
@ -312,8 +317,9 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
|
||||||
adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
|
adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
|
||||||
updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
|
updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
|
||||||
deletes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(deletePods), Source: source}
|
deletes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(deletePods), Source: source}
|
||||||
|
reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
|
||||||
|
|
||||||
return adds, updates, deletes
|
return adds, updates, deletes, reconciles
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *podStorage) markSourceSet(source string) {
|
func (s *podStorage) markSourceSet(source string) {
|
||||||
|
@ -433,13 +439,25 @@ func podsDifferSemantically(existing, ref *api.Pod) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkAndUpdatePod updates existing if ref makes a meaningful change and returns true, or
|
// checkAndUpdatePod updates existing, and:
|
||||||
// returns false if there was no update.
|
// * if ref makes a meaningful change, returns needUpdate=true
|
||||||
func checkAndUpdatePod(existing, ref *api.Pod) bool {
|
// * if ref makes no meaningful change, but changes the pod status, returns needReconcile=true
|
||||||
|
// * else return both false
|
||||||
|
// Now, needUpdate and needReconcile should never be both true
|
||||||
|
func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile bool) {
|
||||||
// TODO: it would be better to update the whole object and only preserve certain things
|
// TODO: it would be better to update the whole object and only preserve certain things
|
||||||
// like the source annotation or the UID (to ensure safety)
|
// like the source annotation or the UID (to ensure safety)
|
||||||
if !podsDifferSemantically(existing, ref) {
|
if !podsDifferSemantically(existing, ref) {
|
||||||
return false
|
// this is not an update
|
||||||
|
// Only check reconcile when it is not an update, because if the pod is going to
|
||||||
|
// be updated, an extra reconcile is unnecessary
|
||||||
|
if !reflect.DeepEqual(existing.Status, ref.Status) {
|
||||||
|
// Pod with changed pod status needs reconcile, because kubelet should
|
||||||
|
// be the source of truth of pod status.
|
||||||
|
existing.Status = ref.Status
|
||||||
|
needReconcile = true
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
// this is an update
|
// this is an update
|
||||||
|
|
||||||
|
@ -451,8 +469,10 @@ func checkAndUpdatePod(existing, ref *api.Pod) bool {
|
||||||
existing.Labels = ref.Labels
|
existing.Labels = ref.Labels
|
||||||
existing.DeletionTimestamp = ref.DeletionTimestamp
|
existing.DeletionTimestamp = ref.DeletionTimestamp
|
||||||
existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
|
existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
|
||||||
|
existing.Status = ref.Status
|
||||||
updateAnnotations(existing, ref)
|
updateAnnotations(existing, ref)
|
||||||
return true
|
needUpdate = true
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync sends a copy of the current state through the update channel.
|
// Sync sends a copy of the current state through the update channel.
|
||||||
|
|
|
@ -17,7 +17,10 @@ limitations under the License.
|
||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
@ -108,7 +111,7 @@ func expectPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate, expected ...ku
|
||||||
// Compare pods one by one. This is necessary beacuse we don't want to
|
// Compare pods one by one. This is necessary beacuse we don't want to
|
||||||
// compare local annotations.
|
// compare local annotations.
|
||||||
for j := range expected[i].Pods {
|
for j := range expected[i].Pods {
|
||||||
if podsDifferSemantically(expected[i].Pods[j], update.Pods[j]) {
|
if podsDifferSemantically(expected[i].Pods[j], update.Pods[j]) || !reflect.DeepEqual(expected[i].Pods[j].Status, update.Pods[j].Status) {
|
||||||
t.Fatalf("Expected %#v, Got %#v", expected[i].Pods[j], update.Pods[j])
|
t.Fatalf("Expected %#v, Got %#v", expected[i].Pods[j], update.Pods[j])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -270,6 +273,51 @@ func TestNewPodAddedUpdatedSet(t *testing.T) {
|
||||||
CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
|
CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNewPodAddedSetReconciled(t *testing.T) {
|
||||||
|
// Create and touch new test pods, return the new pods and touched pod. We should create new pod list
|
||||||
|
// before touching to avoid data race.
|
||||||
|
newTestPods := func(touchStatus, touchSpec bool) ([]*api.Pod, *api.Pod) {
|
||||||
|
pods := []*api.Pod{
|
||||||
|
CreateValidPod("changable-pod-0", "new"),
|
||||||
|
CreateValidPod("constant-pod-1", "new"),
|
||||||
|
CreateValidPod("constant-pod-2", "new"),
|
||||||
|
}
|
||||||
|
if touchStatus {
|
||||||
|
pods[0].Status = api.PodStatus{Message: strconv.Itoa(rand.Int())}
|
||||||
|
}
|
||||||
|
if touchSpec {
|
||||||
|
pods[0].Spec.Containers[0].Name = strconv.Itoa(rand.Int())
|
||||||
|
}
|
||||||
|
return pods, pods[0]
|
||||||
|
}
|
||||||
|
for _, op := range []kubetypes.PodOperation{
|
||||||
|
kubetypes.ADD,
|
||||||
|
kubetypes.SET,
|
||||||
|
} {
|
||||||
|
var podWithStatusChange *api.Pod
|
||||||
|
pods, _ := newTestPods(false, false)
|
||||||
|
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
|
||||||
|
|
||||||
|
// Use SET to initialize the config, especially initialize the source set
|
||||||
|
channel <- CreatePodUpdate(kubetypes.SET, TestSource, pods...)
|
||||||
|
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, pods...))
|
||||||
|
|
||||||
|
// If status is not changed, no reconcile should be triggered
|
||||||
|
channel <- CreatePodUpdate(op, TestSource, pods...)
|
||||||
|
expectNoPodUpdate(t, ch)
|
||||||
|
|
||||||
|
// If the pod status is changed and not updated, a reconcile should be triggered
|
||||||
|
pods, podWithStatusChange = newTestPods(true, false)
|
||||||
|
channel <- CreatePodUpdate(op, TestSource, pods...)
|
||||||
|
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.RECONCILE, TestSource, podWithStatusChange))
|
||||||
|
|
||||||
|
// If the pod status is changed, but the pod is also updated, no reconcile should be triggered
|
||||||
|
pods, podWithStatusChange = newTestPods(true, true)
|
||||||
|
channel <- CreatePodUpdate(op, TestSource, pods...)
|
||||||
|
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, podWithStatusChange))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestInitialEmptySet(t *testing.T) {
|
func TestInitialEmptySet(t *testing.T) {
|
||||||
for _, test := range []struct {
|
for _, test := range []struct {
|
||||||
mode PodConfigNotificationMode
|
mode PodConfigNotificationMode
|
||||||
|
@ -327,7 +375,7 @@ func TestPodUpdateAnnotations(t *testing.T) {
|
||||||
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
|
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPodUpdateLables(t *testing.T) {
|
func TestPodUpdateLabels(t *testing.T) {
|
||||||
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
|
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
|
||||||
|
|
||||||
pod := CreateValidPod("foo2", "new")
|
pod := CreateValidPod("foo2", "new")
|
||||||
|
|
|
@ -130,6 +130,7 @@ type SyncHandler interface {
|
||||||
HandlePodAdditions(pods []*api.Pod)
|
HandlePodAdditions(pods []*api.Pod)
|
||||||
HandlePodUpdates(pods []*api.Pod)
|
HandlePodUpdates(pods []*api.Pod)
|
||||||
HandlePodDeletions(pods []*api.Pod)
|
HandlePodDeletions(pods []*api.Pod)
|
||||||
|
HandlePodReconcile(pods []*api.Pod)
|
||||||
HandlePodSyncs(pods []*api.Pod)
|
HandlePodSyncs(pods []*api.Pod)
|
||||||
HandlePodCleanups() error
|
HandlePodCleanups() error
|
||||||
}
|
}
|
||||||
|
@ -2335,6 +2336,9 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
|
||||||
case kubetypes.REMOVE:
|
case kubetypes.REMOVE:
|
||||||
glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
|
glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
|
||||||
handler.HandlePodDeletions(u.Pods)
|
handler.HandlePodDeletions(u.Pods)
|
||||||
|
case kubetypes.RECONCILE:
|
||||||
|
glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
|
||||||
|
handler.HandlePodReconcile(u.Pods)
|
||||||
case kubetypes.SET:
|
case kubetypes.SET:
|
||||||
// TODO: Do we want to support this?
|
// TODO: Do we want to support this?
|
||||||
glog.Errorf("Kubelet does not support snapshot update")
|
glog.Errorf("Kubelet does not support snapshot update")
|
||||||
|
@ -2469,6 +2473,14 @@ func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (kl *Kubelet) HandlePodReconcile(pods []*api.Pod) {
|
||||||
|
for _, pod := range pods {
|
||||||
|
// Update the pod in pod manager, status manager will do periodically reconcile according
|
||||||
|
// to the pod manager.
|
||||||
|
kl.podManager.UpdatePod(pod)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (kl *Kubelet) HandlePodSyncs(pods []*api.Pod) {
|
func (kl *Kubelet) HandlePodSyncs(pods []*api.Pod) {
|
||||||
start := kl.clock.Now()
|
start := kl.clock.Now()
|
||||||
for _, pod := range pods {
|
for _, pod := range pods {
|
||||||
|
|
|
@ -17,7 +17,6 @@ limitations under the License.
|
||||||
package status
|
package status
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"reflect"
|
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -105,13 +104,12 @@ func NewManager(kubeClient client.Interface, podManager kubepod.Manager) Manager
|
||||||
}
|
}
|
||||||
|
|
||||||
// isStatusEqual returns true if the given pod statuses are equal, false otherwise.
|
// isStatusEqual returns true if the given pod statuses are equal, false otherwise.
|
||||||
// This method sorts container statuses so order does not affect equality.
|
// This method normalizes the status before comparing so as to make sure that meaningless
|
||||||
|
// changes will be ignored.
|
||||||
func isStatusEqual(oldStatus, status *api.PodStatus) bool {
|
func isStatusEqual(oldStatus, status *api.PodStatus) bool {
|
||||||
sort.Sort(kubetypes.SortedContainerStatuses(status.ContainerStatuses))
|
normalizeStatus(oldStatus)
|
||||||
sort.Sort(kubetypes.SortedContainerStatuses(oldStatus.ContainerStatuses))
|
normalizeStatus(status)
|
||||||
|
return api.Semantic.DeepEqual(status, oldStatus)
|
||||||
// TODO: More sophisticated equality checking.
|
|
||||||
return reflect.DeepEqual(status, oldStatus)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *manager) Start() {
|
func (m *manager) Start() {
|
||||||
|
@ -329,6 +327,13 @@ func (m *manager) syncBatch() {
|
||||||
}
|
}
|
||||||
if m.needsUpdate(syncedUID, status) {
|
if m.needsUpdate(syncedUID, status) {
|
||||||
updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
|
updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
|
||||||
|
} else if m.needsReconcile(uid, status.status) {
|
||||||
|
// Delete the apiStatusVersions here to force an update on the pod status
|
||||||
|
// In most cases the deleted apiStatusVersions here should be filled
|
||||||
|
// soon after the following syncPod() [If the syncPod() sync an update
|
||||||
|
// successfully].
|
||||||
|
delete(m.apiStatusVersions, syncedUID)
|
||||||
|
updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -392,6 +397,82 @@ func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {
|
||||||
return !ok || latest < status.version
|
return !ok || latest < status.version
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// needsReconcile compares the given status with the status in the pod manager (which
|
||||||
|
// in fact comes from apiserver), returns whether the status needs to be reconciled with
|
||||||
|
// the apiserver. Now when pod status is inconsistent between apiserver and kubelet,
|
||||||
|
// kubelet should forcibly send an update to reconclie the inconsistence, because kubelet
|
||||||
|
// should be the source of truth of pod status.
|
||||||
|
// NOTE(random-liu): It's simpler to pass in mirror pod uid and get mirror pod by uid, but
|
||||||
|
// now the pod manager only supports getting mirror pod by static pod, so we have to pass
|
||||||
|
// static pod uid here.
|
||||||
|
// TODO(random-liu): Simplify the logic when mirror pod manager is added.
|
||||||
|
func (m *manager) needsReconcile(uid types.UID, status api.PodStatus) bool {
|
||||||
|
// The pod could be a static pod, so we should translate first.
|
||||||
|
pod, ok := m.podManager.GetPodByUID(uid)
|
||||||
|
if !ok {
|
||||||
|
// Although we get uid from pod manager in syncBatch, it still could be deleted before here.
|
||||||
|
glog.V(4).Infof("Pod %q has been deleted, no need to reconcile", format.Pod(pod))
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// If the pod is a static pod, we should check its mirror pod, because only status in mirror pod is meaningful to us.
|
||||||
|
if kubepod.IsStaticPod(pod) {
|
||||||
|
mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod)
|
||||||
|
if !ok {
|
||||||
|
glog.V(4).Infof("Static pod %q has no corresponding mirror pod, no need to reconcile", format.Pod(pod))
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
pod = mirrorPod
|
||||||
|
}
|
||||||
|
|
||||||
|
if isStatusEqual(&pod.Status, &status) {
|
||||||
|
// If the status from the source is the same with the cached status,
|
||||||
|
// reconcile is not needed. Just return.
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
glog.V(3).Infof("Pod status is inconsistent with cached status, a reconciliation should be triggered:\n %+v", util.ObjectDiff(pod.Status, status))
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// We add this function, because apiserver only supports *RFC3339* now, which means that the timestamp returned by
|
||||||
|
// apiserver has no nanosecond infromation. However, the timestamp returned by unversioned.Now() contains nanosecond,
|
||||||
|
// so when we do comparison between status from apiserver and cached status, isStatusEqual() will always return false.
|
||||||
|
// There is related issue #15262 and PR #15263 about this.
|
||||||
|
// In fact, the best way to solve this is to do it on api side. However for now, we normalize the status locally in
|
||||||
|
// kubelet temporarily.
|
||||||
|
// TODO(random-liu): Remove timestamp related logic after apiserver supports nanosecond or makes it consistent.
|
||||||
|
func normalizeStatus(status *api.PodStatus) *api.PodStatus {
|
||||||
|
normalizeTimeStamp := func(t *unversioned.Time) {
|
||||||
|
*t = t.Rfc3339Copy()
|
||||||
|
}
|
||||||
|
normalizeContainerState := func(c *api.ContainerState) {
|
||||||
|
if c.Running != nil {
|
||||||
|
normalizeTimeStamp(&c.Running.StartedAt)
|
||||||
|
}
|
||||||
|
if c.Terminated != nil {
|
||||||
|
normalizeTimeStamp(&c.Terminated.StartedAt)
|
||||||
|
normalizeTimeStamp(&c.Terminated.FinishedAt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if status.StartTime != nil {
|
||||||
|
normalizeTimeStamp(status.StartTime)
|
||||||
|
}
|
||||||
|
for i := range status.Conditions {
|
||||||
|
condition := &status.Conditions[i]
|
||||||
|
normalizeTimeStamp(&condition.LastProbeTime)
|
||||||
|
normalizeTimeStamp(&condition.LastTransitionTime)
|
||||||
|
}
|
||||||
|
for i := range status.ContainerStatuses {
|
||||||
|
cstatus := &status.ContainerStatuses[i]
|
||||||
|
normalizeContainerState(&cstatus.State)
|
||||||
|
normalizeContainerState(&cstatus.LastTerminationState)
|
||||||
|
}
|
||||||
|
// Sort the container statuses, so that the order won't affect the result of comparison
|
||||||
|
sort.Sort(kubetypes.SortedContainerStatuses(status.ContainerStatuses))
|
||||||
|
return status
|
||||||
|
}
|
||||||
|
|
||||||
// notRunning returns true if every status is terminated or waiting, or the status list
|
// notRunning returns true if every status is terminated or waiting, or the status list
|
||||||
// is empty.
|
// is empty.
|
||||||
func notRunning(statuses []api.ContainerStatus) bool {
|
func notRunning(statuses []api.ContainerStatus) bool {
|
||||||
|
|
|
@ -44,6 +44,24 @@ var testPod *api.Pod = &api.Pod{
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// After adding reconciliation, if status in pod manager is different from the cached status, a reconciliation
|
||||||
|
// will be triggered, which will mess up all the old unit test.
|
||||||
|
// To simplify the implementation of unit test, we add testSyncBatch() here, it will make sure the statuses in
|
||||||
|
// pod manager the same with cached ones before syncBatch() so as to avoid reconciling.
|
||||||
|
func (m *manager) testSyncBatch() {
|
||||||
|
for uid, status := range m.podStatuses {
|
||||||
|
pod, ok := m.podManager.GetPodByUID(uid)
|
||||||
|
if ok {
|
||||||
|
pod.Status = status.status
|
||||||
|
}
|
||||||
|
pod, ok = m.podManager.GetMirrorPodByPod(pod)
|
||||||
|
if ok {
|
||||||
|
pod.Status = status.status
|
||||||
|
}
|
||||||
|
}
|
||||||
|
m.syncBatch()
|
||||||
|
}
|
||||||
|
|
||||||
func newTestManager(kubeClient client.Interface) *manager {
|
func newTestManager(kubeClient client.Interface) *manager {
|
||||||
podManager := kubepod.NewBasicPodManager(kubepod.NewFakeMirrorClient())
|
podManager := kubepod.NewBasicPodManager(kubepod.NewFakeMirrorClient())
|
||||||
podManager.AddPod(testPod)
|
podManager.AddPod(testPod)
|
||||||
|
@ -209,8 +227,8 @@ func TestChangedStatusUpdatesLastTransitionTime(t *testing.T) {
|
||||||
if newReadyCondition.LastTransitionTime.IsZero() {
|
if newReadyCondition.LastTransitionTime.IsZero() {
|
||||||
t.Errorf("Unexpected: last transition time not set")
|
t.Errorf("Unexpected: last transition time not set")
|
||||||
}
|
}
|
||||||
if !oldReadyCondition.LastTransitionTime.Before(newReadyCondition.LastTransitionTime) {
|
if newReadyCondition.LastTransitionTime.Before(oldReadyCondition.LastTransitionTime) {
|
||||||
t.Errorf("Unexpected: new transition time %s, is not after old transition time %s", newReadyCondition.LastTransitionTime, oldReadyCondition.LastTransitionTime)
|
t.Errorf("Unexpected: new transition time %s, is before old transition time %s", newReadyCondition.LastTransitionTime, oldReadyCondition.LastTransitionTime)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -259,7 +277,7 @@ func TestSyncBatchIgnoresNotFound(t *testing.T) {
|
||||||
return true, nil, errors.NewNotFound(api.Resource("pods"), "test-pod")
|
return true, nil, errors.NewNotFound(api.Resource("pods"), "test-pod")
|
||||||
})
|
})
|
||||||
syncer.SetPodStatus(testPod, getRandomPodStatus())
|
syncer.SetPodStatus(testPod, getRandomPodStatus())
|
||||||
syncer.syncBatch()
|
syncer.testSyncBatch()
|
||||||
|
|
||||||
verifyActions(t, syncer.kubeClient, []testclient.Action{
|
verifyActions(t, syncer.kubeClient, []testclient.Action{
|
||||||
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
|
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
|
||||||
|
@ -270,7 +288,7 @@ func TestSyncBatch(t *testing.T) {
|
||||||
syncer := newTestManager(&testclient.Fake{})
|
syncer := newTestManager(&testclient.Fake{})
|
||||||
syncer.kubeClient = testclient.NewSimpleFake(testPod)
|
syncer.kubeClient = testclient.NewSimpleFake(testPod)
|
||||||
syncer.SetPodStatus(testPod, getRandomPodStatus())
|
syncer.SetPodStatus(testPod, getRandomPodStatus())
|
||||||
syncer.syncBatch()
|
syncer.testSyncBatch()
|
||||||
verifyActions(t, syncer.kubeClient, []testclient.Action{
|
verifyActions(t, syncer.kubeClient, []testclient.Action{
|
||||||
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
|
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
|
||||||
testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
|
testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
|
||||||
|
@ -288,7 +306,7 @@ func TestSyncBatchChecksMismatchedUID(t *testing.T) {
|
||||||
syncer.podManager.AddPod(&differentPod)
|
syncer.podManager.AddPod(&differentPod)
|
||||||
syncer.kubeClient = testclient.NewSimpleFake(&pod)
|
syncer.kubeClient = testclient.NewSimpleFake(&pod)
|
||||||
syncer.SetPodStatus(&differentPod, getRandomPodStatus())
|
syncer.SetPodStatus(&differentPod, getRandomPodStatus())
|
||||||
syncer.syncBatch()
|
syncer.testSyncBatch()
|
||||||
verifyActions(t, syncer.kubeClient, []testclient.Action{
|
verifyActions(t, syncer.kubeClient, []testclient.Action{
|
||||||
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
|
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
|
||||||
})
|
})
|
||||||
|
@ -324,7 +342,7 @@ func TestSyncBatchNoDeadlock(t *testing.T) {
|
||||||
ret = *pod
|
ret = *pod
|
||||||
err = errors.NewNotFound(api.Resource("pods"), pod.Name)
|
err = errors.NewNotFound(api.Resource("pods"), pod.Name)
|
||||||
m.SetPodStatus(pod, getRandomPodStatus())
|
m.SetPodStatus(pod, getRandomPodStatus())
|
||||||
m.syncBatch()
|
m.testSyncBatch()
|
||||||
verifyActions(t, client, []testclient.Action{getAction})
|
verifyActions(t, client, []testclient.Action{getAction})
|
||||||
client.ClearActions()
|
client.ClearActions()
|
||||||
|
|
||||||
|
@ -332,21 +350,21 @@ func TestSyncBatchNoDeadlock(t *testing.T) {
|
||||||
ret.UID = "other_pod"
|
ret.UID = "other_pod"
|
||||||
err = nil
|
err = nil
|
||||||
m.SetPodStatus(pod, getRandomPodStatus())
|
m.SetPodStatus(pod, getRandomPodStatus())
|
||||||
m.syncBatch()
|
m.testSyncBatch()
|
||||||
verifyActions(t, client, []testclient.Action{getAction})
|
verifyActions(t, client, []testclient.Action{getAction})
|
||||||
client.ClearActions()
|
client.ClearActions()
|
||||||
|
|
||||||
// Pod not deleted (success case).
|
// Pod not deleted (success case).
|
||||||
ret = *pod
|
ret = *pod
|
||||||
m.SetPodStatus(pod, getRandomPodStatus())
|
m.SetPodStatus(pod, getRandomPodStatus())
|
||||||
m.syncBatch()
|
m.testSyncBatch()
|
||||||
verifyActions(t, client, []testclient.Action{getAction, updateAction})
|
verifyActions(t, client, []testclient.Action{getAction, updateAction})
|
||||||
client.ClearActions()
|
client.ClearActions()
|
||||||
|
|
||||||
// Pod is terminated, but still running.
|
// Pod is terminated, but still running.
|
||||||
pod.DeletionTimestamp = new(unversioned.Time)
|
pod.DeletionTimestamp = new(unversioned.Time)
|
||||||
m.SetPodStatus(pod, getRandomPodStatus())
|
m.SetPodStatus(pod, getRandomPodStatus())
|
||||||
m.syncBatch()
|
m.testSyncBatch()
|
||||||
verifyActions(t, client, []testclient.Action{getAction, updateAction})
|
verifyActions(t, client, []testclient.Action{getAction, updateAction})
|
||||||
client.ClearActions()
|
client.ClearActions()
|
||||||
|
|
||||||
|
@ -354,14 +372,14 @@ func TestSyncBatchNoDeadlock(t *testing.T) {
|
||||||
pod.Status.ContainerStatuses[0].State.Running = nil
|
pod.Status.ContainerStatuses[0].State.Running = nil
|
||||||
pod.Status.ContainerStatuses[0].State.Terminated = &api.ContainerStateTerminated{}
|
pod.Status.ContainerStatuses[0].State.Terminated = &api.ContainerStateTerminated{}
|
||||||
m.SetPodStatus(pod, getRandomPodStatus())
|
m.SetPodStatus(pod, getRandomPodStatus())
|
||||||
m.syncBatch()
|
m.testSyncBatch()
|
||||||
verifyActions(t, client, []testclient.Action{getAction, updateAction})
|
verifyActions(t, client, []testclient.Action{getAction, updateAction})
|
||||||
client.ClearActions()
|
client.ClearActions()
|
||||||
|
|
||||||
// Error case.
|
// Error case.
|
||||||
err = fmt.Errorf("intentional test error")
|
err = fmt.Errorf("intentional test error")
|
||||||
m.SetPodStatus(pod, getRandomPodStatus())
|
m.SetPodStatus(pod, getRandomPodStatus())
|
||||||
m.syncBatch()
|
m.testSyncBatch()
|
||||||
verifyActions(t, client, []testclient.Action{getAction})
|
verifyActions(t, client, []testclient.Action{getAction})
|
||||||
client.ClearActions()
|
client.ClearActions()
|
||||||
}
|
}
|
||||||
|
@ -380,7 +398,7 @@ func TestStaleUpdates(t *testing.T) {
|
||||||
verifyUpdates(t, m, 3)
|
verifyUpdates(t, m, 3)
|
||||||
|
|
||||||
t.Logf("First sync pushes latest status.")
|
t.Logf("First sync pushes latest status.")
|
||||||
m.syncBatch()
|
m.testSyncBatch()
|
||||||
verifyActions(t, m.kubeClient, []testclient.Action{
|
verifyActions(t, m.kubeClient, []testclient.Action{
|
||||||
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
|
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
|
||||||
testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
|
testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
|
||||||
|
@ -389,7 +407,7 @@ func TestStaleUpdates(t *testing.T) {
|
||||||
|
|
||||||
for i := 0; i < 2; i++ {
|
for i := 0; i < 2; i++ {
|
||||||
t.Logf("Next 2 syncs should be ignored (%d).", i)
|
t.Logf("Next 2 syncs should be ignored (%d).", i)
|
||||||
m.syncBatch()
|
m.testSyncBatch()
|
||||||
verifyActions(t, m.kubeClient, []testclient.Action{})
|
verifyActions(t, m.kubeClient, []testclient.Action{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -401,7 +419,7 @@ func TestStaleUpdates(t *testing.T) {
|
||||||
m.apiStatusVersions[pod.UID] = m.apiStatusVersions[pod.UID] - 1
|
m.apiStatusVersions[pod.UID] = m.apiStatusVersions[pod.UID] - 1
|
||||||
|
|
||||||
m.SetPodStatus(&pod, status)
|
m.SetPodStatus(&pod, status)
|
||||||
m.syncBatch()
|
m.testSyncBatch()
|
||||||
verifyActions(t, m.kubeClient, []testclient.Action{
|
verifyActions(t, m.kubeClient, []testclient.Action{
|
||||||
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
|
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
|
||||||
testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
|
testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
|
||||||
|
@ -471,7 +489,7 @@ func TestStaticPodStatus(t *testing.T) {
|
||||||
retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID)
|
retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID)
|
||||||
assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
|
assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
|
||||||
// Should translate mirrorPod / staticPod UID.
|
// Should translate mirrorPod / staticPod UID.
|
||||||
m.syncBatch()
|
m.testSyncBatch()
|
||||||
verifyActions(t, m.kubeClient, []testclient.Action{
|
verifyActions(t, m.kubeClient, []testclient.Action{
|
||||||
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
|
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
|
||||||
testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
|
testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
|
||||||
|
@ -483,7 +501,7 @@ func TestStaticPodStatus(t *testing.T) {
|
||||||
client.ClearActions()
|
client.ClearActions()
|
||||||
|
|
||||||
// No changes.
|
// No changes.
|
||||||
m.syncBatch()
|
m.testSyncBatch()
|
||||||
verifyActions(t, m.kubeClient, []testclient.Action{})
|
verifyActions(t, m.kubeClient, []testclient.Action{})
|
||||||
|
|
||||||
// Mirror pod identity changes.
|
// Mirror pod identity changes.
|
||||||
|
@ -492,7 +510,7 @@ func TestStaticPodStatus(t *testing.T) {
|
||||||
mirrorPod.Status = api.PodStatus{}
|
mirrorPod.Status = api.PodStatus{}
|
||||||
m.podManager.AddPod(&mirrorPod)
|
m.podManager.AddPod(&mirrorPod)
|
||||||
// Expect update to new mirrorPod.
|
// Expect update to new mirrorPod.
|
||||||
m.syncBatch()
|
m.testSyncBatch()
|
||||||
verifyActions(t, m.kubeClient, []testclient.Action{
|
verifyActions(t, m.kubeClient, []testclient.Action{
|
||||||
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
|
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
|
||||||
testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
|
testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
|
||||||
|
@ -604,7 +622,7 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
|
||||||
// Orphaned pods should be removed.
|
// Orphaned pods should be removed.
|
||||||
m.apiStatusVersions[testPod.UID] = 100
|
m.apiStatusVersions[testPod.UID] = 100
|
||||||
m.apiStatusVersions[mirrorPod.UID] = 200
|
m.apiStatusVersions[mirrorPod.UID] = 200
|
||||||
m.syncBatch()
|
m.testSyncBatch()
|
||||||
if _, ok := m.apiStatusVersions[testPod.UID]; ok {
|
if _, ok := m.apiStatusVersions[testPod.UID]; ok {
|
||||||
t.Errorf("Should have cleared status for testPod")
|
t.Errorf("Should have cleared status for testPod")
|
||||||
}
|
}
|
||||||
|
@ -621,7 +639,7 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
|
||||||
m.podManager.AddPod(&staticPod)
|
m.podManager.AddPod(&staticPod)
|
||||||
m.apiStatusVersions[testPod.UID] = 100
|
m.apiStatusVersions[testPod.UID] = 100
|
||||||
m.apiStatusVersions[mirrorPod.UID] = 200
|
m.apiStatusVersions[mirrorPod.UID] = 200
|
||||||
m.syncBatch()
|
m.testSyncBatch()
|
||||||
if _, ok := m.apiStatusVersions[testPod.UID]; !ok {
|
if _, ok := m.apiStatusVersions[testPod.UID]; !ok {
|
||||||
t.Errorf("Should not have cleared status for testPod")
|
t.Errorf("Should not have cleared status for testPod")
|
||||||
}
|
}
|
||||||
|
@ -630,6 +648,61 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReconcilePodStatus(t *testing.T) {
|
||||||
|
client := testclient.NewSimpleFake(testPod)
|
||||||
|
syncer := newTestManager(client)
|
||||||
|
syncer.SetPodStatus(testPod, getRandomPodStatus())
|
||||||
|
// Call syncBatch directly to test reconcile
|
||||||
|
syncer.syncBatch() // The apiStatusVersions should be set now
|
||||||
|
|
||||||
|
originalStatus := testPod.Status
|
||||||
|
podStatus, ok := syncer.GetPodStatus(testPod.UID)
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("Should find pod status for pod: %+v", testPod)
|
||||||
|
}
|
||||||
|
testPod.Status = podStatus
|
||||||
|
|
||||||
|
// If the pod status is the same, a reconciliation is not needed,
|
||||||
|
// syncBatch should do nothing
|
||||||
|
syncer.podManager.UpdatePod(testPod)
|
||||||
|
if syncer.needsReconcile(testPod.UID, podStatus) {
|
||||||
|
t.Errorf("Pod status is the same, a reconciliation is not needed")
|
||||||
|
}
|
||||||
|
client.ClearActions()
|
||||||
|
syncer.syncBatch()
|
||||||
|
verifyActions(t, client, []testclient.Action{})
|
||||||
|
|
||||||
|
// If the pod status is the same, only the timestamp is in Rfc3339 format (lower precision without nanosecond),
|
||||||
|
// a reconciliation is not needed, syncBatch should do nothing.
|
||||||
|
// The StartTime should have been set in SetPodStatus().
|
||||||
|
// TODO(random-liu): Remove this later when api becomes consistent for timestamp.
|
||||||
|
normalizedStartTime := testPod.Status.StartTime.Rfc3339Copy()
|
||||||
|
testPod.Status.StartTime = &normalizedStartTime
|
||||||
|
syncer.podManager.UpdatePod(testPod)
|
||||||
|
if syncer.needsReconcile(testPod.UID, podStatus) {
|
||||||
|
t.Errorf("Pod status only differs for timestamp format, a reconciliation is not needed")
|
||||||
|
}
|
||||||
|
client.ClearActions()
|
||||||
|
syncer.syncBatch()
|
||||||
|
verifyActions(t, client, []testclient.Action{})
|
||||||
|
|
||||||
|
// If the pod status is different, a reconciliation is needed, syncBatch should trigger an update
|
||||||
|
testPod.Status = getRandomPodStatus()
|
||||||
|
syncer.podManager.UpdatePod(testPod)
|
||||||
|
if !syncer.needsReconcile(testPod.UID, podStatus) {
|
||||||
|
t.Errorf("Pod status is different, a reconciliation is needed")
|
||||||
|
}
|
||||||
|
client.ClearActions()
|
||||||
|
syncer.syncBatch()
|
||||||
|
verifyActions(t, client, []testclient.Action{
|
||||||
|
testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
|
||||||
|
testclient.UpdateActionImpl{ActionImpl: testclient.ActionImpl{Verb: "update", Resource: "pods", Subresource: "status"}},
|
||||||
|
})
|
||||||
|
|
||||||
|
// Just in case that testPod is shared among different test functions, set it back.
|
||||||
|
testPod.Status = originalStatus
|
||||||
|
}
|
||||||
|
|
||||||
func expectPodStatus(t *testing.T, m *manager, pod *api.Pod) api.PodStatus {
|
func expectPodStatus(t *testing.T, m *manager, pod *api.Pod) api.PodStatus {
|
||||||
status, ok := m.GetPodStatus(pod.UID)
|
status, ok := m.GetPodStatus(pod.UID)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
|
@ -39,6 +39,9 @@ const (
|
||||||
REMOVE
|
REMOVE
|
||||||
// Pods with the given ids have been updated in this source
|
// Pods with the given ids have been updated in this source
|
||||||
UPDATE
|
UPDATE
|
||||||
|
// Pods with the given ids have unexpected status in this source,
|
||||||
|
// kubelet should reconcile status with this source
|
||||||
|
RECONCILE
|
||||||
|
|
||||||
// These constants identify the sources of pods
|
// These constants identify the sources of pods
|
||||||
// Updates from a file
|
// Updates from a file
|
||||||
|
|
Loading…
Reference in New Issue