Merge pull request #76388 from sttts/sttts-crd-openapi-publishing-e2e-ha-support

crd-openapi-publishing: in e2e query apiserver instances for HA
k3s-v1.15.3
Kubernetes Prow Robot 2019-04-10 10:08:42 -07:00 committed by GitHub
commit c2254cf19a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 73 additions and 28 deletions

View File

@ -71,6 +71,7 @@ go_library(
"//staging/src/k8s.io/client-go/discovery:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/util/cert:go_default_library",
"//staging/src/k8s.io/client-go/util/keyutil:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",

View File

@ -19,6 +19,8 @@ package apimachinery
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strings"
"time"
@ -35,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
utilyaml "k8s.io/apimachinery/pkg/util/yaml"
k8sclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
openapiutil "k8s.io/kube-openapi/pkg/util"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/utils/crd"
@ -389,6 +392,23 @@ func patchSchema(schema []byte, crd *crd.TestCrd) error {
return err
}
const waitSuccessThreshold = 10
// mustSucceedMultipleTimes calls f multiple times on success and only returns true if all calls are successful.
// This is necessary to avoid flaking tests where one call might hit a good apiserver while in HA other apiservers
// might be lagging behind. Calling f multiple times reduces the chance exponentially.
func mustSucceedMultipleTimes(n int, f func() (bool, error)) func() (bool, error) {
return func() (bool, error) {
for i := 0; i < n; i++ {
ok, err := f()
if err != nil || !ok {
return ok, err
}
}
return true, nil
}
}
// waitForDefinition waits for given definition showing up in swagger with given schema
func waitForDefinition(c k8sclientset.Interface, name string, schema []byte) error {
expect := spec.Schema{}
@ -396,54 +416,78 @@ func waitForDefinition(c k8sclientset.Interface, name string, schema []byte) err
return err
}
lastMsg := ""
if err := wait.Poll(500*time.Millisecond, 10*time.Second, func() (bool, error) {
bs, err := c.CoreV1().RESTClient().Get().AbsPath("openapi", "v2").DoRaw()
if err != nil {
return false, err
}
spec := spec.Swagger{}
if err := json.Unmarshal(bs, &spec); err != nil {
return false, err
}
err := waitForOpenAPISchema(c, func(spec *spec.Swagger) (bool, string) {
d, ok := spec.SwaggerProps.Definitions[name]
if !ok {
lastMsg = fmt.Sprintf("spec.SwaggerProps.Definitions[\"%s\"] not found", name)
return false, nil
return false, fmt.Sprintf("spec.SwaggerProps.Definitions[\"%s\"] not found", name)
}
// drop properties and extension that we added
dropDefaults(&d)
if !apiequality.Semantic.DeepEqual(expect, d) {
lastMsg = fmt.Sprintf("spec.SwaggerProps.Definitions[\"%s\"] not match; expect: %v, actual: %v", name, expect, d)
return false, nil
return false, fmt.Sprintf("spec.SwaggerProps.Definitions[\"%s\"] not match; expect: %v, actual: %v", name, expect, d)
}
return true, nil
}); err != nil {
return fmt.Errorf("failed to wait for definition %s to be served: %v; lastMsg: %s", name, err, lastMsg)
return true, ""
})
if err != nil {
return fmt.Errorf("failed to wait for definition %q to be served with the right OpenAPI schema: %v", name, err)
}
return nil
}
// waitForDefinitionCleanup waits for given definition to be removed from swagger
func waitForDefinitionCleanup(c k8sclientset.Interface, name string) error {
err := waitForOpenAPISchema(c, func(spec *spec.Swagger) (bool, string) {
if _, ok := spec.SwaggerProps.Definitions[name]; ok {
return false, fmt.Sprintf("spec.SwaggerProps.Definitions[\"%s\"] still exists", name)
}
return true, ""
})
if err != nil {
return fmt.Errorf("failed to wait for definition %q not to be served anymore: %v", name, err)
}
return nil
}
func waitForOpenAPISchema(c k8sclientset.Interface, pred func(*spec.Swagger) (bool, string)) error {
client := c.CoreV1().RESTClient().(*rest.RESTClient).Client
url := c.CoreV1().RESTClient().Get().AbsPath("openapi", "v2").URL()
lastMsg := ""
if err := wait.Poll(500*time.Millisecond, 10*time.Second, func() (bool, error) {
bs, err := c.CoreV1().RESTClient().Get().AbsPath("openapi", "v2").DoRaw()
etag := ""
var etagSpec *spec.Swagger
if err := wait.Poll(500*time.Millisecond, wait.ForeverTestTimeout, mustSucceedMultipleTimes(waitSuccessThreshold, func() (bool, error) {
// download spec with etag support
spec := &spec.Swagger{}
req, err := http.NewRequest("GET", url.String(), nil)
if err != nil {
return false, err
}
spec := spec.Swagger{}
if err := json.Unmarshal(bs, &spec); err != nil {
req.Close = true // enforce a new connection to hit different HA API servers
if len(etag) > 0 {
req.Header.Set("If-None-Match", fmt.Sprintf(`"%s"`, etag))
}
resp, err := client.Do(req)
if err != nil {
return false, err
}
_, ok := spec.SwaggerProps.Definitions[name]
if ok {
lastMsg = fmt.Sprintf("spec.SwaggerProps.Definitions[\"%s\"] still exists", name)
return false, nil
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotModified {
spec = etagSpec
} else if resp.StatusCode != http.StatusOK {
return false, fmt.Errorf("unexpected response: %d", resp.StatusCode)
} else if bs, err := ioutil.ReadAll(resp.Body); err != nil {
return false, err
} else if err := json.Unmarshal(bs, spec); err != nil {
return false, err
} else {
etag = strings.Trim(resp.Header.Get("ETag"), `"`)
etagSpec = spec
}
return true, nil
}); err != nil {
return fmt.Errorf("failed to wait for definition %s to be removed: %v; lastMsg: %s", name, err, lastMsg)
var ok bool
ok, lastMsg = pred(spec)
return ok, nil
})); err != nil {
return fmt.Errorf("failed to wait for OpenAPI spec validating condition: %v; lastMsg: %s", err, lastMsg)
}
return nil
}