diff --git a/test/integration/etcd/BUILD b/test/integration/etcd/BUILD index cc5c8c5d2d..b695213754 100644 --- a/test/integration/etcd/BUILD +++ b/test/integration/etcd/BUILD @@ -20,12 +20,12 @@ go_test( "//cmd/kube-apiserver/app:go_default_library", "//cmd/kube-apiserver/app/options:go_default_library", "//pkg/api/legacyscheme:go_default_library", - "//pkg/apis/core:go_default_library", "//pkg/master:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", @@ -35,10 +35,10 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//staging/src/k8s.io/client-go/discovery/cached:go_default_library", + "//staging/src/k8s.io/client-go/dynamic:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/restmapper:go_default_library", - "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library", "//test/integration:go_default_library", "//test/integration/framework:go_default_library", "//vendor/github.com/coreos/etcd/clientv3:go_default_library", diff --git a/test/integration/etcd/etcd_storage_path_test.go b/test/integration/etcd/etcd_storage_path_test.go index 87f7a7563b..c5cc12c227 100644 --- a/test/integration/etcd/etcd_storage_path_test.go +++ b/test/integration/etcd/etcd_storage_path_test.go @@ -21,7 +21,6 @@ import ( "encoding/json" "fmt" "io/ioutil" - "mime" "net" "net/http" "os" @@ -35,6 +34,7 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/diff" @@ -44,13 +44,13 @@ import ( genericapiserveroptions "k8s.io/apiserver/pkg/server/options" "k8s.io/apiserver/pkg/storage/storagebackend" cacheddiscovery "k8s.io/client-go/discovery/cached" + "k8s.io/client-go/dynamic" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" - "k8s.io/client-go/util/flowcontrol" + "k8s.io/client-go/restmapper" "k8s.io/kubernetes/cmd/kube-apiserver/app" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/pkg/api/legacyscheme" - kapi "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/test/integration" "k8s.io/kubernetes/test/integration/framework" @@ -58,7 +58,6 @@ import ( _ "k8s.io/kubernetes/pkg/master" // TODO what else is needed "github.com/coreos/etcd/clientv3" - "k8s.io/client-go/restmapper" ) // Etcd data for all persisted objects. @@ -458,23 +457,33 @@ func TestEtcdStoragePath(t *testing.T) { certDir, _ := ioutil.TempDir("", "test-integration-etcd") defer os.RemoveAll(certDir) - client, kvClient, mapper := startRealMasterOrDie(t, certDir) + clientConfig, kvClient := startRealMasterOrDie(t, certDir) defer func() { dumpEtcdKVOnFailure(t, kvClient) }() + client := &allClient{dynamicClient: dynamic.NewForConfigOrDie(clientConfig)} + kubeClient := clientset.NewForConfigOrDie(clientConfig) + if _, err := kubeClient.CoreV1().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}); err != nil { + t.Fatal(err) + } + + discoveryClient := cacheddiscovery.NewMemCacheClient(kubeClient.Discovery()) + restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) + restMapper.Reset() + + resourcesToPersist := []resourceToPersist{} + serverResources, err := kubeClient.Discovery().ServerResources() + if err != nil { + t.Fatal(err) + } + resourcesToPersist = append(resourcesToPersist, getResourcesToPersist(serverResources, false, t)...) + kindSeen := sets.NewString() pathSeen := map[string][]schema.GroupVersionResource{} etcdSeen := map[schema.GroupVersionResource]empty{} cohabitatingResources := map[string]map[schema.GroupVersionKind]empty{} - resourcesToPersist := []resourceToPersist{} - serverResources, err := clientset.NewForConfigOrDie(client.config).Discovery().ServerResources() - if err != nil { - t.Fatal(err) - } - resourcesToPersist = append(resourcesToPersist, getResourcesToPersist(serverResources, false, t)...) - for _, resourceToPersist := range resourcesToPersist { t.Run(resourceToPersist.gvr.String(), func(t *testing.T) { gvk := resourceToPersist.gvk @@ -524,7 +533,7 @@ func TestEtcdStoragePath(t *testing.T) { } }() - if err := client.createPrerequisites(mapper, testNamespace, testData.prerequisites, all); err != nil { + if err := client.createPrerequisites(restMapper, testNamespace, testData.prerequisites, all); err != nil { t.Fatalf("failed to create prerequisites for %s: %#v", gvResource, err) } @@ -589,7 +598,7 @@ func TestEtcdStoragePath(t *testing.T) { } } -func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV, meta.RESTMapper) { +func startRealMasterOrDie(t *testing.T, certDir string) (*restclient.Config, clientv3.KV) { _, defaultServiceClusterIPRange, err := net.ParseCIDR("10.0.0.0/24") if err != nil { t.Fatal(err) @@ -679,29 +688,16 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV storageConfig := storageConfigValue.Load().(storagebackend.Config) kubeClientConfig := restclient.CopyConfig(kubeClientConfigValue.Load().(*restclient.Config)) + // this test makes lots of requests, don't be slow kubeClientConfig.QPS = 99999 kubeClientConfig.Burst = 9999 - kubeClient := clientset.NewForConfigOrDie(kubeClientConfig) - if _, err := kubeClient.CoreV1().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}); err != nil { - t.Fatal(err) - } - - client, err := newClient(*kubeClientConfig) - if err != nil { - t.Fatal(err) - } - kvClient, err := integration.GetEtcdKVClient(storageConfig) if err != nil { t.Fatal(err) } - discoveryClient := cacheddiscovery.NewMemCacheClient(kubeClient.Discovery()) - restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) - restMapper.Reset() - - return client, kvClient, restMapper + return kubeClientConfig, kvClient } func dumpEtcdKVOnFailure(t *testing.T, kvClient clientv3.KV) { @@ -768,8 +764,8 @@ type prerequisite struct { type empty struct{} type cleanupData struct { - obj runtime.Object - mapping *meta.RESTMapping + obj *unstructured.Unstructured + resource schema.GroupVersionResource } func gvr(g, v, r string) schema.GroupVersionResource { @@ -803,67 +799,38 @@ func keyStringer(i interface{}) string { } type allClient struct { - client *http.Client - config *restclient.Config - backoff restclient.BackoffManager -} - -func (c *allClient) verb(verb string, gvk schema.GroupVersionKind) (*restclient.Request, error) { - apiPath := "/apis" - if gvk.Group == kapi.GroupName { - apiPath = "/api" - } - baseURL, versionedAPIPath, err := restclient.DefaultServerURL(c.config.Host, apiPath, gvk.GroupVersion(), true) - if err != nil { - return nil, err - } - contentConfig := c.config.ContentConfig - gv := gvk.GroupVersion() - contentConfig.GroupVersion = &gv - serializers, err := createSerializers(contentConfig) - if err != nil { - return nil, err - } - return restclient.NewRequest(c.client, verb, baseURL, versionedAPIPath, contentConfig, *serializers, c.backoff, c.config.RateLimiter, 0), nil + dynamicClient dynamic.Interface } func (c *allClient) create(stub, ns string, mapping *meta.RESTMapping, all *[]cleanupData) error { - req, err := c.verb("POST", mapping.GroupVersionKind) + // we don't require GVK on the data we provide, so we fill it in here. We could, but that seems extraneous. + typeMetaAdder := map[string]interface{}{} + err := json.Unmarshal([]byte(stub), &typeMetaAdder) if err != nil { return err } - namespaced := mapping.Scope.Name() == meta.RESTScopeNameNamespace - output, err := req.NamespaceIfScoped(ns, namespaced).Resource(mapping.Resource.Resource).Body(strings.NewReader(stub)).Do().Get() - if err != nil { - return err - } - *all = append(*all, cleanupData{output, mapping}) - return nil -} + typeMetaAdder["apiVersion"] = mapping.GroupVersionKind.GroupVersion().String() + typeMetaAdder["kind"] = mapping.GroupVersionKind.Kind -func (c *allClient) destroy(obj runtime.Object, mapping *meta.RESTMapping) error { - req, err := c.verb("DELETE", mapping.GroupVersionKind) + if mapping.Scope == meta.RESTScopeRoot { + ns = "" + } + obj := &unstructured.Unstructured{Object: typeMetaAdder} + actual, err := c.dynamicClient.Resource(mapping.Resource).Namespace(ns).Create(obj) if err != nil { return err } - namespaced := mapping.Scope.Name() == meta.RESTScopeNameNamespace - name, err := meta.NewAccessor().Name(obj) - if err != nil { - return err - } - ns, err := meta.NewAccessor().Namespace(obj) - if err != nil { - return err - } - return req.NamespaceIfScoped(ns, namespaced).Resource(mapping.Resource.Resource).Name(name).Do().Error() + + *all = append(*all, cleanupData{actual, mapping.Resource}) + return nil } func (c *allClient) cleanup(all *[]cleanupData) error { for i := len(*all) - 1; i >= 0; i-- { // delete in reverse order in case creation order mattered obj := (*all)[i].obj - mapping := (*all)[i].mapping + gvr := (*all)[i].resource - if err := c.destroy(obj, mapping); err != nil { + if err := c.dynamicClient.Resource(gvr).Namespace(obj.GetNamespace()).Delete(obj.GetName(), nil); err != nil { return err } } @@ -887,81 +854,6 @@ func (c *allClient) createPrerequisites(mapper meta.RESTMapper, ns string, prere return nil } -func newClient(config restclient.Config) (*allClient, error) { - config.ContentConfig.NegotiatedSerializer = legacyscheme.Codecs - config.ContentConfig.ContentType = "application/json" - config.Timeout = 30 * time.Second - config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(3, 10) - - transport, err := restclient.TransportFor(&config) - if err != nil { - return nil, err - } - - client := &http.Client{ - Transport: transport, - Timeout: config.Timeout, - } - - backoff := &restclient.URLBackoff{ - Backoff: flowcontrol.NewBackOff(1*time.Second, 10*time.Second), - } - - return &allClient{ - client: client, - config: &config, - backoff: backoff, - }, nil -} - -// copied from restclient -func createSerializers(config restclient.ContentConfig) (*restclient.Serializers, error) { - mediaTypes := config.NegotiatedSerializer.SupportedMediaTypes() - contentType := config.ContentType - mediaType, _, err := mime.ParseMediaType(contentType) - if err != nil { - return nil, fmt.Errorf("the content type specified in the client configuration is not recognized: %v", err) - } - info, ok := runtime.SerializerInfoForMediaType(mediaTypes, mediaType) - if !ok { - if len(contentType) != 0 || len(mediaTypes) == 0 { - return nil, fmt.Errorf("no serializers registered for %s", contentType) - } - info = mediaTypes[0] - } - - internalGV := schema.GroupVersions{ - { - Group: config.GroupVersion.Group, - Version: runtime.APIVersionInternal, - }, - // always include the legacy group as a decoding target to handle non-error `Status` return types - { - Group: "", - Version: runtime.APIVersionInternal, - }, - } - - s := &restclient.Serializers{ - Encoder: config.NegotiatedSerializer.EncoderForVersion(info.Serializer, *config.GroupVersion), - Decoder: config.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), - - RenegotiatedDecoder: func(contentType string, params map[string]string) (runtime.Decoder, error) { - info, ok := runtime.SerializerInfoForMediaType(mediaTypes, contentType) - if !ok { - return nil, fmt.Errorf("serializer for %s not registered", contentType) - } - return config.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), nil - }, - } - if info.StreamSerializer != nil { - s.StreamingSerializer = info.StreamSerializer.Serializer - s.Framer = info.StreamSerializer.Framer - } - - return s, nil -} - func getFromEtcd(keys clientv3.KV, path string) (*metaObject, error) { response, err := keys.Get(context.Background(), path) if err != nil { @@ -1045,7 +937,6 @@ func getResourcesToPersist(serverResources []*metav1.APIResourceList, isOAPI boo } } gvr := resourceGV.WithResource(discoveryResource.Name) - legacyscheme.Scheme.New(gvk) resourcesToPersist = append(resourcesToPersist, resourceToPersist{ gvk: gvk,