mirror of https://github.com/k3s-io/k3s
wait for the Reflector to list before polling the PodStore in e2e tests.
parent 89651077b1
commit a12dc2e412
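For context on the race this commit addresses: NewPodStore starts its Reflector with RunUntil, so the initial List/Watch runs asynchronously, and a test that polls the store immediately after creating it can observe an empty cache and misread it as "no pods". Below is a minimal sketch of the gating pattern the commit introduces, written as a standalone helper; the helper name, the package name, and the import paths are assumptions for this vintage of the tree, not part of the commit (the commit inlines the same check in podStoreForRC):

package e2e

import (
	"time"

	"k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/util/wait"
)

// waitForInitialSync is a hypothetical helper equivalent to the check that
// podStoreForRC inlines in the diff below: block until the Reflector's first
// List has completed, or give up after a minute. LastSyncResourceVersion
// returns "" until the initial List succeeds.
func waitForInitialSync(r *cache.Reflector) error {
	return wait.Poll(1*time.Second, 1*time.Minute, func() (bool, error) {
		return len(r.LastSyncResourceVersion()) != 0, nil
	})
}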
@@ -52,6 +52,7 @@ import (
 	"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
 	clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
 	gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
+	"k8s.io/kubernetes/pkg/controller"
 	deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
 	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/kubectl"

@@ -231,7 +232,8 @@ func GetMasterHost() string {
 // Convenient wrapper around cache.Store that returns list of api.Pod instead of interface{}.
 type PodStore struct {
 	cache.Store
-	stopCh chan struct{}
+	stopCh    chan struct{}
+	reflector *cache.Reflector
 }
 
 func NewPodStore(c *client.Client, namespace string, label labels.Selector, field fields.Selector) *PodStore {

@@ -249,8 +251,9 @@ func NewPodStore(c *client.Client, namespace string, label labels.Selector, field fields.Selector) *PodStore {
 	}
 	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
 	stopCh := make(chan struct{})
-	cache.NewReflector(lw, &api.Pod{}, store, 0).RunUntil(stopCh)
-	return &PodStore{store, stopCh}
+	reflector := cache.NewReflector(lw, &api.Pod{}, store, 0)
+	reflector.RunUntil(stopCh)
+	return &PodStore{store, stopCh, reflector}
 }
 
 func (s *PodStore) List() []*api.Pod {

@@ -3068,6 +3071,10 @@ func DeleteRCAndPods(c *client.Client, ns, name string) error {
 		}
 		return err
 	}
+	ps, err := podStoreForRC(c, rc)
+	if err != nil {
+		return err
+	}
 	startTime := time.Now()
 	err = reaper.Stop(ns, name, 0, api.NewDeleteOptions(0))
 	if apierrs.IsNotFound(err) {

@@ -3079,12 +3086,18 @@ func DeleteRCAndPods(c *client.Client, ns, name string) error {
 	if err != nil {
 		return fmt.Errorf("error while stopping RC: %s: %v", name, err)
 	}
-	err = waitForRCPodsGone(c, rc, nil)
+	err = waitForPodsInactive(ps, 10*time.Millisecond, 10*time.Minute)
 	if err != nil {
-		return fmt.Errorf("error while deleting RC %s: %v", name, err)
+		return fmt.Errorf("error while waiting for pods to become inactive %s: %v", name, err)
 	}
 	terminatePodTime := time.Now().Sub(startTime) - deleteRCTime
 	Logf("Terminating RC %s pods took: %v", name, terminatePodTime)
+	// this is to relieve namespace controller's pressure when deleting the
+	// namespace after a test.
+	err = waitForPodsGone(ps, 10*time.Second, 10*time.Minute)
+	if err != nil {
+		return fmt.Errorf("error while waiting for pods gone %s: %v", name, err)
+	}
 	return nil
 }

@@ -3099,6 +3112,10 @@ func DeleteRCAndWaitForGC(c *client.Client, ns, name string) error {
 		}
 		return err
 	}
+	ps, err := podStoreForRC(c, rc)
+	if err != nil {
+		return err
+	}
 	startTime := time.Now()
 	falseVar := false
 	deleteOption := &api.DeleteOptions{OrphanDependents: &falseVar}

@@ -3112,16 +3129,61 @@ func DeleteRCAndWaitForGC(c *client.Client, ns, name string) error {
 	}
 	deleteRCTime := time.Now().Sub(startTime)
 	Logf("Deleting RC %s took: %v", name, deleteRCTime)
-	timeout := 10 * time.Minute
-	err = waitForRCPodsGone(c, rc, &timeout)
+	err = waitForPodsInactive(ps, 10*time.Millisecond, 10*time.Minute)
 	if err != nil {
-		return fmt.Errorf("error while deleting RC %s: %v", name, err)
+		return fmt.Errorf("error while waiting for pods to become inactive %s: %v", name, err)
 	}
 	terminatePodTime := time.Now().Sub(startTime) - deleteRCTime
 	Logf("Terminating RC %s pods took: %v", name, terminatePodTime)
+	err = waitForPodsGone(ps, 10*time.Second, 10*time.Minute)
+	if err != nil {
+		return fmt.Errorf("error while waiting for pods gone %s: %v", name, err)
+	}
 	return nil
 }
 
+// podStoreForRC creates a PodStore that monitors pods belonging to the rc. It
+// waits until the reflector does a List() before returning.
+func podStoreForRC(c *client.Client, rc *api.ReplicationController) (*PodStore, error) {
+	labels := labels.SelectorFromSet(rc.Spec.Selector)
+	ps := NewPodStore(c, rc.Namespace, labels, fields.Everything())
+	err := wait.Poll(1*time.Second, 1*time.Minute, func() (bool, error) {
+		if len(ps.reflector.LastSyncResourceVersion()) != 0 {
+			return true, nil
+		}
+		return false, nil
+	})
+	return ps, err
+}
+
+// waitForPodsInactive waits until there are no active pods left in the PodStore.
+// This is to make a fair comparison of deletion time between DeleteRCAndPods
+// and DeleteRCAndWaitForGC, because the RC controller decreases status.replicas
+// when the pod is inactive.
+func waitForPodsInactive(ps *PodStore, interval, timeout time.Duration) error {
+	defer ps.Stop()
+	return wait.PollImmediate(interval, timeout, func() (bool, error) {
+		pods := ps.List()
+		for _, pod := range pods {
+			if controller.IsPodActive(*pod) {
+				return false, nil
+			}
+		}
+		return true, nil
+	})
+}
+
+// waitForPodsGone waits until there are no pods left in the PodStore.
+func waitForPodsGone(ps *PodStore, interval, timeout time.Duration) error {
+	defer ps.Stop()
+	return wait.PollImmediate(interval, timeout, func() (bool, error) {
+		if pods := ps.List(); len(pods) == 0 {
+			return true, nil
+		}
+		return false, nil
+	})
+}
+
 // waitForRCPodsGone waits until there are no pods reported under an RC's selector (because the pods
 // have completed termination).
 func waitForRCPodsGone(c *client.Client, rc *api.ReplicationController, timeout *time.Duration) error {
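As a usage note, here is a minimal sketch of how a test helper could combine the new functions to wait for an RC's pods to disappear. It is illustrative wiring only: waitForRCPodsDeleted is not part of the commit, and it assumes it lives in the same e2e util file so client, api, and the helpers above are already in scope.

// waitForRCPodsDeleted is a hypothetical wrapper around the helpers added in
// this commit: build a PodStore whose Reflector has already completed its
// initial List, then poll until every pod matching rc's selector is gone.
// waitForPodsGone stops the PodStore itself when it returns.
func waitForRCPodsDeleted(c *client.Client, rc *api.ReplicationController) error {
	ps, err := podStoreForRC(c, rc)
	if err != nil {
		return err
	}
	return waitForPodsGone(ps, 10*time.Second, 10*time.Minute)
}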