Revert "improve e2e retry logic with standard wait.Poll()"

pull/6/head
Quinton Hoole 2015-05-19 11:17:32 -07:00
parent b963307496
commit 14665119b4
10 changed files with 119 additions and 123 deletions
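The revert is mechanical across all ten files: each `wait.Poll(interval, timeout, condition)` call from pkg/util/wait is swapped back for the explicit polling loop it had displaced. Below is a minimal, self-contained sketch of the two idioms; `poll` here is a local stand-in that only mirrors the shape of the wait.Poll API of the time (a condition returning `(done bool, err error)`), not the real implementation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// conditionFunc mirrors the shape of the wait.ConditionFunc used by
// wait.Poll in this era of the codebase (an assumption kept local so
// the sketch is self-contained).
type conditionFunc func() (bool, error)

// poll mimics wait.Poll: run condition every interval until it reports
// done, returns an error, or the timeout elapses.
func poll(interval, timeout time.Duration, condition conditionFunc) error {
	for start := time.Now(); time.Since(start) < timeout; time.Sleep(interval) {
		done, err := condition()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
	}
	return errors.New("timed out waiting for the condition")
}

func main() {
	ready := time.Now().Add(1 * time.Second)
	check := func() bool { return time.Now().After(ready) }

	// Style being reverted away from: the retry body as a closure.
	err := poll(200*time.Millisecond, 5*time.Second, func() (bool, error) {
		return check(), nil
	})
	fmt.Println("wait.Poll style:", err)

	// Style being restored: an explicit loop that sleeps in the post
	// statement and breaks out on success.
	for start := time.Now(); time.Since(start) < 5*time.Second; time.Sleep(200 * time.Millisecond) {
		if check() {
			break
		}
	}
	fmt.Println("manual loop: done")
}
```

One behavioral difference worth keeping in mind when reading the hunks: wait.Poll stops as soon as the condition returns an error, whereas the restored loops generally log and continue past transient failures until the deadline expires.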

View File

@@ -27,7 +27,6 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
-    "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"

     . "github.com/onsi/ginkgo"
     . "github.com/onsi/gomega"
@@ -73,14 +72,12 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
     // being run as the first e2e test just after the e2e cluster has been created.
     var err error
     const graceTime = 10 * time.Minute
-    start := time.Now()
-    expectNoError(wait.Poll(5*time.Second, graceTime, func() (bool, error) {
+    for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
         if _, err = s.Get("elasticsearch-logging"); err == nil {
-            return true, nil
+            break
         }
         Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start))
-        return false, nil
-    }))
+    }
     Expect(err).NotTo(HaveOccurred())

     // Wait for the Elasticsearch pods to enter the running state.
@@ -98,8 +95,7 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
     var statusCode float64
     var esResponse map[string]interface{}
     err = nil
-    start = time.Now()
-    expectNoError(wait.Poll(5*time.Second, graceTime, func() (bool, error) {
+    for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
         // Query against the root URL for Elasticsearch.
         body, err := c.Get().
             Namespace(api.NamespaceDefault).
@@ -109,26 +105,26 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
             DoRaw()
         if err != nil {
             Logf("After %v proxy call to elasticsearch-logging failed: %v", time.Since(start), err)
-            return false, nil
+            continue
         }
         esResponse, err = bodyToJSON(body)
         if err != nil {
             Logf("After %v failed to convert Elasticsearch JSON response %v to map[string]interface{}: %v", time.Since(start), string(body), err)
-            return false, nil
+            continue
         }
         statusIntf, ok := esResponse["status"]
         if !ok {
             Logf("After %v Elasticsearch response has no status field: %v", time.Since(start), esResponse)
-            return false, nil
+            continue
         }
         statusCode, ok = statusIntf.(float64)
         if !ok {
             // Assume this is a string returning Failure. Retry.
             Logf("After %v expected status to be a float64 but got %v of type %T", time.Since(start), statusIntf, statusIntf)
-            return false, nil
+            continue
         }
-        return true, nil
-    }))
+        break
+    }
     Expect(err).NotTo(HaveOccurred())
     if int(statusCode) != 200 {
         Failf("Elasticsearch cluster has a bad status: %v", statusCode)
@@ -237,8 +233,7 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
     By("Checking all the log lines were ingested into Elasticsearch")
     missing := 0
     expected := nodeCount * countTo
-    start = time.Now()
-    expectNoError(wait.Poll(10*time.Second, graceTime, func() (bool, error) {
+    for start := time.Now(); time.Since(start) < graceTime; time.Sleep(10 * time.Second) {
         // Ask Elasticsearch to return all the log lines that were tagged with the underscore
         // version of the name. Ask for twice as many log lines as we expect to check for
         // duplication bugs.
@@ -253,13 +248,13 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
             DoRaw()
         if err != nil {
             Logf("After %v failed to make proxy call to elasticsearch-logging: %v", time.Since(start), err)
-            return false, nil
+            continue
         }
         response, err := bodyToJSON(body)
         if err != nil {
             Logf("After %v failed to unmarshal response: %v", time.Since(start), err)
-            return false, nil
+            continue
         }
         hits, ok := response["hits"].(map[string]interface{})
         if !ok {
@@ -268,17 +263,17 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
         totalF, ok := hits["total"].(float64)
         if !ok {
             Logf("After %v hits[total] not of the expected type: %T", time.Since(start), hits["total"])
-            return false, nil
+            continue
         }
         total := int(totalF)
         if total < expected {
             Logf("After %v expecting to find %d log lines but saw only %d", time.Since(start), expected, total)
-            return false, nil
+            continue
         }
         h, ok := hits["hits"].([]interface{})
         if !ok {
             Logf("After %v hits not of the expected type: %T", time.Since(start), hits["hits"])
-            return false, nil
+            continue
         }
         // Initialize data-structure for observing counts.
         observed := make([][]int, nodeCount)
@@ -334,10 +329,10 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
         }
         if missing != 0 {
             Logf("After %v still missing %d log lines", time.Since(start), missing)
-            return false, nil
+            continue
         }
         Logf("After %s found all %d log lines", time.Since(start), expected)
-        return true, nil
-    }))
+        return
+    }
     Failf("Failed to find all %d log lines", expected)
 }
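A note on the status handling above: when decoding into a `map[string]interface{}`, `encoding/json` represents every JSON number as a `float64`, which is why the test type-asserts `float64` before comparing the Elasticsearch status to 200. A self-contained illustration (the sample response body here is made up):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	body := []byte(`{"status": 200, "cluster_name": "elasticsearch"}`)

	var resp map[string]interface{}
	if err := json.Unmarshal(body, &resp); err != nil {
		panic(err)
	}

	statusIntf, ok := resp["status"]
	if !ok {
		panic("response has no status field")
	}
	statusCode, ok := statusIntf.(float64) // JSON numbers decode as float64
	if !ok {
		panic(fmt.Sprintf("expected float64, got %T", statusIntf))
	}
	if int(statusCode) == 200 {
		fmt.Println("Elasticsearch is healthy")
	}
}
```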

View File

@@ -26,7 +26,6 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
-    "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"

     . "github.com/onsi/ginkgo"
     . "github.com/onsi/gomega"
@@ -150,14 +149,13 @@ func validateGuestbookApp(c *client.Client, ns string) {
 // Returns whether received expected response from guestbook on time.
 func waitForGuestbookResponse(c *client.Client, cmd, arg, expectedResponse string, timeout time.Duration, ns string) bool {
-    expectNoError(wait.Poll(5*time.Second, timeout, func() (bool, error) {
+    for start := time.Now(); time.Since(start) < timeout; time.Sleep(5 * time.Second) {
         res, err := makeRequestToGuestbook(c, cmd, arg, ns)
         if err == nil && res == expectedResponse {
-            return true, nil
+            return true
         }
-        return false, nil
-    }))
-    return true
+    }
+    return false
 }

 func makeRequestToGuestbook(c *client.Client, cmd, value string, ns string) (string, error) {

View File

@@ -26,7 +26,6 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
-    "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"

     . "github.com/onsi/ginkgo"
     . "github.com/onsi/gomega"
@@ -119,7 +118,7 @@ func playWithRC(c *client.Client, wg *sync.WaitGroup, ns, name string, size int)
     defer wg.Done()
     rcExist := false
     // Once every 1-2 minutes perform resize of RC.
-    expectNoError(wait.Poll(time.Duration(60+rand.Intn(60))*time.Second, simulationTime, func() (bool, error) {
+    for start := time.Now(); time.Since(start) < simulationTime; time.Sleep(time.Duration(60+rand.Intn(60)) * time.Second) {
         if !rcExist {
             expectNoError(RunRC(c, name, ns, image, size), fmt.Sprintf("creating rc %s in namespace %s", name, ns))
             rcExist = true
@@ -132,8 +131,7 @@ func playWithRC(c *client.Client, wg *sync.WaitGroup, ns, name string, size int)
             expectNoError(DeleteRC(c, ns, name), fmt.Sprintf("deleting rc %s in namespace %s", name, ns))
             rcExist = false
         }
-        return false, nil
-    }))
+    }
     if rcExist {
         expectNoError(DeleteRC(c, ns, name), fmt.Sprintf("deleting rc %s in namespace %s after test completion", name, ns))
     }

View File

@@ -26,7 +26,6 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
-    "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
     influxdb "github.com/influxdb/influxdb/client"

     . "github.com/onsi/ginkgo"
@@ -224,12 +223,15 @@ func testMonitoringUsingHeapsterInfluxdb(c *client.Client) {
     expectedNodes, err := getAllNodesInCluster(c)
     expectNoError(err)
-    expectNoError(wait.Poll(sleepBetweenAttempts, testTimeout, func() (bool, error) {
+    startTime := time.Now()
+    for {
         if validatePodsAndNodes(influxdbClient, expectedPods, expectedNodes) {
-            return true, nil
+            return
         }
-        return false, nil
-    }))
+        if time.Since(startTime) >= testTimeout {
+            break
+        }
+        time.Sleep(sleepBetweenAttempts)
+    }
     Failf("monitoring using heapster and influxdb test failed")
 }

View File

@@ -29,8 +29,6 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
-    "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"

     . "github.com/onsi/ginkgo"
     . "github.com/onsi/gomega"
 )
@@ -110,14 +108,14 @@ var _ = Describe("PD", func() {
         expectNoError(podClient.Delete(host1Pod.Name, nil), "Failed to delete host1Pod")

         By(fmt.Sprintf("deleting PD %q", diskName))
-        expectNoError(wait.Poll(5*time.Second, 180*time.Second, func() (bool, error) {
+        for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) {
             if err = deletePD(diskName); err != nil {
                 Logf("Couldn't delete PD. Sleeping 5 seconds (%v)", err)
-                return false, nil
+                continue
             }
             Logf("Deleted PD %v", diskName)
-            return true, nil
-        }))
+            break
+        }

         expectNoError(err, "Error deleting PD")
         return
@@ -178,14 +176,13 @@ var _ = Describe("PD", func() {
         expectNoError(podClient.Delete(host1ROPod.Name, nil), "Failed to delete host1ROPod")

         By(fmt.Sprintf("deleting PD %q", diskName))
-        expectNoError(wait.Poll(5*time.Second, 180*time.Second, func() (bool, error) {
+        for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) {
             if err = deletePD(diskName); err != nil {
                 Logf("Couldn't delete PD. Sleeping 5 seconds")
-                return false, nil
+                continue
             }
-            return true, nil
-        }))
+            break
+        }

         expectNoError(err, "Error deleting PD")
     })
 })

View File

@@ -98,21 +98,22 @@ func testHostIP(c *client.Client, pod *api.Pod) {
     err = waitForPodRunningInNamespace(c, pod.Name, ns)
     Expect(err).NotTo(HaveOccurred())

     // Try to make sure we get a hostIP for each pod.
-    var (
-        hostIPTimeout = 2 * time.Minute
-        pods          *api.Pod
-    )
-    expectNoError(wait.Poll(5*time.Second, hostIPTimeout, func() (bool, error) {
-        pods, err = podClient.Get(pod.Name)
+    hostIPTimeout := 2 * time.Minute
+    t := time.Now()
+    for {
+        p, err := podClient.Get(pod.Name)
         Expect(err).NotTo(HaveOccurred())
-        if pods.Status.HostIP != "" {
-            Logf("Pod %s has hostIP: %s", pods.Name, pods.Status.HostIP)
-            return true, nil
+        if p.Status.HostIP != "" {
+            Logf("Pod %s has hostIP: %s", p.Name, p.Status.HostIP)
+            break
         }
-        Logf("Retrying to get the hostIP of pod %s", pods.Name)
-        return false, nil
-    }))
+        if time.Since(t) >= hostIPTimeout {
+            Failf("Gave up waiting for hostIP of pod %s after %v seconds",
+                p.Name, time.Since(t).Seconds())
+        }
+        Logf("Retrying to get the hostIP of pod %s", p.Name)
+        time.Sleep(5 * time.Second)
+    }
 }

 var _ = Describe("Pods", func() {

View File

@@ -111,16 +111,22 @@ func ServeImageOrFail(c *client.Client, test string, image string) {
     // List the pods, making sure we observe all the replicas.
     listTimeout := time.Minute
     label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
-    var pods *api.PodList
-    expectNoError(wait.Poll(5*time.Second, listTimeout, func() (bool, error) {
-        pods, err = c.Pods(ns).List(label, fields.Everything())
+    pods, err := c.Pods(ns).List(label, fields.Everything())
     Expect(err).NotTo(HaveOccurred())
+    t := time.Now()
+    for {
         Logf("Controller %s: Found %d pods out of %d", name, len(pods.Items), replicas)
         if len(pods.Items) == replicas {
-            return true, nil
+            break
         }
-        return false, nil
-    }))
+        if time.Since(t) > listTimeout {
+            Failf("Controller %s: Gave up waiting for %d pods to come up after seeing only %d pods after %v seconds",
+                name, replicas, len(pods.Items), time.Since(t).Seconds())
+        }
+        time.Sleep(5 * time.Second)
+        pods, err = c.Pods(ns).List(label, fields.Everything())
+        Expect(err).NotTo(HaveOccurred())
+    }

     By("Ensuring each pod is running")

View File

@@ -24,7 +24,6 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
-    "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"

     . "github.com/onsi/ginkgo"
     . "github.com/onsi/gomega"
@@ -256,11 +255,11 @@ func waitForNodeToBeNotReady(c *client.Client, name string, timeout time.Duratio
 // ready or unknown).
 func waitForNodeToBe(c *client.Client, name string, wantReady bool, timeout time.Duration) bool {
     Logf("Waiting up to %v for node %s readiness to be %t", timeout, name, wantReady)
-    expectNoError(wait.Poll(poll, timeout, func() (bool, error) {
+    for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
         node, err := c.Nodes().Get(name)
         if err != nil {
             Logf("Couldn't get node %s", name)
-            return false, nil
+            continue
         }

         // Check the node readiness condition (logging all).
@@ -271,11 +270,10 @@ func waitForNodeToBe(c *client.Client, name string, wantReady bool, timeout time
             // matches as desired.
             if cond.Type == api.NodeReady && (cond.Status == api.ConditionTrue) == wantReady {
                 Logf("Successfully found node %s readiness to be %t", name, wantReady)
-                return true, nil
+                return true
             }
         }
-        return false, nil
-    }))
-    return true
+    }
+    Logf("Node %s didn't reach desired readiness (%t) within %v", name, wantReady, timeout)
+    return false
 }

View File

@@ -28,7 +28,6 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
    "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
-    "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"

     . "github.com/onsi/ginkgo"
     . "github.com/onsi/gomega"
@@ -83,7 +82,7 @@ var _ = Describe("Services", func() {
         Expect(foundRO).To(Equal(true))
     })

-    It("should serve a basic endpoint from pods", func() {
+    It("should serve a basic endpoint from pods", func(done Done) {
         serviceName := "endpoint-test2"
         ns := namespaces[0]
         labels := map[string]string{
@@ -145,9 +144,13 @@ var _ = Describe("Services", func() {
         validateEndpointsOrFail(c, ns, serviceName, map[string][]int{})
-    })
+
+        // We deferred Ginkgo pieces that may Fail, we aren't done.
+        defer func() {
+            close(done)
+        }()
+    }, 240.0)

-    It("should serve multiport endpoints from pods", func() {
+    It("should serve multiport endpoints from pods", func(done Done) {
         // repacking functionality is intentionally not tested here - it's better to test it in an integration test.
         serviceName := "multi-endpoint-test"
         ns := namespaces[0]
@@ -242,7 +245,11 @@ var _ = Describe("Services", func() {
         validateEndpointsOrFail(c, ns, serviceName, map[string][]int{})
-    })
+
+        // We deferred Ginkgo pieces that may Fail, we aren't done.
+        defer func() {
+            close(done)
+        }()
+    }, 240.0)

     It("should be able to create a functioning external load balancer", func() {
         if !providerIs("gce", "gke") {
@@ -321,13 +328,13 @@ var _ = Describe("Services", func() {
         By("hitting the pod through the service's external load balancer")
         var resp *http.Response
-        expectNoError(wait.Poll(5*time.Second, podStartTimeout, func() (bool, error) {
+        for t := time.Now(); time.Since(t) < podStartTimeout; time.Sleep(5 * time.Second) {
             resp, err = http.Get(fmt.Sprintf("http://%s:%d", ip, port))
             if err == nil {
-                return true, nil
+                break
             }
-            return false, nil
-        }))
+        }
         Expect(err).NotTo(HaveOccurred())
         defer resp.Body.Close()

         body, err := ioutil.ReadAll(resp.Body)
@@ -393,21 +400,16 @@ func waitForPublicIPs(c *client.Client, serviceName, namespace string) (*api.Ser
     const timeout = 4 * time.Minute
     var service *api.Service
     By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to have a public IP", timeout, serviceName, namespace))
-    start := time.Now()
-    expectNoError(wait.Poll(5*time.Second, timeout, func() (bool, error) {
+    for start := time.Now(); time.Since(start) < timeout; time.Sleep(5 * time.Second) {
         service, err := c.Services(namespace).Get(serviceName)
         if err != nil {
             Logf("Get service failed, ignoring for 5s: %v", err)
-            return false, nil
+            continue
         }
         if len(service.Spec.PublicIPs) > 0 {
-            return true, nil
+            return service, nil
         }
         Logf("Waiting for service %s in namespace %s to have a public IP (%v)", serviceName, namespace, time.Since(start))
-        return false, nil
-    }))
-    if len(service.Spec.PublicIPs) > 0 {
-        return service, nil
-    }
+    }
     return service, fmt.Errorf("service %s in namespace %s doesn't have a public IP after %.2f seconds", serviceName, namespace, timeout.Seconds())
 }
@@ -479,28 +481,27 @@ func validatePortsOrFail(endpoints map[string][]int, expectedEndpoints map[strin
 func validateEndpointsOrFail(c *client.Client, ns, serviceName string, expectedEndpoints map[string][]int) {
     By(fmt.Sprintf("Validating endpoints %v with on service %s/%s", expectedEndpoints, ns, serviceName))
-    expectNoError(wait.Poll(time.Second, 120*time.Second, func() (bool, error) {
+    for {
         endpoints, err := c.Endpoints(ns).Get(serviceName)
         if err == nil {
-            Logf("Found endpoints %v", endpoints)
+            By(fmt.Sprintf("Found endpoints %v", endpoints))
             portsByIp := getPortsByIp(endpoints.Subsets)
-            Logf("Found ports by ip %v", portsByIp)
+            By(fmt.Sprintf("Found ports by ip %v", portsByIp))
             if len(portsByIp) == len(expectedEndpoints) {
                 expectedPortsByIp := translatePodNameToIpOrFail(c, ns, expectedEndpoints)
                 validatePortsOrFail(portsByIp, expectedPortsByIp)
-                return true, nil
+                break
             } else {
-                Logf("Unexpected number of endpoints: found %v, expected %v (ignoring for 1 second)", portsByIp, expectedEndpoints)
+                By(fmt.Sprintf("Unexpected number of endpoints: found %v, expected %v (ignoring for 1 second)", portsByIp, expectedEndpoints))
             }
         } else {
-            Logf("Failed to get endpoints: %v (ignoring for 1 second)", err)
+            By(fmt.Sprintf("Failed to get endpoints: %v (ignoring for 1 second)", err))
         }
-        return false, nil
-    }))
-    Logf("successfully validated endpoints %v with on service %s/%s", expectedEndpoints, ns, serviceName)
+        time.Sleep(time.Second)
+    }
+    By(fmt.Sprintf("successfully validated endpoints %v with on service %s/%s", expectedEndpoints, ns, serviceName))
 }

 func addEndpointPodOrFail(c *client.Client, ns, name string, labels map[string]string, containerPorts []api.ContainerPort) {
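The `func(done Done)` signatures and trailing `240.0` restored above are Ginkgo's (v1) asynchronous spec form: the body receives a `Done` channel, and the spec fails unless the channel is closed (or written to) before the timeout, given in seconds, elapses. A minimal sketch of the pattern with an invented package and spec name; in the reverted tests the body runs synchronously, so the deferred `close(done)` simply marks completion under the 240-second deadline:

```go
package services_test

import (
	"time"

	. "github.com/onsi/ginkgo"
)

var _ = Describe("async spec sketch", func() {
	// The trailing 240.0 is the per-spec timeout in seconds; the spec
	// fails if done is not closed (or written to) before it elapses.
	It("signals completion via the Done channel", func(done Done) {
		go func() {
			time.Sleep(100 * time.Millisecond) // stand-in for real work
			close(done)                        // tell Ginkgo the spec finished
		}()
	}, 240.0)
})
```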

View File

@@ -39,7 +39,6 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
-    "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"

     "golang.org/x/crypto/ssh"
@@ -272,33 +271,32 @@ func validateController(c *client.Client, containerImage string, replicas int, c
     getImageTemplate := fmt.Sprintf(`--template={{if (exists . "status" "containerStatuses")}}{{range .status.containerStatuses}}{{if eq .name "%s"}}{{.image}}{{end}}{{end}}{{end}}`, containername)

     By(fmt.Sprintf("waiting for all containers in %s pods to come up.", testname)) //testname should be selector
-    expectNoError(wait.Poll(5*time.Second, podStartTimeout, func() (bool, error) {
+    for start := time.Now(); time.Since(start) < podStartTimeout; time.Sleep(5 * time.Second) {
         getPodsOutput := runKubectl("get", "pods", "-o", "template", getPodsTemplate, "--api-version=v1beta3", "-l", testname, fmt.Sprintf("--namespace=%v", ns))
         pods := strings.Fields(getPodsOutput)
         if numPods := len(pods); numPods != replicas {
             By(fmt.Sprintf("Replicas for %s: expected=%d actual=%d", testname, replicas, numPods))
-            return false, nil
+            continue
         }
         var runningPods []string
         for _, podID := range pods {
             running := runKubectl("get", "pods", podID, "-o", "template", getContainerStateTemplate, "--api-version=v1beta3", fmt.Sprintf("--namespace=%v", ns))
             if running != "true" {
                 Logf("%s is created but not running", podID)
-                return false, nil
+                continue
             }
             currentImage := runKubectl("get", "pods", podID, "-o", "template", getImageTemplate, "--api-version=v1beta3", fmt.Sprintf("--namespace=%v", ns))
             if currentImage != containerImage {
                 Logf("%s is created but running wrong image; expected: %s, actual: %s", podID, containerImage, currentImage)
-                return false, nil
+                continue
             }
             // Call the generic validator function here.
             // This might validate for example, that (1) getting a url works and (2) url is serving correct content.
             if err := validator(c, podID); err != nil {
                 Logf("%s is running right image but validator function failed: %v", podID, err)
-                return false, nil
+                continue
             }
             Logf("%s is verified up and running", podID)
@@ -306,10 +304,9 @@ func validateController(c *client.Client, containerImage string, replicas int, c
         }
         // If we reach here, then all our checks passed.
         if len(runningPods) == replicas {
-            return true, nil
+            return
         }
-        return false, nil
-    }))
+    }
     // Reaching here means that one or more checks failed multiple times. Assuming it's not a race condition, something is broken.
     Failf("Timed out after %v seconds waiting for %s pods to reach valid state", podStartTimeout.Seconds(), testname)
 }
@@ -390,9 +387,10 @@ func testContainerOutputInNamespace(scenarioName string, c *client.Client, pod *
     By(fmt.Sprintf("Trying to get logs from host %s pod %s container %s: %v",
         podStatus.Spec.Host, podStatus.Name, containerName, err))
     var logs []byte
+    start := time.Now()

     // Sometimes the actual containers take a second to get started, try to get logs for 60s
-    expectNoError(wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
+    for time.Now().Sub(start) < (60 * time.Second) {
         logs, err = c.Get().
             Prefix("proxy").
             Resource("nodes").
@@ -400,14 +398,16 @@ func testContainerOutputInNamespace(scenarioName string, c *client.Client, pod *
             Suffix("containerLogs", ns, podStatus.Name, containerName).
             Do().
             Raw()
-        Logf("pod logs:%v\n", string(logs))
+        fmt.Sprintf("pod logs:%v\n", string(logs))
+        By(fmt.Sprintf("pod logs:%v\n", string(logs)))
         if strings.Contains(string(logs), "Internal Error") {
-            Logf("Failed to get logs from host %q pod %q container %q: %v",
-                podStatus.Spec.Host, podStatus.Name, containerName, string(logs))
-            return false, nil
+            By(fmt.Sprintf("Failed to get logs from host %q pod %q container %q: %v",
+                podStatus.Spec.Host, podStatus.Name, containerName, string(logs)))
+            time.Sleep(5 * time.Second)
+            continue
         }
-        return true, nil
-    }))
+        break
+    }

     for _, m := range expectedOutput {
         Expect(string(logs)).To(ContainSubstring(m), "%q in container output", m)