switch etcstorage to use dynamic client

pull/8/head
David Eads 2018-08-01 08:30:10 -04:00
parent 29f68813d3
commit 3da02fb09b
2 changed files with 45 additions and 154 deletions

View File

@ -20,12 +20,12 @@ go_test(
"//cmd/kube-apiserver/app:go_default_library",
"//cmd/kube-apiserver/app/options:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/master:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
@ -35,10 +35,10 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/client-go/discovery/cached:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/restmapper:go_default_library",
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
"//test/integration:go_default_library",
"//test/integration/framework:go_default_library",
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",

View File

@ -21,7 +21,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"mime"
"net"
"net/http"
"os"
@ -35,6 +34,7 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
@ -44,13 +44,13 @@ import (
genericapiserveroptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/storagebackend"
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/restmapper"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/api/legacyscheme"
kapi "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/test/integration"
"k8s.io/kubernetes/test/integration/framework"
@ -58,7 +58,6 @@ import (
_ "k8s.io/kubernetes/pkg/master" // TODO what else is needed
"github.com/coreos/etcd/clientv3"
"k8s.io/client-go/restmapper"
)
// Etcd data for all persisted objects.
@ -458,23 +457,33 @@ func TestEtcdStoragePath(t *testing.T) {
certDir, _ := ioutil.TempDir("", "test-integration-etcd")
defer os.RemoveAll(certDir)
client, kvClient, mapper := startRealMasterOrDie(t, certDir)
clientConfig, kvClient := startRealMasterOrDie(t, certDir)
defer func() {
dumpEtcdKVOnFailure(t, kvClient)
}()
client := &allClient{dynamicClient: dynamic.NewForConfigOrDie(clientConfig)}
kubeClient := clientset.NewForConfigOrDie(clientConfig)
if _, err := kubeClient.CoreV1().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}); err != nil {
t.Fatal(err)
}
discoveryClient := cacheddiscovery.NewMemCacheClient(kubeClient.Discovery())
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
restMapper.Reset()
resourcesToPersist := []resourceToPersist{}
serverResources, err := kubeClient.Discovery().ServerResources()
if err != nil {
t.Fatal(err)
}
resourcesToPersist = append(resourcesToPersist, getResourcesToPersist(serverResources, false, t)...)
kindSeen := sets.NewString()
pathSeen := map[string][]schema.GroupVersionResource{}
etcdSeen := map[schema.GroupVersionResource]empty{}
cohabitatingResources := map[string]map[schema.GroupVersionKind]empty{}
resourcesToPersist := []resourceToPersist{}
serverResources, err := clientset.NewForConfigOrDie(client.config).Discovery().ServerResources()
if err != nil {
t.Fatal(err)
}
resourcesToPersist = append(resourcesToPersist, getResourcesToPersist(serverResources, false, t)...)
for _, resourceToPersist := range resourcesToPersist {
t.Run(resourceToPersist.gvr.String(), func(t *testing.T) {
gvk := resourceToPersist.gvk
@ -524,7 +533,7 @@ func TestEtcdStoragePath(t *testing.T) {
}
}()
if err := client.createPrerequisites(mapper, testNamespace, testData.prerequisites, all); err != nil {
if err := client.createPrerequisites(restMapper, testNamespace, testData.prerequisites, all); err != nil {
t.Fatalf("failed to create prerequisites for %s: %#v", gvResource, err)
}
@ -589,7 +598,7 @@ func TestEtcdStoragePath(t *testing.T) {
}
}
func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV, meta.RESTMapper) {
func startRealMasterOrDie(t *testing.T, certDir string) (*restclient.Config, clientv3.KV) {
_, defaultServiceClusterIPRange, err := net.ParseCIDR("10.0.0.0/24")
if err != nil {
t.Fatal(err)
@ -679,29 +688,16 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV
storageConfig := storageConfigValue.Load().(storagebackend.Config)
kubeClientConfig := restclient.CopyConfig(kubeClientConfigValue.Load().(*restclient.Config))
// this test makes lots of requests, don't be slow
kubeClientConfig.QPS = 99999
kubeClientConfig.Burst = 9999
kubeClient := clientset.NewForConfigOrDie(kubeClientConfig)
if _, err := kubeClient.CoreV1().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}); err != nil {
t.Fatal(err)
}
client, err := newClient(*kubeClientConfig)
if err != nil {
t.Fatal(err)
}
kvClient, err := integration.GetEtcdKVClient(storageConfig)
if err != nil {
t.Fatal(err)
}
discoveryClient := cacheddiscovery.NewMemCacheClient(kubeClient.Discovery())
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
restMapper.Reset()
return client, kvClient, restMapper
return kubeClientConfig, kvClient
}
func dumpEtcdKVOnFailure(t *testing.T, kvClient clientv3.KV) {
@ -768,8 +764,8 @@ type prerequisite struct {
type empty struct{}
type cleanupData struct {
obj runtime.Object
mapping *meta.RESTMapping
obj *unstructured.Unstructured
resource schema.GroupVersionResource
}
func gvr(g, v, r string) schema.GroupVersionResource {
@ -803,67 +799,38 @@ func keyStringer(i interface{}) string {
}
type allClient struct {
client *http.Client
config *restclient.Config
backoff restclient.BackoffManager
}
func (c *allClient) verb(verb string, gvk schema.GroupVersionKind) (*restclient.Request, error) {
apiPath := "/apis"
if gvk.Group == kapi.GroupName {
apiPath = "/api"
}
baseURL, versionedAPIPath, err := restclient.DefaultServerURL(c.config.Host, apiPath, gvk.GroupVersion(), true)
if err != nil {
return nil, err
}
contentConfig := c.config.ContentConfig
gv := gvk.GroupVersion()
contentConfig.GroupVersion = &gv
serializers, err := createSerializers(contentConfig)
if err != nil {
return nil, err
}
return restclient.NewRequest(c.client, verb, baseURL, versionedAPIPath, contentConfig, *serializers, c.backoff, c.config.RateLimiter, 0), nil
dynamicClient dynamic.Interface
}
func (c *allClient) create(stub, ns string, mapping *meta.RESTMapping, all *[]cleanupData) error {
req, err := c.verb("POST", mapping.GroupVersionKind)
// we don't require GVK on the data we provide, so we fill it in here. We could, but that seems extraneous.
typeMetaAdder := map[string]interface{}{}
err := json.Unmarshal([]byte(stub), &typeMetaAdder)
if err != nil {
return err
}
namespaced := mapping.Scope.Name() == meta.RESTScopeNameNamespace
output, err := req.NamespaceIfScoped(ns, namespaced).Resource(mapping.Resource.Resource).Body(strings.NewReader(stub)).Do().Get()
typeMetaAdder["apiVersion"] = mapping.GroupVersionKind.GroupVersion().String()
typeMetaAdder["kind"] = mapping.GroupVersionKind.Kind
if mapping.Scope == meta.RESTScopeRoot {
ns = ""
}
obj := &unstructured.Unstructured{Object: typeMetaAdder}
actual, err := c.dynamicClient.Resource(mapping.Resource).Namespace(ns).Create(obj)
if err != nil {
return err
}
*all = append(*all, cleanupData{output, mapping})
return nil
}
func (c *allClient) destroy(obj runtime.Object, mapping *meta.RESTMapping) error {
req, err := c.verb("DELETE", mapping.GroupVersionKind)
if err != nil {
return err
}
namespaced := mapping.Scope.Name() == meta.RESTScopeNameNamespace
name, err := meta.NewAccessor().Name(obj)
if err != nil {
return err
}
ns, err := meta.NewAccessor().Namespace(obj)
if err != nil {
return err
}
return req.NamespaceIfScoped(ns, namespaced).Resource(mapping.Resource.Resource).Name(name).Do().Error()
*all = append(*all, cleanupData{actual, mapping.Resource})
return nil
}
func (c *allClient) cleanup(all *[]cleanupData) error {
for i := len(*all) - 1; i >= 0; i-- { // delete in reverse order in case creation order mattered
obj := (*all)[i].obj
mapping := (*all)[i].mapping
gvr := (*all)[i].resource
if err := c.destroy(obj, mapping); err != nil {
if err := c.dynamicClient.Resource(gvr).Namespace(obj.GetNamespace()).Delete(obj.GetName(), nil); err != nil {
return err
}
}
@ -887,81 +854,6 @@ func (c *allClient) createPrerequisites(mapper meta.RESTMapper, ns string, prere
return nil
}
func newClient(config restclient.Config) (*allClient, error) {
config.ContentConfig.NegotiatedSerializer = legacyscheme.Codecs
config.ContentConfig.ContentType = "application/json"
config.Timeout = 30 * time.Second
config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(3, 10)
transport, err := restclient.TransportFor(&config)
if err != nil {
return nil, err
}
client := &http.Client{
Transport: transport,
Timeout: config.Timeout,
}
backoff := &restclient.URLBackoff{
Backoff: flowcontrol.NewBackOff(1*time.Second, 10*time.Second),
}
return &allClient{
client: client,
config: &config,
backoff: backoff,
}, nil
}
// copied from restclient
func createSerializers(config restclient.ContentConfig) (*restclient.Serializers, error) {
mediaTypes := config.NegotiatedSerializer.SupportedMediaTypes()
contentType := config.ContentType
mediaType, _, err := mime.ParseMediaType(contentType)
if err != nil {
return nil, fmt.Errorf("the content type specified in the client configuration is not recognized: %v", err)
}
info, ok := runtime.SerializerInfoForMediaType(mediaTypes, mediaType)
if !ok {
if len(contentType) != 0 || len(mediaTypes) == 0 {
return nil, fmt.Errorf("no serializers registered for %s", contentType)
}
info = mediaTypes[0]
}
internalGV := schema.GroupVersions{
{
Group: config.GroupVersion.Group,
Version: runtime.APIVersionInternal,
},
// always include the legacy group as a decoding target to handle non-error `Status` return types
{
Group: "",
Version: runtime.APIVersionInternal,
},
}
s := &restclient.Serializers{
Encoder: config.NegotiatedSerializer.EncoderForVersion(info.Serializer, *config.GroupVersion),
Decoder: config.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV),
RenegotiatedDecoder: func(contentType string, params map[string]string) (runtime.Decoder, error) {
info, ok := runtime.SerializerInfoForMediaType(mediaTypes, contentType)
if !ok {
return nil, fmt.Errorf("serializer for %s not registered", contentType)
}
return config.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), nil
},
}
if info.StreamSerializer != nil {
s.StreamingSerializer = info.StreamSerializer.Serializer
s.Framer = info.StreamSerializer.Framer
}
return s, nil
}
func getFromEtcd(keys clientv3.KV, path string) (*metaObject, error) {
response, err := keys.Get(context.Background(), path)
if err != nil {
@ -1045,7 +937,6 @@ func getResourcesToPersist(serverResources []*metav1.APIResourceList, isOAPI boo
}
}
gvr := resourceGV.WithResource(discoveryResource.Name)
legacyscheme.Scheme.New(gvk)
resourcesToPersist = append(resourcesToPersist, resourceToPersist{
gvk: gvk,