Merge pull request #41112 from janetkuo/no-watch-until

Automatic merge from submit-queue (batch tested with PRs 41112, 41201, 41058, 40650, 40926)

e2e test flakes: remove some uses of watch.Until in e2e tests

`watch.Until` is somewhat broken and is causing quite a lot of test flakes. See https://github.com/kubernetes/kubernetes/issues/39879#issuecomment-277966375 and https://github.com/kubernetes/kubernetes/issues/31345 for more context.

@wojtek-t @yujuhong @kargakis
pull/6/head
Kubernetes Submit Queue 2017-02-10 01:40:41 -08:00 committed by GitHub
commit 558c37aee3
10 changed files with 99 additions and 57 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package watch
import (
"errors"
"time"
"k8s.io/apimachinery/pkg/util/wait"
@ -28,6 +29,9 @@ import (
// from false to true).
type ConditionFunc func(event Event) (bool, error)
// errWatchClosed is returned when the watch channel is closed before timeout in Until.
var errWatchClosed = errors.New("watch closed before Until timeout")
// Until reads items from the watch until each provided condition succeeds, and then returns the last watch
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
// If no event has been received, the returned event will be nil.
@ -61,7 +65,7 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
select {
case event, ok := <-ch:
if !ok {
return lastEvent, wait.ErrWaitTimeout
return lastEvent, errWatchClosed
}
lastEvent = &event

View File

@ -361,7 +361,7 @@ func runLivenessTest(f *framework.Framework, pod *v1.Pod, expectNumRestarts int,
// Wait until the pod is not pending. (Here we need to check for something other than
// 'Pending' other than checking for 'Running', since when failures occur, we go to
// 'Terminated' which can cause indefinite blocking.)
framework.ExpectNoError(framework.WaitForPodNotPending(f.ClientSet, ns, pod.Name, pod.ResourceVersion),
framework.ExpectNoError(framework.WaitForPodNotPending(f.ClientSet, ns, pod.Name),
fmt.Sprintf("starting pod %s in namespace %s", pod.Name, ns))
framework.Logf("Started pod %s in namespace %s", pod.Name, ns)

View File

@ -141,7 +141,7 @@ var _ = framework.KubeDescribe("ClusterDns [Feature:Example]", func() {
// wait until the pods have been scheduler, i.e. are not Pending anymore. Remember
// that we cannot wait for the pods to be running because our pods terminate by themselves.
for _, ns := range namespaces {
err := framework.WaitForPodNotPending(c, ns.Name, frontendPodName, "")
err := framework.WaitForPodNotPending(c, ns.Name, frontendPodName)
framework.ExpectNoError(err)
}

View File

@ -460,7 +460,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
By("creating secret and pod")
framework.RunKubectlOrDie("create", "-f", filepath.Join(framework.TestContext.OutputDir, secretYaml), nsFlag)
framework.RunKubectlOrDie("create", "-f", filepath.Join(framework.TestContext.OutputDir, podYaml), nsFlag)
err := framework.WaitForPodNoLongerRunningInNamespace(c, podName, ns, "")
err := framework.WaitForPodNoLongerRunningInNamespace(c, podName, ns)
Expect(err).NotTo(HaveOccurred())
By("checking if secret was read correctly")
@ -482,7 +482,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
By("creating the pod")
framework.RunKubectlOrDie("create", "-f", filepath.Join(framework.TestContext.OutputDir, podYaml), nsFlag)
err := framework.WaitForPodNoLongerRunningInNamespace(c, podName, ns, "")
err := framework.WaitForPodNoLongerRunningInNamespace(c, podName, ns)
Expect(err).NotTo(HaveOccurred())
By("checking if name and namespace were passed correctly")

View File

@ -369,19 +369,19 @@ func (f *Framework) WaitForPodRunning(podName string) error {
// WaitForPodReady waits for the pod to flip to ready in the namespace.
func (f *Framework) WaitForPodReady(podName string) error {
return waitTimeoutForPodReadyInNamespace(f.ClientSet, podName, f.Namespace.Name, "", PodStartTimeout)
return waitTimeoutForPodReadyInNamespace(f.ClientSet, podName, f.Namespace.Name, PodStartTimeout)
}
// WaitForPodRunningSlow waits for the pod to run in the namespace.
// It has a longer timeout then WaitForPodRunning (util.slowPodStartTimeout).
func (f *Framework) WaitForPodRunningSlow(podName string) error {
return waitForPodRunningInNamespaceSlow(f.ClientSet, podName, f.Namespace.Name, "")
return waitForPodRunningInNamespaceSlow(f.ClientSet, podName, f.Namespace.Name)
}
// WaitForPodNoLongerRunning waits for the pod to no longer be running in the namespace, for either
// success or failure.
func (f *Framework) WaitForPodNoLongerRunning(podName string) error {
return WaitForPodNoLongerRunningInNamespace(f.ClientSet, podName, f.Namespace.Name, "")
return WaitForPodNoLongerRunningInNamespace(f.ClientSet, podName, f.Namespace.Name)
}
// TestContainerOutput runs the given pod in the given namespace and waits

View File

@ -1208,71 +1208,109 @@ func CheckInvariants(events []watch.Event, fns ...InvariantFunc) error {
// Waits default amount of time (PodStartTimeout) for the specified pod to become running.
// Returns an error if timeout occurs first, or pod goes in to failed state.
func WaitForPodRunningInNamespace(c clientset.Interface, pod *v1.Pod) error {
// this short-cicuit is needed for cases when we pass a list of pods instead
// of newly created pod (e.g. VerifyPods) which means we are getting already
// running pod for which waiting does not make sense and will always fail
if pod.Status.Phase == v1.PodRunning {
return nil
}
return waitTimeoutForPodRunningInNamespace(c, pod.Name, pod.Namespace, pod.ResourceVersion, PodStartTimeout)
return waitTimeoutForPodRunningInNamespace(c, pod.Name, pod.Namespace, PodStartTimeout)
}
// Waits default amount of time (PodStartTimeout) for the specified pod to become running.
// Returns an error if timeout occurs first, or pod goes in to failed state.
func WaitForPodNameRunningInNamespace(c clientset.Interface, podName, namespace string) error {
return waitTimeoutForPodRunningInNamespace(c, podName, namespace, "", PodStartTimeout)
return waitTimeoutForPodRunningInNamespace(c, podName, namespace, PodStartTimeout)
}
// Waits an extended amount of time (slowPodStartTimeout) for the specified pod to become running.
// The resourceVersion is used when Watching object changes, it tells since when we care
// about changes to the pod. Returns an error if timeout occurs first, or pod goes in to failed state.
func waitForPodRunningInNamespaceSlow(c clientset.Interface, podName, namespace, resourceVersion string) error {
return waitTimeoutForPodRunningInNamespace(c, podName, namespace, resourceVersion, slowPodStartTimeout)
func waitForPodRunningInNamespaceSlow(c clientset.Interface, podName, namespace string) error {
return waitTimeoutForPodRunningInNamespace(c, podName, namespace, slowPodStartTimeout)
}
func waitTimeoutForPodRunningInNamespace(c clientset.Interface, podName, namespace, resourceVersion string, timeout time.Duration) error {
w, err := c.Core().Pods(namespace).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: podName, ResourceVersion: resourceVersion}))
if err != nil {
return err
func waitTimeoutForPodRunningInNamespace(c clientset.Interface, podName, namespace string, timeout time.Duration) error {
return wait.PollImmediate(Poll, timeout, podRunning(c, podName, namespace))
}
func podRunning(c clientset.Interface, podName, namespace string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.Core().Pods(namespace).Get(podName, metav1.GetOptions{})
if err != nil {
return false, err
}
switch pod.Status.Phase {
case v1.PodRunning:
return true, nil
case v1.PodFailed, v1.PodSucceeded:
return false, conditions.ErrPodCompleted
}
return false, nil
}
_, err = watch.Until(timeout, w, conditions.PodRunning)
return err
}
// Waits default amount of time (podNoLongerRunningTimeout) for the specified pod to stop running.
// Returns an error if timeout occurs first.
func WaitForPodNoLongerRunningInNamespace(c clientset.Interface, podName, namespace, resourceVersion string) error {
return WaitTimeoutForPodNoLongerRunningInNamespace(c, podName, namespace, resourceVersion, podNoLongerRunningTimeout)
func WaitForPodNoLongerRunningInNamespace(c clientset.Interface, podName, namespace string) error {
return WaitTimeoutForPodNoLongerRunningInNamespace(c, podName, namespace, podNoLongerRunningTimeout)
}
func WaitTimeoutForPodNoLongerRunningInNamespace(c clientset.Interface, podName, namespace, resourceVersion string, timeout time.Duration) error {
w, err := c.Core().Pods(namespace).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: podName, ResourceVersion: resourceVersion}))
if err != nil {
return err
}
_, err = watch.Until(timeout, w, conditions.PodCompleted)
return err
func WaitTimeoutForPodNoLongerRunningInNamespace(c clientset.Interface, podName, namespace string, timeout time.Duration) error {
return wait.PollImmediate(Poll, timeout, podCompleted(c, podName, namespace))
}
func waitTimeoutForPodReadyInNamespace(c clientset.Interface, podName, namespace, resourceVersion string, timeout time.Duration) error {
w, err := c.Core().Pods(namespace).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: podName, ResourceVersion: resourceVersion}))
if err != nil {
return err
func podCompleted(c clientset.Interface, podName, namespace string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.Core().Pods(namespace).Get(podName, metav1.GetOptions{})
if err != nil {
return false, err
}
switch pod.Status.Phase {
case v1.PodFailed, v1.PodSucceeded:
return true, nil
}
return false, nil
}
}
func waitTimeoutForPodReadyInNamespace(c clientset.Interface, podName, namespace string, timeout time.Duration) error {
return wait.PollImmediate(Poll, timeout, podRunningAndReady(c, podName, namespace))
}
func podRunningAndReady(c clientset.Interface, podName, namespace string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.Core().Pods(namespace).Get(podName, metav1.GetOptions{})
if err != nil {
return false, err
}
switch pod.Status.Phase {
case v1.PodFailed, v1.PodSucceeded:
return false, conditions.ErrPodCompleted
case v1.PodRunning:
return v1.IsPodReady(pod), nil
}
return false, nil
}
_, err = watch.Until(timeout, w, conditions.PodRunningAndReady)
return err
}
// WaitForPodNotPending returns an error if it took too long for the pod to go out of pending state.
// The resourceVersion is used when Watching object changes, it tells since when we care
// about changes to the pod.
func WaitForPodNotPending(c clientset.Interface, ns, podName, resourceVersion string) error {
w, err := c.Core().Pods(ns).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: podName, ResourceVersion: resourceVersion}))
if err != nil {
return err
func WaitForPodNotPending(c clientset.Interface, ns, podName string) error {
return wait.PollImmediate(Poll, PodStartTimeout, podNotPending(c, podName, ns))
}
func podNotPending(c clientset.Interface, podName, namespace string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.Core().Pods(namespace).Get(podName, metav1.GetOptions{})
if err != nil {
return false, err
}
switch pod.Status.Phase {
case v1.PodPending:
return false, nil
default:
return true, nil
}
}
_, err = watch.Until(PodStartTimeout, w, conditions.PodNotPending)
return err
}
// waitForPodTerminatedInNamespace returns an error if it took too long for the pod

View File

@ -420,7 +420,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
// The grace period on the stateful pods is set to a value > 0.
testUnderTemporaryNetworkFailure(c, ns, node, func() {
framework.Logf("Checking that the NodeController does not force delete stateful pods %v", pod.Name)
err := framework.WaitTimeoutForPodNoLongerRunningInNamespace(c, pod.Name, ns, pod.ResourceVersion, 10*time.Minute)
err := framework.WaitTimeoutForPodNoLongerRunningInNamespace(c, pod.Name, ns, 10*time.Minute)
Expect(err).To(Equal(wait.ErrWaitTimeout), "Pod was not deleted during network partition.")
})

View File

@ -272,7 +272,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
By("Trying to relaunch the pod, now with labels.")
labelPodName := "with-labels"
pod := createPausePod(f, pausePodConfig{
_ = createPausePod(f, pausePodConfig{
Name: labelPodName,
NodeSelector: map[string]string{
"kubernetes.io/hostname": nodeName,
@ -285,7 +285,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
// kubelet and the scheduler: the scheduler might have scheduled a pod
// already when the kubelet does not know about its new label yet. The
// kubelet will then refuse to launch the pod.
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, labelPodName, pod.ResourceVersion))
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, labelPodName))
labelPod, err := cs.Core().Pods(ns).Get(labelPodName, metav1.GetOptions{})
framework.ExpectNoError(err)
Expect(labelPod.Spec.NodeName).To(Equal(nodeName))
@ -346,7 +346,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
By("Trying to relaunch the pod, now with labels.")
labelPodName := "with-labels"
pod := createPausePod(f, pausePodConfig{
_ = createPausePod(f, pausePodConfig{
Name: labelPodName,
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
@ -376,7 +376,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
// kubelet and the scheduler: the scheduler might have scheduled a pod
// already when the kubelet does not know about its new label yet. The
// kubelet will then refuse to launch the pod.
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, labelPodName, pod.ResourceVersion))
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, labelPodName))
labelPod, err := cs.Core().Pods(ns).Get(labelPodName, metav1.GetOptions{})
framework.ExpectNoError(err)
Expect(labelPod.Spec.NodeName).To(Equal(nodeName))
@ -462,7 +462,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
By("Trying to launch the pod, now with podAffinity.")
labelPodName := "with-podaffinity-" + string(uuid.NewUUID())
pod := createPausePod(f, pausePodConfig{
_ = createPausePod(f, pausePodConfig{
Name: labelPodName,
Affinity: &v1.Affinity{
PodAffinity: &v1.PodAffinity{
@ -490,7 +490,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
// kubelet and the scheduler: the scheduler might have scheduled a pod
// already when the kubelet does not know about its new label yet. The
// kubelet will then refuse to launch the pod.
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, labelPodName, pod.ResourceVersion))
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, labelPodName))
labelPod, err := cs.Core().Pods(ns).Get(labelPodName, metav1.GetOptions{})
framework.ExpectNoError(err)
Expect(labelPod.Spec.NodeName).To(Equal(nodeName))
@ -573,7 +573,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
By("Trying to launch the pod, now with multiple pod affinities with diff LabelOperators.")
labelPodName := "with-podaffinity-" + string(uuid.NewUUID())
pod := createPausePod(f, pausePodConfig{
_ = createPausePod(f, pausePodConfig{
Name: labelPodName,
Affinity: &v1.Affinity{
PodAffinity: &v1.PodAffinity{
@ -607,7 +607,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
// kubelet and the scheduler: the scheduler might have scheduled a pod
// already when the kubelet does not know about its new label yet. The
// kubelet will then refuse to launch the pod.
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, labelPodName, pod.ResourceVersion))
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, labelPodName))
labelPod, err := cs.Core().Pods(ns).Get(labelPodName, metav1.GetOptions{})
framework.ExpectNoError(err)
Expect(labelPod.Spec.NodeName).To(Equal(nodeName))
@ -632,7 +632,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
// kubelet and the scheduler: the scheduler might have scheduled a pod
// already when the kubelet does not know about its new label yet. The
// kubelet will then refuse to launch the pod.
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, pod.Name, pod.ResourceVersion))
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, pod.Name))
labelPod, err := cs.Core().Pods(ns).Get(pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
Expect(labelPod.Spec.NodeName).To(Equal(nodeName))
@ -656,7 +656,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
// kubelet and the scheduler: the scheduler might have scheduled a pod
// already when the kubelet does not know about its new label yet. The
// kubelet will then refuse to launch the pod.
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, pod.Name, pod.ResourceVersion))
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, pod.Name))
labelPod, err := cs.Core().Pods(ns).Get(pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
Expect(labelPod.Spec.NodeName).To(Equal(nodeName))
@ -688,7 +688,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
By("Trying to relaunch the pod, now with tolerations.")
tolerationPodName := "with-tolerations"
pod := createPausePod(f, pausePodConfig{
_ = createPausePod(f, pausePodConfig{
Name: tolerationPodName,
Annotations: map[string]string{
"scheduler.alpha.kubernetes.io/tolerations": `
@ -708,7 +708,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
// kubelet and the scheduler: the scheduler might have scheduled a pod
// already when the kubelet does not know about its new taint yet. The
// kubelet will then refuse to launch the pod.
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, tolerationPodName, pod.ResourceVersion))
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, tolerationPodName))
deployedPod, err := cs.Core().Pods(ns).Get(tolerationPodName, metav1.GetOptions{})
framework.ExpectNoError(err)
Expect(deployedPod.Spec.NodeName).To(Equal(nodeName))

View File

@ -144,7 +144,7 @@ func runAppArmorTest(f *framework.Framework, shouldRun bool, profile string) v1.
if shouldRun {
// The pod needs to start before it stops, so wait for the longer start timeout.
framework.ExpectNoError(framework.WaitTimeoutForPodNoLongerRunningInNamespace(
f.ClientSet, pod.Name, f.Namespace.Name, "", framework.PodStartTimeout))
f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout))
} else {
// Pod should remain in the pending state. Wait for the Reason to be set to "AppArmor".
w, err := f.PodClient().Watch(metav1.SingleObject(metav1.ObjectMeta{Name: pod.Name}))

View File

@ -51,7 +51,7 @@ var _ = framework.KubeDescribe("ImageID", func() {
pod := f.PodClient().Create(podDesc)
framework.ExpectNoError(framework.WaitTimeoutForPodNoLongerRunningInNamespace(
f.ClientSet, pod.Name, f.Namespace.Name, "", framework.PodStartTimeout))
f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout))
runningPod, err := f.PodClient().Get(pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err)