mirror of https://github.com/k3s-io/k3s

Merge pull request #5926 from vmarmol/fast-test
Reduce testing time of status_manager_test.

commit 5d628770c1
@@ -17,9 +17,9 @@ limitations under the License.
 package kubelet
 
 import (
+	"fmt"
 	"reflect"
 	"sync"
-	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
@@ -52,8 +52,13 @@ func newStatusManager(kubeClient client.Interface) *statusManager {
 }
 
 func (s *statusManager) Start() {
-	// We can run SyncBatch() often because it will block until we have some updates to send.
-	go util.Forever(s.SyncBatch, 0)
+	// syncBatch blocks when no updates are available, so we can run it in a tight loop.
+	go util.Forever(func() {
+		err := s.syncBatch()
+		if err != nil {
+			glog.Warningf("Failed to update pod status: %v", err)
+		}
+	}, 0)
 }
 
 func (s *statusManager) GetPodStatus(podFullName string) (api.PodStatus, bool) {
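The reworked Start() leans on a simple pattern: util.Forever re-invokes the worker with a zero-second period, and the worker itself blocks on a channel receive, so the tight loop burns no CPU while idle. A minimal, self-contained sketch of that pattern (forever, worker, and updates are illustrative names, not part of this change):

package main

import (
	"fmt"
	"time"
)

// forever mimics util.Forever: it calls f in a loop, sleeping
// `period` between invocations (here zero, i.e. a tight loop).
func forever(f func(), period time.Duration) {
	for {
		f()
		time.Sleep(period)
	}
}

func main() {
	updates := make(chan string)

	// The worker blocks on the channel receive, so the tight loop
	// costs nothing while there is no work to do.
	worker := func() {
		u := <-updates
		fmt.Println("processing", u)
	}
	go forever(worker, 0)

	updates <- "status-1"
	updates <- "status-2"
	time.Sleep(100 * time.Millisecond) // let the worker drain
}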
@@ -94,28 +99,23 @@ func (s *statusManager) RemoveOrphanedStatuses(podFullNames map[string]bool) {
 	}
 }
 
-// SyncBatch syncs pod statuses with the apiserver. It loops until the channel
-// s.podStatusChannel has been empty for at least 1s.
-func (s *statusManager) SyncBatch() {
-	for {
-		select {
-		case syncRequest := <-s.podStatusChannel:
-			pod := syncRequest.pod
-			podFullName := kubecontainer.GetPodFullName(pod)
-			status := syncRequest.status
-			glog.V(3).Infof("Syncing status for %s", podFullName)
-			_, err := s.kubeClient.Pods(pod.Namespace).UpdateStatus(pod.Name, &status)
-			if err != nil {
-				// We failed to update the status. To make sure we retry next time,
-				// we delete the cached value. This may result in an additional
-				// update, but that is OK.
-				s.DeletePodStatus(podFullName)
-				glog.Warningf("Error updating status for pod %q: %v", podFullName, err)
-			} else {
-				glog.V(3).Infof("Status for pod %q updated successfully", podFullName)
-			}
-		case <-time.After(1 * time.Second):
-			return
-		}
-	}
-}
+// syncBatch syncs pod statuses with the apiserver.
+func (s *statusManager) syncBatch() error {
+	syncRequest := <-s.podStatusChannel
+	pod := syncRequest.pod
+	podFullName := kubecontainer.GetPodFullName(pod)
+	status := syncRequest.status
+
+	_, err := s.kubeClient.Pods(pod.Namespace).UpdateStatus(pod.Name, &status)
+	if err != nil {
+		// We failed to update the status. To make sure we retry next time,
+		// we delete the cached value. This may result in an additional
+		// update, but that is OK.
+		s.DeletePodStatus(podFullName)
+		return fmt.Errorf("error updating status for pod %q: %v", pod.Name, err)
+	} else {
+		glog.V(3).Infof("Status for pod %q updated successfully", pod.Name)
+	}
+
+	return nil
+}
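The practical effect on test time: the old SyncBatch could not return until the channel had been idle for a full second, so every test that called it paid at least 1s of wall clock, while the new syncBatch returns as soon as a single update has been processed. A rough, self-contained sketch of that timing difference (drainWithTimeout and drainOne are illustrative stand-ins for the old and new shapes, not code from this commit); the remaining hunk below is from the status manager's test file:

package main

import (
	"fmt"
	"time"
)

// drainWithTimeout mimics the old SyncBatch: it loops until the
// channel has been empty for a full second.
func drainWithTimeout(ch chan int) {
	for {
		select {
		case <-ch:
		case <-time.After(1 * time.Second):
			return
		}
	}
}

// drainOne mimics the new syncBatch: it blocks for exactly one item.
func drainOne(ch chan int) {
	<-ch
}

func main() {
	ch := make(chan int, 1)

	ch <- 1
	start := time.Now()
	drainWithTimeout(ch)
	fmt.Println("old style took:", time.Since(start)) // ~1s: pays the idle timeout

	ch <- 1
	start = time.Now()
	drainOne(ch)
	fmt.Println("new style took:", time.Since(start)) // returns immediately
}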
@@ -60,28 +60,55 @@ func verifyActions(t *testing.T, kubeClient client.Interface, expectedActions []
 	}
 }
 
+func verifyUpdates(t *testing.T, manager *statusManager, expectedUpdates int) {
+	// Consume all updates in the channel.
+	numUpdates := 0
+	for {
+		hasUpdate := true
+		select {
+		case <-manager.podStatusChannel:
+			numUpdates++
+		default:
+			hasUpdate = false
+		}
+
+		if !hasUpdate {
+			break
+		}
+	}
+
+	if numUpdates != expectedUpdates {
+		t.Errorf("unexpected number of updates %d, expected %d", numUpdates, expectedUpdates)
+	}
+}
+
 func TestNewStatus(t *testing.T) {
 	syncer := newTestStatusManager()
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
-	syncer.SyncBatch()
-	verifyActions(t, syncer.kubeClient, []string{"update-status-pod"})
+	verifyUpdates(t, syncer, 1)
 }
 
 func TestChangedStatus(t *testing.T) {
 	syncer := newTestStatusManager()
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
-	syncer.SyncBatch()
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
-	syncer.SyncBatch()
-	verifyActions(t, syncer.kubeClient, []string{"update-status-pod", "update-status-pod"})
+	verifyUpdates(t, syncer, 2)
 }
 
 func TestUnchangedStatus(t *testing.T) {
 	syncer := newTestStatusManager()
 	podStatus := getRandomPodStatus()
 	syncer.SetPodStatus(testPod, podStatus)
-	syncer.SyncBatch()
 	syncer.SetPodStatus(testPod, podStatus)
-	syncer.SyncBatch()
-	verifyActions(t, syncer.kubeClient, []string{"update-status-pod"})
+	verifyUpdates(t, syncer, 1)
 }
+
+func TestSyncBatch(t *testing.T) {
+	syncer := newTestStatusManager()
+	syncer.SetPodStatus(testPod, getRandomPodStatus())
+	err := syncer.syncBatch()
+	if err != nil {
+		t.Errorf("unexpected syncing error: %v", err)
+	}
+	verifyActions(t, syncer.kubeClient, []string{"update-status-pod"})
+}
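The new verifyUpdates helper is what lets the first three tests skip the sync loop entirely: it counts whatever is already queued on podStatusChannel using a select with a default case, so it never blocks (this also implies the manager's channel is buffered, or SetPodStatus could not complete without a receiver). A compact illustration of that non-blocking drain pattern (countPending is an illustrative name, not part of this commit):

package main

import "fmt"

// countPending drains everything currently buffered in ch without
// blocking: the default case fires as soon as the channel is empty.
func countPending(ch chan string) int {
	n := 0
	for {
		select {
		case <-ch:
			n++
		default:
			return n
		}
	}
}

func main() {
	ch := make(chan string, 10)
	ch <- "update-1"
	ch <- "update-2"
	fmt.Println(countPending(ch)) // 2
	fmt.Println(countPending(ch)) // 0; the channel is already drained
}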