Merge pull request #73064 from pontiyaraja/flaky

removed flaky watch from pods test cases
pull/564/head
Kubernetes Prow Robot 2019-02-27 01:03:57 -08:00 committed by GitHub
commit 649f225517
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 45 additions and 44 deletions

View File

@ -29,11 +29,14 @@ import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/kubelet" "k8s.io/kubernetes/pkg/kubelet"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
@ -192,8 +195,30 @@ var _ = framework.KubeDescribe("Pods", func() {
LabelSelector: selector.String(), LabelSelector: selector.String(),
ResourceVersion: pods.ListMeta.ResourceVersion, ResourceVersion: pods.ListMeta.ResourceVersion,
} }
w, err := podClient.Watch(options)
Expect(err).NotTo(HaveOccurred(), "failed to set up watch") listCompleted := make(chan bool, 1)
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = selector.String()
podList, err := podClient.List(options)
if err == nil {
select {
case listCompleted <- true:
framework.Logf("observed the pod list")
return podList, err
default:
framework.Logf("channel blocked")
}
}
return podList, err
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = selector.String()
return podClient.Watch(options)
},
}
_, _, w, _ := watchtools.NewIndexerInformerWatcher(lw, &v1.Pod{})
defer w.Stop()
By("submitting the pod to kubernetes") By("submitting the pod to kubernetes")
podClient.Create(pod) podClient.Create(pod)
@ -207,12 +232,17 @@ var _ = framework.KubeDescribe("Pods", func() {
By("verifying pod creation was observed") By("verifying pod creation was observed")
select { select {
case event, _ := <-w.ResultChan(): case <-listCompleted:
if event.Type != watch.Added { select {
framework.Failf("Failed to observe pod creation: %v", event) case event, _ := <-w.ResultChan():
if event.Type != watch.Added {
framework.Failf("Failed to observe pod creation: %v", event)
}
case <-time.After(framework.PodStartTimeout):
framework.Failf("Timeout while waiting for pod creation")
} }
case <-time.After(framework.PodStartTimeout): case <-time.After(10 * time.Second):
framework.Failf("Timeout while waiting for pod creation") framework.Failf("Timeout while waiting to observe pod list")
} }
// We need to wait for the pod to be running, otherwise the deletion // We need to wait for the pod to be running, otherwise the deletion
@ -221,7 +251,6 @@ var _ = framework.KubeDescribe("Pods", func() {
// save the running pod // save the running pod
pod, err = podClient.Get(pod.Name, metav1.GetOptions{}) pod, err = podClient.Get(pod.Name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred(), "failed to GET scheduled pod") Expect(err).NotTo(HaveOccurred(), "failed to GET scheduled pod")
framework.Logf("running pod: %#v", pod)
By("deleting the pod gracefully") By("deleting the pod gracefully")
err = podClient.Delete(pod.Name, metav1.NewDeleteOptions(30)) err = podClient.Delete(pod.Name, metav1.NewDeleteOptions(30))

View File

@ -37,7 +37,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//test/e2e/common:go_default_library", "//test/e2e/common:go_default_library",
"//test/e2e/framework:go_default_library", "//test/e2e/framework:go_default_library",

View File

@ -18,6 +18,7 @@ package node
import ( import (
"crypto/tls" "crypto/tls"
"encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"regexp" "regexp"
@ -30,7 +31,6 @@ import (
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
@ -74,7 +74,7 @@ var _ = SIGDescribe("Pods Extended", func() {
}, },
} }
By("setting up watch") By("setting up selector")
selector := labels.SelectorFromSet(labels.Set(map[string]string{"time": value})) selector := labels.SelectorFromSet(labels.Set(map[string]string{"time": value}))
options := metav1.ListOptions{LabelSelector: selector.String()} options := metav1.ListOptions{LabelSelector: selector.String()}
pods, err := podClient.List(options) pods, err := podClient.List(options)
@ -84,8 +84,6 @@ var _ = SIGDescribe("Pods Extended", func() {
LabelSelector: selector.String(), LabelSelector: selector.String(),
ResourceVersion: pods.ListMeta.ResourceVersion, ResourceVersion: pods.ListMeta.ResourceVersion,
} }
w, err := podClient.Watch(options)
Expect(err).NotTo(HaveOccurred(), "failed to set up watch")
By("submitting the pod to kubernetes") By("submitting the pod to kubernetes")
podClient.Create(pod) podClient.Create(pod)
@ -97,16 +95,6 @@ var _ = SIGDescribe("Pods Extended", func() {
Expect(err).NotTo(HaveOccurred(), "failed to query for pod") Expect(err).NotTo(HaveOccurred(), "failed to query for pod")
Expect(len(pods.Items)).To(Equal(1)) Expect(len(pods.Items)).To(Equal(1))
By("verifying pod creation was observed")
select {
case event, _ := <-w.ResultChan():
if event.Type != watch.Added {
framework.Failf("Failed to observe pod creation: %v", event)
}
case <-time.After(framework.PodStartTimeout):
framework.Failf("Timeout while waiting for pod creation")
}
// We need to wait for the pod to be running, otherwise the deletion // We need to wait for the pod to be running, otherwise the deletion
// may be carried out immediately rather than gracefully. // may be carried out immediately rather than gracefully.
framework.ExpectNoError(f.WaitForPodRunning(pod.Name)) framework.ExpectNoError(f.WaitForPodRunning(pod.Name))
@ -143,10 +131,15 @@ var _ = SIGDescribe("Pods Extended", func() {
By("deleting the pod gracefully") By("deleting the pod gracefully")
rsp, err := client.Do(req) rsp, err := client.Do(req)
Expect(err).NotTo(HaveOccurred(), "failed to use http client to send delete") Expect(err).NotTo(HaveOccurred(), "failed to use http client to send delete")
Expect(rsp.StatusCode).Should(Equal(http.StatusOK), "failed to delete gracefully by client request")
var lastPod v1.Pod
err = json.NewDecoder(rsp.Body).Decode(&lastPod)
Expect(err).NotTo(HaveOccurred(), "failed to decode graceful termination proxy response")
defer rsp.Body.Close() defer rsp.Body.Close()
By("verifying the kubelet observed the termination notice") By("verifying the kubelet observed the termination notice")
Expect(wait.Poll(time.Second*5, time.Second*30, func() (bool, error) { Expect(wait.Poll(time.Second*5, time.Second*30, func() (bool, error) {
podList, err := framework.GetKubeletPods(f.ClientSet, pod.Spec.NodeName) podList, err := framework.GetKubeletPods(f.ClientSet, pod.Spec.NodeName)
if err != nil { if err != nil {
@ -161,32 +154,12 @@ var _ = SIGDescribe("Pods Extended", func() {
framework.Logf("deletion has not yet been observed") framework.Logf("deletion has not yet been observed")
return false, nil return false, nil
} }
return true, nil return false, nil
} }
framework.Logf("no pod exists with the name we were looking for, assuming the termination request was observed and completed") framework.Logf("no pod exists with the name we were looking for, assuming the termination request was observed and completed")
return true, nil return true, nil
})).NotTo(HaveOccurred(), "kubelet never observed the termination notice") })).NotTo(HaveOccurred(), "kubelet never observed the termination notice")
By("verifying pod deletion was observed")
deleted := false
timeout := false
var lastPod *v1.Pod
timer := time.After(2 * time.Minute)
for !deleted && !timeout {
select {
case event, _ := <-w.ResultChan():
if event.Type == watch.Deleted {
lastPod = event.Object.(*v1.Pod)
deleted = true
}
case <-timer:
timeout = true
}
}
if !deleted {
framework.Failf("Failed to observe pod deletion")
}
Expect(lastPod.DeletionTimestamp).ToNot(BeNil()) Expect(lastPod.DeletionTimestamp).ToNot(BeNil())
Expect(lastPod.Spec.TerminationGracePeriodSeconds).ToNot(BeZero()) Expect(lastPod.Spec.TerminationGracePeriodSeconds).ToNot(BeZero())