improve retry logic with standard wait.Poll()

Signed-off-by: He Simei <hesimei@zju.edu.cn>
He Simei 2015-05-18 08:40:18 +08:00
parent 10339d72b6
commit 09fc2a5013
10 changed files with 123 additions and 119 deletions
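
Every change below follows one mechanical pattern: a hand-rolled retry loop of the form `for start := time.Now(); time.Since(start) < timeout; time.Sleep(interval) { ... }` becomes a call to `wait.Poll(interval, timeout, condition)` from `pkg/util/wait`, where the condition func returns `(done bool, err error)`. Inside the converted body, `continue` becomes `return false, nil` (keep polling) and `break` becomes `return true, nil` (done). A minimal self-contained sketch of the pattern, using a hypothetical local `poll` helper that only mirrors the shape of `wait.Poll` (the library internals are not part of this diff):

package main

import (
	"errors"
	"fmt"
	"time"
)

// poll mirrors the shape of pkg/util/wait.Poll: it invokes condition
// repeatedly until it reports done=true, returns a non-nil error, or
// the timeout elapses. This is a local stand-in, not the library code.
func poll(interval, timeout time.Duration, condition func() (bool, error)) error {
	deadline := time.Now().Add(timeout)
	for {
		done, err := condition()
		if err != nil {
			return err // a hard error aborts polling immediately
		}
		if done {
			return nil // condition satisfied
		}
		if time.Now().After(deadline) {
			return errors.New("timed out waiting for the condition")
		}
		time.Sleep(interval)
	}
}

func main() {
	start := time.Now()
	// Before: for start := time.Now(); time.Since(start) < timeout; time.Sleep(...) { ... }
	// After: the loop body becomes the condition func;
	// continue -> return false, nil; break -> return true, nil.
	err := poll(500*time.Millisecond, 5*time.Second, func() (bool, error) {
		if time.Since(start) > 2*time.Second { // stand-in for e.g. a "service exists" check
			return true, nil
		}
		fmt.Printf("condition not met after %v; retrying\n", time.Since(start))
		return false, nil
	})
	fmt.Println("poll result:", err)
}

Note that `wait.Poll` returns a non-nil error when the timeout expires, so wrapping the call in `expectNoError(...)` makes a timed-out poll fail the test at the polling site, whereas the old loops simply fell through to whatever check came after.
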


@@ -27,6 +27,7 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
 
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
@@ -72,12 +73,14 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
 	// being run as the first e2e test just after the e2e cluster has been created.
 	var err error
 	const graceTime = 10 * time.Minute
-	for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
+	start := time.Now()
+	expectNoError(wait.Poll(5*time.Second, graceTime, func() (bool, error) {
 		if _, err = s.Get("elasticsearch-logging"); err == nil {
-			break
+			return true, nil
 		}
 		Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start))
-	}
+		return false, nil
+	}))
 	Expect(err).NotTo(HaveOccurred())
 
 	// Wait for the Elasticsearch pods to enter the running state.
@@ -95,7 +98,8 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
 	var statusCode float64
 	var esResponse map[string]interface{}
 	err = nil
-	for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
+	start = time.Now()
+	expectNoError(wait.Poll(5*time.Second, graceTime, func() (bool, error) {
 		// Query against the root URL for Elasticsearch.
 		body, err := c.Get().
 			Namespace(api.NamespaceDefault).
@@ -105,26 +109,26 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
 			DoRaw()
 		if err != nil {
 			Logf("After %v proxy call to elasticsearch-logging failed: %v", time.Since(start), err)
-			continue
+			return false, nil
 		}
 		esResponse, err = bodyToJSON(body)
 		if err != nil {
 			Logf("After %v failed to convert Elasticsearch JSON response %v to map[string]interface{}: %v", time.Since(start), string(body), err)
-			continue
+			return false, nil
 		}
 		statusIntf, ok := esResponse["status"]
 		if !ok {
 			Logf("After %v Elasticsearch response has no status field: %v", time.Since(start), esResponse)
-			continue
+			return false, nil
 		}
 		statusCode, ok = statusIntf.(float64)
 		if !ok {
 			// Assume this is a string returning Failure. Retry.
 			Logf("After %v expected status to be a float64 but got %v of type %T", time.Since(start), statusIntf, statusIntf)
-			continue
+			return false, nil
 		}
-		break
-	}
+		return true, nil
+	}))
 	Expect(err).NotTo(HaveOccurred())
 	if int(statusCode) != 200 {
 		Failf("Elasticsearch cluster has a bad status: %v", statusCode)
@@ -233,7 +237,8 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
 	By("Checking all the log lines were ingested into Elasticsearch")
 	missing := 0
 	expected := nodeCount * countTo
-	for start := time.Now(); time.Since(start) < graceTime; time.Sleep(10 * time.Second) {
+	start = time.Now()
+	expectNoError(wait.Poll(10*time.Second, graceTime, func() (bool, error) {
 		// Ask Elasticsearch to return all the log lines that were tagged with the underscore
 		// version of the name. Ask for twice as many log lines as we expect to check for
 		// duplication bugs.
@@ -248,13 +253,13 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
 			DoRaw()
 		if err != nil {
 			Logf("After %v failed to make proxy call to elasticsearch-logging: %v", time.Since(start), err)
-			continue
+			return false, nil
 		}
 		response, err := bodyToJSON(body)
 		if err != nil {
 			Logf("After %v failed to unmarshal response: %v", time.Since(start), err)
-			continue
+			return false, nil
 		}
 		hits, ok := response["hits"].(map[string]interface{})
 		if !ok {
@@ -263,17 +268,17 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
 		totalF, ok := hits["total"].(float64)
 		if !ok {
 			Logf("After %v hits[total] not of the expected type: %T", time.Since(start), hits["total"])
-			continue
+			return false, nil
 		}
 		total := int(totalF)
 		if total < expected {
 			Logf("After %v expecting to find %d log lines but saw only %d", time.Since(start), expected, total)
-			continue
+			return false, nil
 		}
 		h, ok := hits["hits"].([]interface{})
 		if !ok {
 			Logf("After %v hits not of the expected type: %T", time.Since(start), hits["hits"])
-			continue
+			return false, nil
 		}
 		// Initialize data-structure for observing counts.
 		observed := make([][]int, nodeCount)
@@ -329,10 +334,10 @@ func ClusterLevelLoggingWithElasticsearch(c *client.Client) {
 		}
 		if missing != 0 {
 			Logf("After %v still missing %d log lines", time.Since(start), missing)
-			continue
+			return false, nil
 		}
 		Logf("After %s found all %d log lines", time.Since(start), expected)
-		return
-	}
+		return true, nil
+	}))
 	Failf("Failed to find all %d log lines", expected)
 }


@@ -26,6 +26,7 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
 
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
@@ -144,13 +145,14 @@ func validateGuestbookApp(c *client.Client, ns string) {
 // Returns whether received expected response from guestbook on time.
 func waitForGuestbookResponse(c *client.Client, cmd, arg, expectedResponse string, timeout time.Duration, ns string) bool {
-	for start := time.Now(); time.Since(start) < timeout; time.Sleep(5 * time.Second) {
+	expectNoError(wait.Poll(5*time.Second, timeout, func() (bool, error) {
 		res, err := makeRequestToGuestbook(c, cmd, arg, ns)
 		if err == nil && res == expectedResponse {
-			return true
+			return true, nil
 		}
-	}
-	return false
+		return false, nil
+	}))
+	return true
 }
 
 func makeRequestToGuestbook(c *client.Client, cmd, value string, ns string) (string, error) {


@@ -26,6 +26,7 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
 
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
@@ -118,7 +119,7 @@ func playWithRC(c *client.Client, wg *sync.WaitGroup, ns, name string, size int)
 	defer wg.Done()
 	rcExist := false
 	// Once every 1-2 minutes perform resize of RC.
-	for start := time.Now(); time.Since(start) < simulationTime; time.Sleep(time.Duration(60+rand.Intn(60)) * time.Second) {
+	expectNoError(wait.Poll(time.Duration(60+rand.Intn(60))*time.Second, simulationTime, func() (bool, error) {
 		if !rcExist {
 			expectNoError(RunRC(c, name, ns, image, size), fmt.Sprintf("creating rc %s in namespace %s", name, ns))
 			rcExist = true
@@ -131,7 +132,8 @@ func playWithRC(c *client.Client, wg *sync.WaitGroup, ns, name string, size int)
 		expectNoError(DeleteRC(c, ns, name), fmt.Sprintf("deleting rc %s in namespace %s", name, ns))
 		rcExist = false
 	}
-	}
+		return false, nil
+	}))
 	if rcExist {
 		expectNoError(DeleteRC(c, ns, name), fmt.Sprintf("deleting rc %s in namespace %s after test completion", name, ns))
 	}


@@ -26,6 +26,7 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
 	influxdb "github.com/influxdb/influxdb/client"
 
 	. "github.com/onsi/ginkgo"
@@ -223,15 +224,12 @@ func testMonitoringUsingHeapsterInfluxdb(c *client.Client) {
 	expectedNodes, err := getAllNodesInCluster(c)
 	expectNoError(err)
-	startTime := time.Now()
-	for {
+	expectNoError(wait.Poll(sleepBetweenAttempts, testTimeout, func() (bool, error) {
 		if validatePodsAndNodes(influxdbClient, expectedPods, expectedNodes) {
-			return
-		}
-		if time.Since(startTime) >= testTimeout {
-			break
+			return true, nil
 		}
-		time.Sleep(sleepBetweenAttempts)
-	}
+		return false, nil
+	}))
 	Failf("monitoring using heapster and influxdb test failed")
 }


@@ -29,6 +29,8 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
+
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
 )
@@ -108,14 +110,14 @@ var _ = Describe("PD", func() {
 		expectNoError(podClient.Delete(host1Pod.Name, nil), "Failed to delete host1Pod")
 
 		By(fmt.Sprintf("deleting PD %q", diskName))
-		for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) {
+		expectNoError(wait.Poll(5*time.Second, 180*time.Second, func() (bool, error) {
 			if err = deletePD(diskName); err != nil {
 				Logf("Couldn't delete PD. Sleeping 5 seconds (%v)", err)
-				continue
+				return false, nil
 			}
 			Logf("Deleted PD %v", diskName)
-			break
-		}
+			return true, nil
+		}))
 		expectNoError(err, "Error deleting PD")
 
 		return
@@ -176,13 +178,14 @@ var _ = Describe("PD", func() {
 		expectNoError(podClient.Delete(host1ROPod.Name, nil), "Failed to delete host1ROPod")
 
 		By(fmt.Sprintf("deleting PD %q", diskName))
-		for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) {
+		expectNoError(wait.Poll(5*time.Second, 180*time.Second, func() (bool, error) {
 			if err = deletePD(diskName); err != nil {
 				Logf("Couldn't delete PD. Sleeping 5 seconds")
-				continue
+				return false, nil
 			}
-			break
-		}
+			return true, nil
+		}))
 		expectNoError(err, "Error deleting PD")
 	})
 })


@@ -98,22 +98,21 @@ func testHostIP(c *client.Client, pod *api.Pod) {
 	err = waitForPodRunningInNamespace(c, pod.Name, ns)
 	Expect(err).NotTo(HaveOccurred())
 
 	// Try to make sure we get a hostIP for each pod.
-	hostIPTimeout := 2 * time.Minute
-	t := time.Now()
-	for {
-		p, err := podClient.Get(pod.Name)
+	var (
+		hostIPTimeout = 2 * time.Minute
+		pods          *api.Pod
+	)
+	expectNoError(wait.Poll(5*time.Second, hostIPTimeout, func() (bool, error) {
+		pods, err = podClient.Get(pod.Name)
 		Expect(err).NotTo(HaveOccurred())
-		if p.Status.HostIP != "" {
-			Logf("Pod %s has hostIP: %s", p.Name, p.Status.HostIP)
-			break
-		}
-		if time.Since(t) >= hostIPTimeout {
-			Failf("Gave up waiting for hostIP of pod %s after %v seconds",
-				p.Name, time.Since(t).Seconds())
-		}
-		Logf("Retrying to get the hostIP of pod %s", p.Name)
-		time.Sleep(5 * time.Second)
-	}
+		if pods.Status.HostIP != "" {
+			Logf("Pod %s has hostIP: %s", pods.Name, pods.Status.HostIP)
+			return true, nil
+		}
+		Logf("Retrying to get the hostIP of pod %s", pods.Name)
+		return false, nil
+	}))
 }
 
 var _ = Describe("Pods", func() {


@@ -111,22 +111,16 @@ func ServeImageOrFail(c *client.Client, test string, image string) {
 	// List the pods, making sure we observe all the replicas.
 	listTimeout := time.Minute
 	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
-	pods, err := c.Pods(ns).List(label, fields.Everything())
-	Expect(err).NotTo(HaveOccurred())
-	t := time.Now()
-	for {
+	var pods *api.PodList
+	expectNoError(wait.Poll(5*time.Second, listTimeout, func() (bool, error) {
+		pods, err = c.Pods(ns).List(label, fields.Everything())
+		Expect(err).NotTo(HaveOccurred())
 		Logf("Controller %s: Found %d pods out of %d", name, len(pods.Items), replicas)
 		if len(pods.Items) == replicas {
-			break
+			return true, nil
 		}
-		if time.Since(t) > listTimeout {
-			Failf("Controller %s: Gave up waiting for %d pods to come up after seeing only %d pods after %v seconds",
-				name, replicas, len(pods.Items), time.Since(t).Seconds())
-		}
-		time.Sleep(5 * time.Second)
-		pods, err = c.Pods(ns).List(label, fields.Everything())
-		Expect(err).NotTo(HaveOccurred())
-	}
+		return false, nil
+	}))
 
 	By("Ensuring each pod is running")


@@ -24,6 +24,7 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
 
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
@@ -255,11 +256,11 @@ func waitForNodeToBeNotReady(c *client.Client, name string, timeout time.Duration) bool {
 // ready or unknown).
 func waitForNodeToBe(c *client.Client, name string, wantReady bool, timeout time.Duration) bool {
 	Logf("Waiting up to %v for node %s readiness to be %t", timeout, name, wantReady)
-	for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
+	expectNoError(wait.Poll(poll, timeout, func() (bool, error) {
 		node, err := c.Nodes().Get(name)
 		if err != nil {
 			Logf("Couldn't get node %s", name)
-			continue
+			return false, nil
 		}
 
 		// Check the node readiness condition (logging all).
@@ -270,10 +271,11 @@ func waitForNodeToBe(c *client.Client, name string, wantReady bool, timeout time.Duration) bool {
 			// matches as desired.
 			if cond.Type == api.NodeReady && (cond.Status == api.ConditionTrue) == wantReady {
 				Logf("Successfully found node %s readiness to be %t", name, wantReady)
-				return true
+				return true, nil
 			}
 		}
-	}
+		return false, nil
+	}))
+	return true
 
 	Logf("Node %s didn't reach desired readiness (%t) within %v", name, wantReady, timeout)
 	return false
 }


@@ -28,6 +28,7 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
 
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
@@ -82,7 +83,7 @@ var _ = Describe("Services", func() {
 		Expect(foundRO).To(Equal(true))
 	})
 
-	It("should serve a basic endpoint from pods", func(done Done) {
+	It("should serve a basic endpoint from pods", func() {
 		serviceName := "endpoint-test2"
 		ns := namespaces[0]
 		labels := map[string]string{
@@ -144,13 +145,9 @@ var _ = Describe("Services", func() {
 		validateEndpointsOrFail(c, ns, serviceName, map[string][]int{})
-
-		// We deferred Ginkgo pieces that may Fail, we aren't done.
-		defer func() {
-			close(done)
-		}()
-	}, 240.0)
+	})
 
-	It("should serve multiport endpoints from pods", func(done Done) {
+	It("should serve multiport endpoints from pods", func() {
 		// repacking functionality is intentionally not tested here - it's better to test it in an integration test.
 		serviceName := "multi-endpoint-test"
 		ns := namespaces[0]
@@ -245,11 +242,7 @@ var _ = Describe("Services", func() {
 		validateEndpointsOrFail(c, ns, serviceName, map[string][]int{})
-
-		// We deferred Ginkgo pieces that may Fail, we aren't done.
-		defer func() {
-			close(done)
-		}()
-	}, 240.0)
+	})
 
 	It("should be able to create a functioning external load balancer", func() {
 		if !providerIs("gce", "gke") {
@@ -328,13 +321,13 @@ var _ = Describe("Services", func() {
 		By("hitting the pod through the service's external load balancer")
 		var resp *http.Response
-		for t := time.Now(); time.Since(t) < podStartTimeout; time.Sleep(5 * time.Second) {
+		expectNoError(wait.Poll(5*time.Second, podStartTimeout, func() (bool, error) {
 			resp, err = http.Get(fmt.Sprintf("http://%s:%d", ip, port))
 			if err == nil {
-				break
+				return true, nil
 			}
-		}
-		Expect(err).NotTo(HaveOccurred())
+			return false, nil
+		}))
 
 		defer resp.Body.Close()
 		body, err := ioutil.ReadAll(resp.Body)
@@ -400,16 +393,21 @@ func waitForPublicIPs(c *client.Client, serviceName, namespace string) (*api.Service, error) {
 	const timeout = 4 * time.Minute
 	var service *api.Service
 	By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to have a public IP", timeout, serviceName, namespace))
-	for start := time.Now(); time.Since(start) < timeout; time.Sleep(5 * time.Second) {
+	start := time.Now()
+	expectNoError(wait.Poll(5*time.Second, timeout, func() (bool, error) {
 		service, err := c.Services(namespace).Get(serviceName)
 		if err != nil {
 			Logf("Get service failed, ignoring for 5s: %v", err)
-			continue
+			return false, nil
 		}
 		if len(service.Spec.PublicIPs) > 0 {
-			return service, nil
+			return true, nil
 		}
 		Logf("Waiting for service %s in namespace %s to have a public IP (%v)", serviceName, namespace, time.Since(start))
+		return false, nil
+	}))
+	if len(service.Spec.PublicIPs) > 0 {
+		return service, nil
 	}
 	return service, fmt.Errorf("service %s in namespace %s doesn't have a public IP after %.2f seconds", serviceName, namespace, timeout.Seconds())
 }
@@ -481,27 +479,28 @@ func validatePortsOrFail(endpoints map[string][]int, expectedEndpoints map[string][]int) {
 func validateEndpointsOrFail(c *client.Client, ns, serviceName string, expectedEndpoints map[string][]int) {
 	By(fmt.Sprintf("Validating endpoints %v with on service %s/%s", expectedEndpoints, ns, serviceName))
-	for {
+	expectNoError(wait.Poll(time.Second, 120*time.Second, func() (bool, error) {
 		endpoints, err := c.Endpoints(ns).Get(serviceName)
 		if err == nil {
-			By(fmt.Sprintf("Found endpoints %v", endpoints))
+			Logf("Found endpoints %v", endpoints)
 			portsByIp := getPortsByIp(endpoints.Subsets)
-			By(fmt.Sprintf("Found ports by ip %v", portsByIp))
+			Logf("Found ports by ip %v", portsByIp)
 			if len(portsByIp) == len(expectedEndpoints) {
 				expectedPortsByIp := translatePodNameToIpOrFail(c, ns, expectedEndpoints)
 				validatePortsOrFail(portsByIp, expectedPortsByIp)
-				break
+				return true, nil
 			} else {
-				By(fmt.Sprintf("Unexpected number of endpoints: found %v, expected %v (ignoring for 1 second)", portsByIp, expectedEndpoints))
+				Logf("Unexpected number of endpoints: found %v, expected %v (ignoring for 1 second)", portsByIp, expectedEndpoints)
 			}
 		} else {
-			By(fmt.Sprintf("Failed to get endpoints: %v (ignoring for 1 second)", err))
+			Logf("Failed to get endpoints: %v (ignoring for 1 second)", err)
 		}
-		time.Sleep(time.Second)
-	}
-	By(fmt.Sprintf("successfully validated endpoints %v with on service %s/%s", expectedEndpoints, ns, serviceName))
+		return false, nil
+	}))
+	Logf("successfully validated endpoints %v with on service %s/%s", expectedEndpoints, ns, serviceName)
 }
 
 func addEndpointPodOrFail(c *client.Client, ns, name string, labels map[string]string, containerPorts []api.ContainerPort) {


@@ -38,6 +38,7 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
 
 	"golang.org/x/crypto/ssh"
@@ -262,32 +263,33 @@ func validateController(c *client.Client, containerImage string, replicas int, c
 	getImageTemplate := fmt.Sprintf(`--template={{if (exists . "status" "containerStatuses")}}{{range .status.containerStatuses}}{{if eq .name "%s"}}{{.image}}{{end}}{{end}}{{end}}`, containername)
 
 	By(fmt.Sprintf("waiting for all containers in %s pods to come up.", testname)) //testname should be selector
-	for start := time.Now(); time.Since(start) < podStartTimeout; time.Sleep(5 * time.Second) {
+	expectNoError(wait.Poll(5*time.Second, podStartTimeout, func() (bool, error) {
 		getPodsOutput := runKubectl("get", "pods", "-o", "template", getPodsTemplate, "--api-version=v1beta3", "-l", testname, fmt.Sprintf("--namespace=%v", ns))
 		pods := strings.Fields(getPodsOutput)
 		if numPods := len(pods); numPods != replicas {
 			By(fmt.Sprintf("Replicas for %s: expected=%d actual=%d", testname, replicas, numPods))
-			continue
+			return false, nil
 		}
 		var runningPods []string
 		for _, podID := range pods {
 			running := runKubectl("get", "pods", podID, "-o", "template", getContainerStateTemplate, "--api-version=v1beta3", fmt.Sprintf("--namespace=%v", ns))
 			if running != "true" {
 				Logf("%s is created but not running", podID)
-				continue
+				return false, nil
 			}
 			currentImage := runKubectl("get", "pods", podID, "-o", "template", getImageTemplate, "--api-version=v1beta3", fmt.Sprintf("--namespace=%v", ns))
 			if currentImage != containerImage {
 				Logf("%s is created but running wrong image; expected: %s, actual: %s", podID, containerImage, currentImage)
-				continue
+				return false, nil
 			}
 			// Call the generic validator function here.
 			// This might validate for example, that (1) getting a url works and (2) url is serving correct content.
 			if err := validator(c, podID); err != nil {
 				Logf("%s is running right image but validator function failed: %v", podID, err)
-				continue
+				return false, nil
 			}
 			Logf("%s is verified up and running", podID)
@@ -295,9 +297,10 @@ func validateController(c *client.Client, containerImage string, replicas int, c
 		}
 		// If we reach here, then all our checks passed.
 		if len(runningPods) == replicas {
-			return
+			return true, nil
 		}
-	}
+		return false, nil
+	}))
 	// Reaching here means that one or more checks failed multiple times. Assuming it's not a race condition, something is broken.
 	Failf("Timed out after %v seconds waiting for %s pods to reach valid state", podStartTimeout.Seconds(), testname)
 }
@@ -378,10 +381,9 @@ func testContainerOutputInNamespace(scenarioName string, c *client.Client, pod *
 	By(fmt.Sprintf("Trying to get logs from host %s pod %s container %s: %v",
 		podStatus.Spec.Host, podStatus.Name, containerName, err))
 	var logs []byte
-	start := time.Now()
 	// Sometimes the actual containers take a second to get started, try to get logs for 60s
-	for time.Now().Sub(start) < (60 * time.Second) {
+	expectNoError(wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
 		logs, err = c.Get().
 			Prefix("proxy").
 			Resource("nodes").
@@ -389,16 +391,14 @@ func testContainerOutputInNamespace(scenarioName string, c *client.Client, pod *
 			Name(podStatus.Spec.Host).
 			Suffix("containerLogs", ns, podStatus.Name, containerName).
 			Do().
 			Raw()
-		fmt.Sprintf("pod logs:%v\n", string(logs))
-		By(fmt.Sprintf("pod logs:%v\n", string(logs)))
+		Logf("pod logs:%v\n", string(logs))
 		if strings.Contains(string(logs), "Internal Error") {
-			By(fmt.Sprintf("Failed to get logs from host %q pod %q container %q: %v",
-				podStatus.Spec.Host, podStatus.Name, containerName, string(logs)))
-			time.Sleep(5 * time.Second)
-			continue
+			Logf("Failed to get logs from host %q pod %q container %q: %v",
+				podStatus.Spec.Host, podStatus.Name, containerName, string(logs))
+			return false, nil
 		}
-		break
-	}
+		return true, nil
+	}))
 
 	for _, m := range expectedOutput {
 		Expect(string(logs)).To(ContainSubstring(m), "%q in container output", m)