mirror of https://github.com/k3s-io/k3s
improve testing
@ -87,8 +87,10 @@ func getRandomPodStatus() v1.PodStatus {
func verifyActions(t *testing.T, kubeClient clientset.Interface, expectedActions []core.Action) {
actions := kubeClient.(*fake.Clientset).Actions()
func verifyActions(t *testing.T, manager *manager, expectedActions []core.Action) {
actions := manager.kubeClient.(*fake.Clientset).Actions()
defer manager.kubeClient.(*fake.Clientset).ClearActions()
if len(actions) != len(expectedActions) {
t.Fatalf("unexpected actions, got: %+v expected: %+v", actions, expectedActions)
@ -104,26 +106,25 @@ func verifyActions(t *testing.T, kubeClient clientset.Interface, expectedActions
func verifyUpdates(t *testing.T, manager *manager, expectedUpdates int) {
// Consume all updates in the channel.
numUpdates := 0
for {
hasUpdate := true
select {
case <-manager.podStatusChannel:
hasUpdate = false
if !hasUpdate {
numUpdates := manager.consumeUpdates()
if numUpdates != expectedUpdates {
t.Errorf("unexpected number of updates %d, expected %d", numUpdates, expectedUpdates)
func (m *manager) consumeUpdates() int {
updates := 0
for {
select {
case syncRequest := <-m.podStatusChannel:
m.syncPod(syncRequest.podUID, syncRequest.status)
return updates
func TestNewStatus(t *testing.T) {
syncer := newTestManager(&fake.Clientset{})
testPod := getTestPod()
@ -284,34 +285,25 @@ func TestUnchangedStatusPreservesLastTransitionTime(t *testing.T) {
func TestSyncBatchIgnoresNotFound(t *testing.T) {
func TestSyncPodIgnoresNotFound(t *testing.T) {
client := fake.Clientset{}
syncer := newTestManager(&client)
client.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, errors.NewNotFound(api.Resource("pods"), "test-pod")
syncer.SetPodStatus(getTestPod(), getRandomPodStatus())
verifyActions(t, syncer.kubeClient, []core.Action{
core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}},
verifyActions(t, syncer, []core.Action{getAction()})
func TestSyncBatch(t *testing.T) {
func TestSyncPod(t *testing.T) {
syncer := newTestManager(&fake.Clientset{})
testPod := getTestPod()
syncer.kubeClient = fake.NewSimpleClientset(testPod)
syncer.SetPodStatus(testPod, getRandomPodStatus())
verifyActions(t, syncer.kubeClient, []core.Action{
core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}},
core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}},
verifyActions(t, syncer, []core.Action{getAction(), updateAction()})
func TestSyncBatchChecksMismatchedUID(t *testing.T) {
func TestSyncPodChecksMismatchedUID(t *testing.T) {
syncer := newTestManager(&fake.Clientset{})
pod := getTestPod()
pod.UID = "first"
@ -321,13 +313,10 @@ func TestSyncBatchChecksMismatchedUID(t *testing.T) {
syncer.kubeClient = fake.NewSimpleClientset(pod)
syncer.SetPodStatus(differentPod, getRandomPodStatus())
verifyActions(t, syncer.kubeClient, []core.Action{
core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}},
verifyActions(t, syncer, []core.Action{getAction()})
func TestSyncBatchNoDeadlock(t *testing.T) {
func TestSyncPodNoDeadlock(t *testing.T) {
client := &fake.Clientset{}
m := newTestManager(client)
pod := getTestPod()
@ -349,53 +338,38 @@ func TestSyncBatchNoDeadlock(t *testing.T) {
pod.Status.ContainerStatuses = []v1.ContainerStatus{{State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}}}
getAction := core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}}
updateAction := core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}}
// Pod not found.
t.Logf("Pod not found.")
ret = *pod
err = errors.NewNotFound(api.Resource("pods"), pod.Name)
m.SetPodStatus(pod, getRandomPodStatus())
verifyActions(t, client, []core.Action{getAction})
verifyActions(t, m, []core.Action{getAction()})
// Pod was recreated.
t.Logf("Pod was recreated.")
ret.UID = "other_pod"
err = nil
m.SetPodStatus(pod, getRandomPodStatus())
verifyActions(t, client, []core.Action{getAction})
verifyActions(t, m, []core.Action{getAction()})
// Pod not deleted (success case).
t.Logf("Pod not deleted (success case).")
ret = *pod
m.SetPodStatus(pod, getRandomPodStatus())
verifyActions(t, client, []core.Action{getAction, updateAction})
verifyActions(t, m, []core.Action{getAction(), updateAction()})
// Pod is terminated, but still running.
t.Logf("Pod is terminated, but still running.")
pod.DeletionTimestamp = new(metav1.Time)
m.SetPodStatus(pod, getRandomPodStatus())
verifyActions(t, client, []core.Action{getAction, updateAction})
verifyActions(t, m, []core.Action{getAction(), updateAction()})
// Pod is terminated successfully.
t.Logf("Pod is terminated successfully.")
pod.Status.ContainerStatuses[0].State.Running = nil
pod.Status.ContainerStatuses[0].State.Terminated = &v1.ContainerStateTerminated{}
m.SetPodStatus(pod, getRandomPodStatus())
verifyActions(t, client, []core.Action{getAction, updateAction})
verifyActions(t, m, []core.Action{getAction(), updateAction()})
// Error case.
t.Logf("Error case.")
err = fmt.Errorf("intentional test error")
m.SetPodStatus(pod, getRandomPodStatus())
verifyActions(t, client, []core.Action{getAction})
verifyActions(t, m, []core.Action{getAction()})
func TestStaleUpdates(t *testing.T) {
@ -409,21 +383,13 @@ func TestStaleUpdates(t *testing.T) {
m.SetPodStatus(pod, status)
status.Message = "second version bump"
m.SetPodStatus(pod, status)
t.Logf("sync batch before syncPods pushes latest status, so we should see three statuses in the channel, but only one update")
verifyUpdates(t, m, 3)
t.Logf("First sync pushes latest status.")
verifyActions(t, m.kubeClient, []core.Action{
core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}},
core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}},
for i := 0; i < 2; i++ {
t.Logf("Next 2 syncs should be ignored (%d).", i)
verifyActions(t, m.kubeClient, []core.Action{})
verifyActions(t, m, []core.Action{getAction(), updateAction()})
t.Logf("Nothing left in the channel to sync")
verifyActions(t, m, []core.Action{})
t.Log("Unchanged status should not send an update.")
m.SetPodStatus(pod, status)
@ -433,13 +399,10 @@ func TestStaleUpdates(t *testing.T) {
m.apiStatusVersions[pod.UID] = m.apiStatusVersions[pod.UID] - 1
m.SetPodStatus(pod, status)
verifyActions(t, m.kubeClient, []core.Action{
core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}},
core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}},
verifyActions(t, m, []core.Action{getAction(), updateAction()})
// Nothing stuck in the pipe.
t.Logf("Nothing stuck in the pipe.")
verifyUpdates(t, m, 0)
@ -526,7 +489,7 @@ func TestStaticPod(t *testing.T) {
client := fake.NewSimpleClientset(mirrorPod)
m := newTestManager(client)
// Create the static pod
t.Logf("Create the static pod")
assert.True(t, kubepod.IsStaticPod(staticPod), "SetUp error: staticPod")
@ -535,69 +498,57 @@ func TestStaticPod(t *testing.T) {
status.StartTime = &now
m.SetPodStatus(staticPod, status)
// Should be able to get the static pod status from status manager
t.Logf("Should be able to get the static pod status from status manager")
retrievedStatus := expectPodStatus(t, m, staticPod)
normalizeStatus(staticPod, &status)
assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
// Should not sync pod because there is no corresponding mirror pod for the static pod.
verifyActions(t, m.kubeClient, []core.Action{})
t.Logf("Should not sync pod in syncBatch because there is no corresponding mirror pod for the static pod.")
assert.Equal(t, len(m.kubeClient.(*fake.Clientset).Actions()), 0, "Expected no updates after syncBatch, got %+v", m.kubeClient.(*fake.Clientset).Actions())
// Create the mirror pod
t.Logf("Create the mirror pod")
assert.True(t, kubepod.IsMirrorPod(mirrorPod), "SetUp error: mirrorPod")
assert.Equal(t, m.podManager.TranslatePodUID(mirrorPod.UID), staticPod.UID)
// Should be able to get the mirror pod status from status manager
t.Logf("Should be able to get the mirror pod status from status manager")
retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID)
assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
// Should sync pod because the corresponding mirror pod is created
verifyActions(t, m.kubeClient, []core.Action{
core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}},
core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}},
updateAction := client.Actions()[1].(core.UpdateActionImpl)
updatedPod := updateAction.Object.(*v1.Pod)
assert.Equal(t, mirrorPod.UID, updatedPod.UID, "Expected mirrorPod (%q), but got %q", mirrorPod.UID, updatedPod.UID)
assert.True(t, isStatusEqual(&status, &updatedPod.Status), "Expected: %+v, Got: %+v", status, updatedPod.Status)
t.Logf("Should sync pod because the corresponding mirror pod is created")
verifyActions(t, m, []core.Action{getAction(), updateAction()})
// Should not sync pod because nothing is changed.
t.Logf("syncBatch should not sync any pods because nothing is changed.")
verifyActions(t, m.kubeClient, []core.Action{})
verifyActions(t, m, []core.Action{})
// Change mirror pod identity.
t.Logf("Change mirror pod identity.")
mirrorPod.UID = "new-mirror-pod"
mirrorPod.Status = v1.PodStatus{}
// Should not update to mirror pod, because UID has changed.
verifyActions(t, m.kubeClient, []core.Action{
core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}},
t.Logf("Should not update to mirror pod, because UID has changed.")
verifyActions(t, m, []core.Action{getAction()})
func TestTerminatePod(t *testing.T) {
syncer := newTestManager(&fake.Clientset{})
testPod := getTestPod()
// update the pod's status to Failed. TerminatePod should preserve this status update.
t.Logf("update the pod's status to Failed. TerminatePod should preserve this status update.")
firstStatus := getRandomPodStatus()
firstStatus.Phase = v1.PodFailed
syncer.SetPodStatus(testPod, firstStatus)
// set the testPod to a pod with Phase running, to simulate a stale pod
t.Logf("set the testPod to a pod with Phase running, to simulate a stale pod")
testPod.Status = getRandomPodStatus()
testPod.Status.Phase = v1.PodRunning
// we expect the container statuses to have changed to terminated
t.Logf("we expect the container statuses to have changed to terminated")
newStatus := expectPodStatus(t, syncer, testPod)
for i := range newStatus.ContainerStatuses {
assert.False(t, newStatus.ContainerStatuses[i].State.Terminated == nil, "expected containers to be terminated")
@ -606,7 +557,7 @@ func TestTerminatePod(t *testing.T) {
assert.False(t, newStatus.InitContainerStatuses[i].State.Terminated == nil, "expected init containers to be terminated")
// we expect the previous status update to be preserved.
t.Logf("we expect the previous status update to be preserved.")
assert.Equal(t, newStatus.Phase, firstStatus.Phase)
assert.Equal(t, newStatus.Message, firstStatus.Message)
@ -711,10 +662,10 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
kubetypes.ConfigMirrorAnnotationKey: "mirror",
// Orphaned pods should be removed.
t.Logf("Orphaned pods should be removed.")
m.apiStatusVersions[testPod.UID] = 100
m.apiStatusVersions[mirrorPod.UID] = 200
if _, ok := m.apiStatusVersions[testPod.UID]; ok {
t.Errorf("Should have cleared status for testPod")
@ -722,7 +673,7 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
t.Errorf("Should have cleared status for mirrorPod")
// Non-orphaned pods should not be removed.
t.Logf("Non-orphaned pods should not be removed.")
m.SetPodStatus(testPod, getRandomPodStatus())
staticPod := mirrorPod
@ -745,8 +696,9 @@ func TestReconcilePodStatus(t *testing.T) {
client := fake.NewSimpleClientset(testPod)
syncer := newTestManager(client)
syncer.SetPodStatus(testPod, getRandomPodStatus())
// Call syncBatch directly to test reconcile
t.Logf("Call syncBatch directly to test reconcile")
syncer.syncBatch() // The apiStatusVersions should be set now
podStatus, ok := syncer.GetPodStatus(testPod.UID)
if !ok {
@ -754,42 +706,36 @@ func TestReconcilePodStatus(t *testing.T) {
testPod.Status = podStatus
// If the pod status is the same, a reconciliation is not needed,
// syncBatch should do nothing
t.Logf("If the pod status is the same, a reconciliation is not needed and syncBatch should do nothing")
if syncer.needsReconcile(testPod.UID, podStatus) {
t.Errorf("Pod status is the same, a reconciliation is not needed")
verifyActions(t, client, []core.Action{})
verifyActions(t, syncer, []core.Action{})
// If the pod status is the same, only the timestamp is in Rfc3339 format (lower precision without nanosecond),
// a reconciliation is not needed, syncBatch should do nothing.
// The StartTime should have been set in SetPodStatus().
// TODO(random-liu): Remove this later when api becomes consistent for timestamp.
t.Logf("Syncbatch should do nothing, as a reconciliation is not required")
normalizedStartTime := testPod.Status.StartTime.Rfc3339Copy()
testPod.Status.StartTime = &normalizedStartTime
if syncer.needsReconcile(testPod.UID, podStatus) {
t.Errorf("Pod status only differs for timestamp format, a reconciliation is not needed")
verifyActions(t, client, []core.Action{})
verifyActions(t, syncer, []core.Action{})
// If the pod status is different, a reconciliation is needed, syncBatch should trigger an update
t.Logf("If the pod status is different, a reconciliation is needed, syncBatch should trigger an update")
testPod.Status = getRandomPodStatus()
if !syncer.needsReconcile(testPod.UID, podStatus) {
t.Errorf("Pod status is different, a reconciliation is needed")
verifyActions(t, client, []core.Action{
core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}},
core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}},
verifyActions(t, syncer, []core.Action{getAction(), updateAction()})
func expectPodStatus(t *testing.T, m *manager, pod *v1.Pod) v1.PodStatus {
@ -802,7 +748,7 @@ func expectPodStatus(t *testing.T, m *manager, pod *v1.Pod) v1.PodStatus {
func TestDeletePods(t *testing.T) {
pod := getTestPod()
// Set the deletion timestamp.
t.Logf("Set the deletion timestamp.")
pod.DeletionTimestamp = new(metav1.Time)
client := fake.NewSimpleClientset(pod)
m := newTestManager(client)
@ -813,13 +759,8 @@ func TestDeletePods(t *testing.T) {
status.StartTime = &now
m.SetPodStatus(pod, status)
// Expect to see an delete action.
verifyActions(t, m.kubeClient, []core.Action{
core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}},
core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}},
core.DeleteActionImpl{ActionImpl: core.ActionImpl{Verb: "delete", Resource: schema.GroupVersionResource{Resource: "pods"}}},
t.Logf("Expect to see a delete action.")
verifyActions(t, m, []core.Action{getAction(), updateAction(), deleteAction()})
func TestDoNotDeleteMirrorPods(t *testing.T) {
@ -831,13 +772,13 @@ func TestDoNotDeleteMirrorPods(t *testing.T) {
kubetypes.ConfigSourceAnnotationKey: "api",
kubetypes.ConfigMirrorAnnotationKey: "mirror",
// Set the deletion timestamp.
t.Logf("Set the deletion timestamp.")
mirrorPod.DeletionTimestamp = new(metav1.Time)
client := fake.NewSimpleClientset(mirrorPod)
m := newTestManager(client)
// Verify setup.
t.Logf("Verify setup.")
assert.True(t, kubepod.IsStaticPod(staticPod), "SetUp error: staticPod")
assert.True(t, kubepod.IsMirrorPod(mirrorPod), "SetUp error: mirrorPod")
assert.Equal(t, m.podManager.TranslatePodUID(mirrorPod.UID), staticPod.UID)
@ -847,10 +788,18 @@ func TestDoNotDeleteMirrorPods(t *testing.T) {
status.StartTime = &now
m.SetPodStatus(staticPod, status)
// Expect not to see an delete action.
verifyActions(t, m.kubeClient, []core.Action{
core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}},
core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}},
t.Logf("Expect not to see a delete action.")
verifyActions(t, m, []core.Action{getAction(), updateAction()})
func getAction() core.GetAction {
return core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}}
func updateAction() core.UpdateAction {
return core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}}
func deleteAction() core.DeleteAction {
return core.DeleteActionImpl{ActionImpl: core.ActionImpl{Verb: "delete", Resource: schema.GroupVersionResource{Resource: "pods"}}}
Reference in New Issue