Merge pull request #62465 from wojtek-t/fix_pod_store

Automatic merge from submit-queue (batch tested with PRs 62455, 62465, 62427, 62416, 62411). If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md

Fix PodStore to wait until it is initialized

This is currently causing some e2e test failures (mostly node-restart tests).
Kubernetes Submit Queue 2018-04-12 08:54:12 -07:00 committed by GitHub
commit a6468037b2
7 changed files with 52 additions and 33 deletions
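For context, the pattern every call site below adopts: NewPodStore now returns (*PodStore, error) and only succeeds once its reflector has completed an initial List(), so callers must handle the error and then defer Stop() as before. A minimal sketch of the new call pattern (countPods is illustrative, not from this PR; the testutils import alias matches how the e2e framework imports test/utils, and the client-go signatures match the vintage used in the diff):

package example

import (
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	clientset "k8s.io/client-go/kubernetes"

	testutils "k8s.io/kubernetes/test/utils"
)

// countPods is a hypothetical caller showing the pattern this PR rolls out:
// take the new error return from NewPodStore, then defer Stop() as before.
func countPods(c clientset.Interface, ns string) (int, error) {
	ps, err := testutils.NewPodStore(c, ns, labels.Everything(), fields.Everything())
	if err != nil {
		// The reflector did not finish its initial List() within the
		// 2-minute window NewPodStore now waits for.
		return 0, err
	}
	defer ps.Stop()
	// List() is now backed by at least one completed sync, so an empty
	// result really means "no matching pods", not "cache not filled yet".
	return len(ps.List()), nil
}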

@@ -25,6 +25,7 @@ import (
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
@@ -105,7 +106,7 @@ func ScaleRCByLabels(clientset clientset.Interface, scalesGetter scaleclient.Sca
 		return err
 	}
 	if replicas == 0 {
-		ps, err := podStoreForSelector(clientset, rc.Namespace, labels.SelectorFromSet(rc.Spec.Selector))
+		ps, err := testutils.NewPodStore(clientset, rc.Namespace, labels.SelectorFromSet(rc.Spec.Selector), fields.Everything())
 		if err != nil {
 			return err
 		}

@@ -2857,9 +2857,12 @@ func WaitForControlledPods(c clientset.Interface, ns, name string, kind schema.G
 // Returns true if all the specified pods are scheduled, else returns false.
 func podsWithLabelScheduled(c clientset.Interface, ns string, label labels.Selector) (bool, error) {
-	PodStore := testutils.NewPodStore(c, ns, label, fields.Everything())
-	defer PodStore.Stop()
-	pods := PodStore.List()
+	ps, err := testutils.NewPodStore(c, ns, label, fields.Everything())
+	if err != nil {
+		return false, err
+	}
+	defer ps.Stop()
+	pods := ps.List()
 	if len(pods) == 0 {
 		return false, nil
 	}
@@ -3019,7 +3022,7 @@ func DeleteResourceAndPods(clientset clientset.Interface, internalClientset inte
 	if err != nil {
 		return err
 	}
-	ps, err := podStoreForSelector(clientset, ns, selector)
+	ps, err := testutils.NewPodStore(clientset, ns, selector, fields.Everything())
 	if err != nil {
 		return err
 	}
@@ -3068,7 +3071,7 @@ func DeleteResourceAndWaitForGC(c clientset.Interface, kind schema.GroupKind, ns
 		return err
 	}
-	ps, err := podStoreForSelector(c, ns, selector)
+	ps, err := testutils.NewPodStore(c, ns, selector, fields.Everything())
 	if err != nil {
 		return err
 	}
@@ -3114,19 +3117,6 @@ func DeleteResourceAndWaitForGC(c clientset.Interface, kind schema.GroupKind, ns
 	return nil
 }
-// podStoreForSelector creates a PodStore that monitors pods from given namespace matching given selector.
-// It waits until the reflector does a List() before returning.
-func podStoreForSelector(c clientset.Interface, ns string, selector labels.Selector) (*testutils.PodStore, error) {
-	ps := testutils.NewPodStore(c, ns, selector, fields.Everything())
-	err := wait.Poll(100*time.Millisecond, 2*time.Minute, func() (bool, error) {
-		if len(ps.Reflector.LastSyncResourceVersion()) != 0 {
-			return true, nil
-		}
-		return false, nil
-	})
-	return ps, err
-}
 // waitForPodsInactive waits until there are no active pods left in the PodStore.
 // This is to make a fair comparison of deletion time between DeleteRCAndPods
 // and DeleteRCAndWaitForGC, because the RC controller decreases status.replicas
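The removed helper polled with wait.Poll(100ms, ...), which always sleeps one full interval before the first condition check; the replacement inside NewPodStore (see the pod_store.go hunk below) uses wait.PollImmediate(50ms, ...), which evaluates the condition up front. A small runnable sketch of that difference (timings in the comments are approximate):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// wait.Poll sleeps for one interval before the first condition check;
	// wait.PollImmediate checks first and only then starts sleeping. For a
	// reflector that is usually synced almost instantly, PollImmediate
	// avoids paying the interval on the happy path.
	start := time.Now()
	_ = wait.Poll(100*time.Millisecond, time.Second, func() (bool, error) { return true, nil })
	fmt.Println("Poll:         ", time.Since(start)) // ~100ms even though the condition was already true

	start = time.Now()
	_ = wait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) { return true, nil })
	fmt.Println("PollImmediate:", time.Since(start)) // ~0ms
}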

@@ -219,7 +219,11 @@ func printStatusAndLogsForNotReadyPods(c clientset.Interface, ns string, podName
 func rebootNode(c clientset.Interface, provider, name, rebootCmd string) bool {
 	// Setup
 	ns := metav1.NamespaceSystem
-	ps := testutils.NewPodStore(c, ns, labels.Everything(), fields.OneTermEqualSelector(api.PodHostField, name))
+	ps, err := testutils.NewPodStore(c, ns, labels.Everything(), fields.OneTermEqualSelector(api.PodHostField, name))
+	if err != nil {
+		framework.Logf("Couldn't initialize pod store: %v", err)
+		return false
+	}
 	defer ps.Stop()
 	// Get the node initially.

@@ -76,8 +76,9 @@ var _ = SIGDescribe("Restart [Disruptive]", func() {
 		// This test requires the ability to restart all nodes, so the provider
 		// check must be identical to that call.
 		framework.SkipUnlessProviderIs("gce", "gke")
-		ps = testutils.NewPodStore(f.ClientSet, metav1.NamespaceSystem, labels.Everything(), fields.Everything())
 		var err error
+		ps, err = testutils.NewPodStore(f.ClientSet, metav1.NamespaceSystem, labels.Everything(), fields.Everything())
+		Expect(err).NotTo(HaveOccurred())
 		numNodes, err = framework.NumberOfRegisteredNodes(f.ClientSet)
 		Expect(err).NotTo(HaveOccurred())
 		systemNamespace = metav1.NamespaceSystem

@@ -226,8 +226,10 @@ func density30AddonResourceVerifier(numNodes int) map[string]framework.ResourceC
 func logPodStartupStatus(c clientset.Interface, expectedPods int, observedLabels map[string]string, period time.Duration, stopCh chan struct{}) {
 	label := labels.SelectorFromSet(labels.Set(observedLabels))
-	podStore := testutils.NewPodStore(c, metav1.NamespaceAll, label, fields.Everything())
+	podStore, err := testutils.NewPodStore(c, metav1.NamespaceAll, label, fields.Everything())
+	framework.ExpectNoError(err)
 	defer podStore.Stop()
 	ticker := time.NewTicker(period)
 	defer ticker.Stop()
 	for {

@@ -17,11 +17,14 @@ limitations under the License.
 package utils
 import (
+	"time"
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
@@ -34,7 +37,7 @@ type PodStore struct {
 	Reflector *cache.Reflector
 }
-func NewPodStore(c clientset.Interface, namespace string, label labels.Selector, field fields.Selector) *PodStore {
+func NewPodStore(c clientset.Interface, namespace string, label labels.Selector, field fields.Selector) (*PodStore, error) {
 	lw := &cache.ListWatch{
 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
 			options.LabelSelector = label.String()
@@ -52,7 +55,15 @@ func NewPodStore(c clientset.Interface, namespace string, label labels.Selector,
 	stopCh := make(chan struct{})
 	reflector := cache.NewReflector(lw, &v1.Pod{}, store, 0)
 	go reflector.Run(stopCh)
-	return &PodStore{Store: store, stopCh: stopCh, Reflector: reflector}
+	if err := wait.PollImmediate(50*time.Millisecond, 2*time.Minute, func() (bool, error) {
+		if len(reflector.LastSyncResourceVersion()) != 0 {
+			return true, nil
+		}
+		return false, nil
+	}); err != nil {
+		return nil, err
+	}
+	return &PodStore{Store: store, stopCh: stopCh, Reflector: reflector}, nil
 }
 func (s *PodStore) List() []*v1.Pod {
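Note that LastSyncResourceVersion() returns the empty string until the reflector's first List() succeeds, which is what the poll above keys on. One subtlety: on timeout, the merged code returns the error without closing stopCh, so the reflector goroutine it just started keeps running. A hypothetical helper (waitForInitialSync is not part of this PR) sketching the same wait with teardown on failure:

package utils

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
)

// waitForInitialSync is a hypothetical equivalent of the inline poll above:
// it blocks until the reflector has completed one List(), and closes stopCh
// on timeout so the reflector goroutine does not leak.
func waitForInitialSync(r *cache.Reflector, stopCh chan struct{}) error {
	err := wait.PollImmediate(50*time.Millisecond, 2*time.Minute, func() (bool, error) {
		// LastSyncResourceVersion() returns "" until the first List() succeeds.
		return len(r.LastSyncResourceVersion()) != 0, nil
	})
	if err != nil {
		close(stopCh) // unlike the merged code, tear the reflector down on failure
	}
	return err
}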

@@ -714,8 +714,11 @@ func (config *RCConfig) start() error {
 	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name}))
-	PodStore := NewPodStore(config.Client, config.Namespace, label, fields.Everything())
-	defer PodStore.Stop()
+	ps, err := NewPodStore(config.Client, config.Namespace, label, fields.Everything())
+	if err != nil {
+		return err
+	}
+	defer ps.Stop()
 	interval := config.PollInterval
 	if interval <= 0 {
@@ -731,7 +734,7 @@ func (config *RCConfig) start() error {
 	for oldRunning != config.Replicas {
 		time.Sleep(interval)
-		pods := PodStore.List()
+		pods := ps.List()
 		startupStatus := ComputeRCStartupStatus(pods, config.Replicas)
 		pods = startupStatus.Created
@@ -835,10 +838,14 @@ func WaitForPodsWithLabelRunning(c clientset.Interface, ns string, label labels.
 // one matching pod exists. If 'replicas' is < 0, wait for all matching pods running.
 func WaitForEnoughPodsWithLabelRunning(c clientset.Interface, ns string, label labels.Selector, replicas int) error {
 	running := false
-	PodStore := NewPodStore(c, ns, label, fields.Everything())
-	defer PodStore.Stop()
+	ps, err := NewPodStore(c, ns, label, fields.Everything())
+	if err != nil {
+		return err
+	}
+	defer ps.Stop()
 	for start := time.Now(); time.Since(start) < 10*time.Minute; time.Sleep(5 * time.Second) {
-		pods := PodStore.List()
+		pods := ps.List()
 		if len(pods) == 0 {
 			continue
 		}
@@ -1272,11 +1279,14 @@ func (config *DaemonConfig) Run() error {
 		timeout = 5 * time.Minute
 	}
-	podStore := NewPodStore(config.Client, config.Namespace, labels.SelectorFromSet(nameLabel), fields.Everything())
-	defer podStore.Stop()
+	ps, err := NewPodStore(config.Client, config.Namespace, labels.SelectorFromSet(nameLabel), fields.Everything())
+	if err != nil {
+		return err
+	}
+	defer ps.Stop()
 	err = wait.Poll(time.Second, timeout, func() (bool, error) {
-		pods := podStore.List()
+		pods := ps.List()
 		nodeHasDaemon := sets.NewString()
 		for _, pod := range pods {