From 4cb99318d2f6d71e8632a7293553896f947d090b Mon Sep 17 00:00:00 2001
From: wojtekt
Date: Thu, 12 Apr 2018 14:24:35 +0200
Subject: [PATCH] Fix PodStore to wait for being initialized

---
 test/e2e/framework/rc_util.go   |  3 ++-
 test/e2e/framework/util.go      | 26 ++++++++------------------
 test/e2e/lifecycle/reboot.go    |  6 +++++-
 test/e2e/lifecycle/restart.go   |  3 ++-
 test/e2e/scalability/density.go |  4 +++-
 test/utils/pod_store.go         | 15 +++++++++++++--
 test/utils/runners.go           | 28 +++++++++++++++++++---------
 7 files changed, 52 insertions(+), 33 deletions(-)

diff --git a/test/e2e/framework/rc_util.go b/test/e2e/framework/rc_util.go
index ced976c48e..487dc1a030 100644
--- a/test/e2e/framework/rc_util.go
+++ b/test/e2e/framework/rc_util.go
@@ -25,6 +25,7 @@ import (
 
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
@@ -105,7 +106,7 @@ func ScaleRCByLabels(clientset clientset.Interface, scalesGetter scaleclient.Sca
 			return err
 		}
 		if replicas == 0 {
-			ps, err := podStoreForSelector(clientset, rc.Namespace, labels.SelectorFromSet(rc.Spec.Selector))
+			ps, err := testutils.NewPodStore(clientset, rc.Namespace, labels.SelectorFromSet(rc.Spec.Selector), fields.Everything())
 			if err != nil {
 				return err
 			}
diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go
index a1506ce5df..9c146806b1 100644
--- a/test/e2e/framework/util.go
+++ b/test/e2e/framework/util.go
@@ -2857,9 +2857,12 @@ func WaitForControlledPods(c clientset.Interface, ns, name string, kind schema.G
 
 // Returns true if all the specified pods are scheduled, else returns false.
 func podsWithLabelScheduled(c clientset.Interface, ns string, label labels.Selector) (bool, error) {
-	PodStore := testutils.NewPodStore(c, ns, label, fields.Everything())
-	defer PodStore.Stop()
-	pods := PodStore.List()
+	ps, err := testutils.NewPodStore(c, ns, label, fields.Everything())
+	if err != nil {
+		return false, err
+	}
+	defer ps.Stop()
+	pods := ps.List()
 	if len(pods) == 0 {
 		return false, nil
 	}
@@ -3019,7 +3022,7 @@ func DeleteResourceAndPods(clientset clientset.Interface, internalClientset inte
 	if err != nil {
 		return err
 	}
-	ps, err := podStoreForSelector(clientset, ns, selector)
+	ps, err := testutils.NewPodStore(clientset, ns, selector, fields.Everything())
 	if err != nil {
 		return err
 	}
@@ -3068,7 +3071,7 @@ func DeleteResourceAndWaitForGC(c clientset.Interface, kind schema.GroupKind, ns
 		return err
 	}
 
-	ps, err := podStoreForSelector(c, ns, selector)
+	ps, err := testutils.NewPodStore(c, ns, selector, fields.Everything())
 	if err != nil {
 		return err
 	}
@@ -3114,19 +3117,6 @@ func DeleteResourceAndWaitForGC(c clientset.Interface, kind schema.GroupKind, ns
 	return nil
 }
 
-// podStoreForSelector creates a PodStore that monitors pods from given namespace matching given selector.
-// It waits until the reflector does a List() before returning.
-func podStoreForSelector(c clientset.Interface, ns string, selector labels.Selector) (*testutils.PodStore, error) {
-	ps := testutils.NewPodStore(c, ns, selector, fields.Everything())
-	err := wait.Poll(100*time.Millisecond, 2*time.Minute, func() (bool, error) {
-		if len(ps.Reflector.LastSyncResourceVersion()) != 0 {
-			return true, nil
-		}
-		return false, nil
-	})
-	return ps, err
-}
-
 // waitForPodsInactive waits until there are no active pods left in the PodStore.
 // This is to make a fair comparison of deletion time between DeleteRCAndPods
 // and DeleteRCAndWaitForGC, because the RC controller decreases status.replicas
diff --git a/test/e2e/lifecycle/reboot.go b/test/e2e/lifecycle/reboot.go
index e3813bde4e..c7cf5cad4e 100644
--- a/test/e2e/lifecycle/reboot.go
+++ b/test/e2e/lifecycle/reboot.go
@@ -219,7 +219,11 @@ func printStatusAndLogsForNotReadyPods(c clientset.Interface, ns string, podName
 func rebootNode(c clientset.Interface, provider, name, rebootCmd string) bool {
 	// Setup
 	ns := metav1.NamespaceSystem
-	ps := testutils.NewPodStore(c, ns, labels.Everything(), fields.OneTermEqualSelector(api.PodHostField, name))
+	ps, err := testutils.NewPodStore(c, ns, labels.Everything(), fields.OneTermEqualSelector(api.PodHostField, name))
+	if err != nil {
+		framework.Logf("Couldn't initialize pod store: %v", err)
+		return false
+	}
 	defer ps.Stop()
 
 	// Get the node initially.
diff --git a/test/e2e/lifecycle/restart.go b/test/e2e/lifecycle/restart.go
index 1f233e9fb0..4508c05c6d 100644
--- a/test/e2e/lifecycle/restart.go
+++ b/test/e2e/lifecycle/restart.go
@@ -76,8 +76,9 @@ var _ = SIGDescribe("Restart [Disruptive]", func() {
 		// This test requires the ability to restart all nodes, so the provider
 		// check must be identical to that call.
 		framework.SkipUnlessProviderIs("gce", "gke")
-		ps = testutils.NewPodStore(f.ClientSet, metav1.NamespaceSystem, labels.Everything(), fields.Everything())
 		var err error
+		ps, err = testutils.NewPodStore(f.ClientSet, metav1.NamespaceSystem, labels.Everything(), fields.Everything())
+		Expect(err).NotTo(HaveOccurred())
 		numNodes, err = framework.NumberOfRegisteredNodes(f.ClientSet)
 		Expect(err).NotTo(HaveOccurred())
 		systemNamespace = metav1.NamespaceSystem
diff --git a/test/e2e/scalability/density.go b/test/e2e/scalability/density.go
index e0cdadafdd..c363da4079 100644
--- a/test/e2e/scalability/density.go
+++ b/test/e2e/scalability/density.go
@@ -226,8 +226,10 @@ func density30AddonResourceVerifier(numNodes int) map[string]framework.ResourceC
 
 func logPodStartupStatus(c clientset.Interface, expectedPods int, observedLabels map[string]string, period time.Duration, stopCh chan struct{}) {
 	label := labels.SelectorFromSet(labels.Set(observedLabels))
-	podStore := testutils.NewPodStore(c, metav1.NamespaceAll, label, fields.Everything())
+	podStore, err := testutils.NewPodStore(c, metav1.NamespaceAll, label, fields.Everything())
+	framework.ExpectNoError(err)
 	defer podStore.Stop()
+
 	ticker := time.NewTicker(period)
 	defer ticker.Stop()
 	for {
diff --git a/test/utils/pod_store.go b/test/utils/pod_store.go
index 5005a1f601..8dfdd928dc 100644
--- a/test/utils/pod_store.go
+++ b/test/utils/pod_store.go
@@ -17,11 +17,14 @@ limitations under the License.
 package utils
 
 import (
+	"time"
+
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
@@ -34,7 +37,7 @@ type PodStore struct {
 	Reflector *cache.Reflector
 }
 
-func NewPodStore(c clientset.Interface, namespace string, label labels.Selector, field fields.Selector) *PodStore {
+func NewPodStore(c clientset.Interface, namespace string, label labels.Selector, field fields.Selector) (*PodStore, error) {
 	lw := &cache.ListWatch{
 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
 			options.LabelSelector = label.String()
@@ -52,7 +55,15 @@ func NewPodStore(c clientset.Interface, namespace string, label labels.Selector,
 	stopCh := make(chan struct{})
 	reflector := cache.NewReflector(lw, &v1.Pod{}, store, 0)
 	go reflector.Run(stopCh)
-	return &PodStore{Store: store, stopCh: stopCh, Reflector: reflector}
+	if err := wait.PollImmediate(50*time.Millisecond, 2*time.Minute, func() (bool, error) {
+		if len(reflector.LastSyncResourceVersion()) != 0 {
+			return true, nil
+		}
+		return false, nil
+	}); err != nil {
+		return nil, err
+	}
+	return &PodStore{Store: store, stopCh: stopCh, Reflector: reflector}, nil
 }
 
 func (s *PodStore) List() []*v1.Pod {
diff --git a/test/utils/runners.go b/test/utils/runners.go
index da5dd49f5f..1be60c20d4 100644
--- a/test/utils/runners.go
+++ b/test/utils/runners.go
@@ -714,8 +714,11 @@ func (config *RCConfig) start() error {
 
 	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name}))
 
-	PodStore := NewPodStore(config.Client, config.Namespace, label, fields.Everything())
-	defer PodStore.Stop()
+	ps, err := NewPodStore(config.Client, config.Namespace, label, fields.Everything())
+	if err != nil {
+		return err
+	}
+	defer ps.Stop()
 
 	interval := config.PollInterval
 	if interval <= 0 {
@@ -731,7 +734,7 @@ func (config *RCConfig) start() error {
 	for oldRunning != config.Replicas {
 		time.Sleep(interval)
 
-		pods := PodStore.List()
+		pods := ps.List()
 		startupStatus := ComputeRCStartupStatus(pods, config.Replicas)
 
 		pods = startupStatus.Created
@@ -835,10 +838,14 @@ func WaitForPodsWithLabelRunning(c clientset.Interface, ns string, label labels.
 // one matching pod exists. If 'replicas' is < 0, wait for all matching pods running.
 func WaitForEnoughPodsWithLabelRunning(c clientset.Interface, ns string, label labels.Selector, replicas int) error {
 	running := false
-	PodStore := NewPodStore(c, ns, label, fields.Everything())
-	defer PodStore.Stop()
+	ps, err := NewPodStore(c, ns, label, fields.Everything())
+	if err != nil {
+		return err
+	}
+	defer ps.Stop()
+
 	for start := time.Now(); time.Since(start) < 10*time.Minute; time.Sleep(5 * time.Second) {
-		pods := PodStore.List()
+		pods := ps.List()
 		if len(pods) == 0 {
 			continue
 		}
@@ -1272,11 +1279,14 @@ func (config *DaemonConfig) Run() error {
 		timeout = 5 * time.Minute
 	}
 
-	podStore := NewPodStore(config.Client, config.Namespace, labels.SelectorFromSet(nameLabel), fields.Everything())
-	defer podStore.Stop()
+	ps, err := NewPodStore(config.Client, config.Namespace, labels.SelectorFromSet(nameLabel), fields.Everything())
+	if err != nil {
+		return err
+	}
+	defer ps.Stop()
 
 	err = wait.Poll(time.Second, timeout, func() (bool, error) {
-		pods := podStore.List()
+		pods := ps.List()
 
 		nodeHasDaemon := sets.NewString()
 		for _, pod := range pods {