mirror of https://github.com/k3s-io/k3s
removed flaky watch from pods test cases
parent
38a325250f
commit
12943ce1ac
|
@ -29,11 +29,14 @@ import (
|
|||
"k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
"k8s.io/kubernetes/pkg/kubelet"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
|
@ -192,8 +195,30 @@ var _ = framework.KubeDescribe("Pods", func() {
|
|||
LabelSelector: selector.String(),
|
||||
ResourceVersion: pods.ListMeta.ResourceVersion,
|
||||
}
|
||||
w, err := podClient.Watch(options)
|
||||
Expect(err).NotTo(HaveOccurred(), "failed to set up watch")
|
||||
|
||||
listCompleted := make(chan bool, 1)
|
||||
lw := &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
options.LabelSelector = selector.String()
|
||||
podList, err := podClient.List(options)
|
||||
if err == nil {
|
||||
select {
|
||||
case listCompleted <- true:
|
||||
framework.Logf("observed the pod list")
|
||||
return podList, err
|
||||
default:
|
||||
framework.Logf("channel blocked")
|
||||
}
|
||||
}
|
||||
return podList, err
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
options.LabelSelector = selector.String()
|
||||
return podClient.Watch(options)
|
||||
},
|
||||
}
|
||||
_, _, w, _ := watchtools.NewIndexerInformerWatcher(lw, &v1.Pod{})
|
||||
defer w.Stop()
|
||||
|
||||
By("submitting the pod to kubernetes")
|
||||
podClient.Create(pod)
|
||||
|
@ -207,12 +232,17 @@ var _ = framework.KubeDescribe("Pods", func() {
|
|||
|
||||
By("verifying pod creation was observed")
|
||||
select {
|
||||
case event, _ := <-w.ResultChan():
|
||||
if event.Type != watch.Added {
|
||||
framework.Failf("Failed to observe pod creation: %v", event)
|
||||
case <-listCompleted:
|
||||
select {
|
||||
case event, _ := <-w.ResultChan():
|
||||
if event.Type != watch.Added {
|
||||
framework.Failf("Failed to observe pod creation: %v", event)
|
||||
}
|
||||
case <-time.After(framework.PodStartTimeout):
|
||||
framework.Failf("Timeout while waiting for pod creation")
|
||||
}
|
||||
case <-time.After(framework.PodStartTimeout):
|
||||
framework.Failf("Timeout while waiting for pod creation")
|
||||
case <-time.After(10 * time.Second):
|
||||
framework.Failf("Timeout while waiting to observe pod list")
|
||||
}
|
||||
|
||||
// We need to wait for the pod to be running, otherwise the deletion
|
||||
|
@ -221,7 +251,6 @@ var _ = framework.KubeDescribe("Pods", func() {
|
|||
// save the running pod
|
||||
pod, err = podClient.Get(pod.Name, metav1.GetOptions{})
|
||||
Expect(err).NotTo(HaveOccurred(), "failed to GET scheduled pod")
|
||||
framework.Logf("running pod: %#v", pod)
|
||||
|
||||
By("deleting the pod gracefully")
|
||||
err = podClient.Delete(pod.Name, metav1.NewDeleteOptions(30))
|
||||
|
|
|
@ -37,7 +37,6 @@ go_library(
|
|||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//test/e2e/common:go_default_library",
|
||||
"//test/e2e/framework:go_default_library",
|
||||
|
|
|
@ -18,6 +18,7 @@ package node
|
|||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"regexp"
|
||||
|
@ -30,7 +31,6 @@ import (
|
|||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
|
@ -74,7 +74,7 @@ var _ = SIGDescribe("Pods Extended", func() {
|
|||
},
|
||||
}
|
||||
|
||||
By("setting up watch")
|
||||
By("setting up selector")
|
||||
selector := labels.SelectorFromSet(labels.Set(map[string]string{"time": value}))
|
||||
options := metav1.ListOptions{LabelSelector: selector.String()}
|
||||
pods, err := podClient.List(options)
|
||||
|
@ -84,8 +84,6 @@ var _ = SIGDescribe("Pods Extended", func() {
|
|||
LabelSelector: selector.String(),
|
||||
ResourceVersion: pods.ListMeta.ResourceVersion,
|
||||
}
|
||||
w, err := podClient.Watch(options)
|
||||
Expect(err).NotTo(HaveOccurred(), "failed to set up watch")
|
||||
|
||||
By("submitting the pod to kubernetes")
|
||||
podClient.Create(pod)
|
||||
|
@ -97,16 +95,6 @@ var _ = SIGDescribe("Pods Extended", func() {
|
|||
Expect(err).NotTo(HaveOccurred(), "failed to query for pod")
|
||||
Expect(len(pods.Items)).To(Equal(1))
|
||||
|
||||
By("verifying pod creation was observed")
|
||||
select {
|
||||
case event, _ := <-w.ResultChan():
|
||||
if event.Type != watch.Added {
|
||||
framework.Failf("Failed to observe pod creation: %v", event)
|
||||
}
|
||||
case <-time.After(framework.PodStartTimeout):
|
||||
framework.Failf("Timeout while waiting for pod creation")
|
||||
}
|
||||
|
||||
// We need to wait for the pod to be running, otherwise the deletion
|
||||
// may be carried out immediately rather than gracefully.
|
||||
framework.ExpectNoError(f.WaitForPodRunning(pod.Name))
|
||||
|
@ -143,10 +131,15 @@ var _ = SIGDescribe("Pods Extended", func() {
|
|||
By("deleting the pod gracefully")
|
||||
rsp, err := client.Do(req)
|
||||
Expect(err).NotTo(HaveOccurred(), "failed to use http client to send delete")
|
||||
Expect(rsp.StatusCode).Should(Equal(http.StatusOK), "failed to delete gracefully by client request")
|
||||
var lastPod v1.Pod
|
||||
err = json.NewDecoder(rsp.Body).Decode(&lastPod)
|
||||
Expect(err).NotTo(HaveOccurred(), "failed to decode graceful termination proxy response")
|
||||
|
||||
defer rsp.Body.Close()
|
||||
|
||||
By("verifying the kubelet observed the termination notice")
|
||||
|
||||
Expect(wait.Poll(time.Second*5, time.Second*30, func() (bool, error) {
|
||||
podList, err := framework.GetKubeletPods(f.ClientSet, pod.Spec.NodeName)
|
||||
if err != nil {
|
||||
|
@ -161,32 +154,12 @@ var _ = SIGDescribe("Pods Extended", func() {
|
|||
framework.Logf("deletion has not yet been observed")
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
return false, nil
|
||||
}
|
||||
framework.Logf("no pod exists with the name we were looking for, assuming the termination request was observed and completed")
|
||||
return true, nil
|
||||
})).NotTo(HaveOccurred(), "kubelet never observed the termination notice")
|
||||
|
||||
By("verifying pod deletion was observed")
|
||||
deleted := false
|
||||
timeout := false
|
||||
var lastPod *v1.Pod
|
||||
timer := time.After(2 * time.Minute)
|
||||
for !deleted && !timeout {
|
||||
select {
|
||||
case event, _ := <-w.ResultChan():
|
||||
if event.Type == watch.Deleted {
|
||||
lastPod = event.Object.(*v1.Pod)
|
||||
deleted = true
|
||||
}
|
||||
case <-timer:
|
||||
timeout = true
|
||||
}
|
||||
}
|
||||
if !deleted {
|
||||
framework.Failf("Failed to observe pod deletion")
|
||||
}
|
||||
|
||||
Expect(lastPod.DeletionTimestamp).ToNot(BeNil())
|
||||
Expect(lastPod.Spec.TerminationGracePeriodSeconds).ToNot(BeZero())
|
||||
|
||||
|
|
Loading…
Reference in New Issue