diff --git a/test/e2e/apimachinery/BUILD b/test/e2e/apimachinery/BUILD index c5063f0e80..e47be42555 100644 --- a/test/e2e/apimachinery/BUILD +++ b/test/e2e/apimachinery/BUILD @@ -71,6 +71,7 @@ go_library( "//staging/src/k8s.io/client-go/discovery:go_default_library", "//staging/src/k8s.io/client-go/dynamic:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/util/cert:go_default_library", "//staging/src/k8s.io/client-go/util/keyutil:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", diff --git a/test/e2e/apimachinery/crd_publish_openapi.go b/test/e2e/apimachinery/crd_publish_openapi.go index 068481efde..36aed720d6 100644 --- a/test/e2e/apimachinery/crd_publish_openapi.go +++ b/test/e2e/apimachinery/crd_publish_openapi.go @@ -19,6 +19,8 @@ package apimachinery import ( "encoding/json" "fmt" + "io/ioutil" + "net/http" "regexp" "strings" "time" @@ -35,6 +37,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" utilyaml "k8s.io/apimachinery/pkg/util/yaml" k8sclientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" openapiutil "k8s.io/kube-openapi/pkg/util" "k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/utils/crd" @@ -389,6 +392,23 @@ func patchSchema(schema []byte, crd *crd.TestCrd) error { return err } +const waitSuccessThreshold = 10 + +// mustSucceedMultipleTimes calls f multiple times on success and only returns true if all calls are successful. +// This is necessary to avoid flaking tests where one call might hit a good apiserver while in HA other apiservers +// might be lagging behind. Calling f multiple times reduces the chance exponentially. +func mustSucceedMultipleTimes(n int, f func() (bool, error)) func() (bool, error) { + return func() (bool, error) { + for i := 0; i < n; i++ { + ok, err := f() + if err != nil || !ok { + return ok, err + } + } + return true, nil + } +} + // waitForDefinition waits for given definition showing up in swagger with given schema func waitForDefinition(c k8sclientset.Interface, name string, schema []byte) error { expect := spec.Schema{} @@ -396,54 +416,78 @@ func waitForDefinition(c k8sclientset.Interface, name string, schema []byte) err return err } - lastMsg := "" - if err := wait.Poll(500*time.Millisecond, 10*time.Second, func() (bool, error) { - bs, err := c.CoreV1().RESTClient().Get().AbsPath("openapi", "v2").DoRaw() - if err != nil { - return false, err - } - spec := spec.Swagger{} - if err := json.Unmarshal(bs, &spec); err != nil { - return false, err - } + err := waitForOpenAPISchema(c, func(spec *spec.Swagger) (bool, string) { d, ok := spec.SwaggerProps.Definitions[name] if !ok { - lastMsg = fmt.Sprintf("spec.SwaggerProps.Definitions[\"%s\"] not found", name) - return false, nil + return false, fmt.Sprintf("spec.SwaggerProps.Definitions[\"%s\"] not found", name) } // drop properties and extension that we added dropDefaults(&d) if !apiequality.Semantic.DeepEqual(expect, d) { - lastMsg = fmt.Sprintf("spec.SwaggerProps.Definitions[\"%s\"] not match; expect: %v, actual: %v", name, expect, d) - return false, nil + return false, fmt.Sprintf("spec.SwaggerProps.Definitions[\"%s\"] not match; expect: %v, actual: %v", name, expect, d) } - return true, nil - }); err != nil { - return fmt.Errorf("failed to wait for definition %s to be served: %v; lastMsg: %s", name, err, lastMsg) + return true, "" + }) + if err != nil { + return fmt.Errorf("failed to wait for definition %q to be served with the right OpenAPI schema: %v", name, err) } return nil } // waitForDefinitionCleanup waits for given definition to be removed from swagger func waitForDefinitionCleanup(c k8sclientset.Interface, name string) error { + err := waitForOpenAPISchema(c, func(spec *spec.Swagger) (bool, string) { + if _, ok := spec.SwaggerProps.Definitions[name]; ok { + return false, fmt.Sprintf("spec.SwaggerProps.Definitions[\"%s\"] still exists", name) + } + return true, "" + }) + if err != nil { + return fmt.Errorf("failed to wait for definition %q not to be served anymore: %v", name, err) + } + return nil +} + +func waitForOpenAPISchema(c k8sclientset.Interface, pred func(*spec.Swagger) (bool, string)) error { + client := c.CoreV1().RESTClient().(*rest.RESTClient).Client + url := c.CoreV1().RESTClient().Get().AbsPath("openapi", "v2").URL() lastMsg := "" - if err := wait.Poll(500*time.Millisecond, 10*time.Second, func() (bool, error) { - bs, err := c.CoreV1().RESTClient().Get().AbsPath("openapi", "v2").DoRaw() + etag := "" + var etagSpec *spec.Swagger + if err := wait.Poll(500*time.Millisecond, wait.ForeverTestTimeout, mustSucceedMultipleTimes(waitSuccessThreshold, func() (bool, error) { + // download spec with etag support + spec := &spec.Swagger{} + req, err := http.NewRequest("GET", url.String(), nil) if err != nil { return false, err } - spec := spec.Swagger{} - if err := json.Unmarshal(bs, &spec); err != nil { + req.Close = true // enforce a new connection to hit different HA API servers + if len(etag) > 0 { + req.Header.Set("If-None-Match", fmt.Sprintf(`"%s"`, etag)) + } + resp, err := client.Do(req) + if err != nil { return false, err } - _, ok := spec.SwaggerProps.Definitions[name] - if ok { - lastMsg = fmt.Sprintf("spec.SwaggerProps.Definitions[\"%s\"] still exists", name) - return false, nil + defer resp.Body.Close() + if resp.StatusCode == http.StatusNotModified { + spec = etagSpec + } else if resp.StatusCode != http.StatusOK { + return false, fmt.Errorf("unexpected response: %d", resp.StatusCode) + } else if bs, err := ioutil.ReadAll(resp.Body); err != nil { + return false, err + } else if err := json.Unmarshal(bs, spec); err != nil { + return false, err + } else { + etag = strings.Trim(resp.Header.Get("ETag"), `"`) + etagSpec = spec } - return true, nil - }); err != nil { - return fmt.Errorf("failed to wait for definition %s to be removed: %v; lastMsg: %s", name, err, lastMsg) + + var ok bool + ok, lastMsg = pred(spec) + return ok, nil + })); err != nil { + return fmt.Errorf("failed to wait for OpenAPI spec validating condition: %v; lastMsg: %s", err, lastMsg) } return nil }