Merge pull request #66864 from deads2k/server-03-etcdstorage

Automatic merge from submit-queue (batch tested with PRs 66850, 66902, 66779, 66864, 66912). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

 update etcdstorage test to use discovery

1. update etcdstorage test to use discovery
2. switches to use the dynamic client
3. fixes the bug where the server wasn't a complete kube-apiserver


```release-note
NONE
```

/assign @enj @sttts
@kubernetes/sig-api-machinery-pr-reviews 
@apelisse a discovery based example.
 @yue9944882 you've expressed an interest.  The wiring for the server is really hard to use here.

I've also noticed that this doesn't build an equivalent kube-apiserver, but that is proving troublesome.  Will follow up with with.
pull/8/head
Kubernetes Submit Queue 2018-08-02 10:03:19 -07:00 committed by GitHub
commit 0c44148436
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 210 additions and 406 deletions

View File

@ -19,26 +19,23 @@ go_test(
deps = [
"//cmd/kube-apiserver/app:go_default_library",
"//cmd/kube-apiserver/app/options:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/master:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/client-go/discovery/cached:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/restmapper:go_default_library",
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
"//test/integration:go_default_library",
"//test/integration/framework:go_default_library",
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",

6
test/integration/etcd/OWNERS Executable file
View File

@ -0,0 +1,6 @@
approvers:
- enj
reviewers:
- deads2k
- liggitt
- enj

View File

@ -21,13 +21,11 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"mime"
"net"
"net/http"
"os"
"reflect"
"strings"
"sync/atomic"
"testing"
"time"
@ -35,22 +33,20 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"
genericapiserveroptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/storagebackend"
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/restmapper"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/api/legacyscheme"
kapi "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/test/integration"
"k8s.io/kubernetes/test/integration/framework"
@ -58,7 +54,6 @@ import (
_ "k8s.io/kubernetes/pkg/master" // TODO what else is needed
"github.com/coreos/etcd/clientv3"
"k8s.io/client-go/restmapper"
)
// Etcd data for all persisted objects.
@ -441,121 +436,36 @@ var etcdStorageData = map[schema.GroupVersionResource]struct {
expectedEtcdPath: "/registry/priorityclasses/pc2",
},
// --
// k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1
// depends on aggregator using the same ungrouped RESTOptionsGetter as the kube apiserver, not SimpleRestOptionsFactory in aggregator.go
gvr("apiregistration.k8s.io", "v1beta1", "apiservices"): {
stub: `{"metadata": {"name": "as1.foo.com"}, "spec": {"group": "foo.com", "version": "as1", "groupPriorityMinimum":100, "versionPriority":10}}`,
expectedEtcdPath: "/registry/apiregistration.k8s.io/apiservices/as1.foo.com",
},
// --
// k8s.io/kube-aggregator/pkg/apis/apiregistration/v1
// depends on aggregator using the same ungrouped RESTOptionsGetter as the kube apiserver, not SimpleRestOptionsFactory in aggregator.go
gvr("apiregistration.k8s.io", "v1", "apiservices"): {
stub: `{"metadata": {"name": "as2.foo.com"}, "spec": {"group": "foo.com", "version": "as2", "groupPriorityMinimum":100, "versionPriority":10}}`,
expectedEtcdPath: "/registry/apiregistration.k8s.io/apiservices/as2.foo.com",
expectedGVK: gvkP("apiregistration.k8s.io", "v1beta1", "APIService"),
},
// --
// k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1
gvr("apiextensions.k8s.io", "v1beta1", "customresourcedefinitions"): {
stub: `{"metadata": {"name": "openshiftwebconsoleconfigs.webconsole.operator.openshift.io"},"spec": {"scope": "Cluster","group": "webconsole.operator.openshift.io","version": "v1alpha1","names": {"kind": "OpenShiftWebConsoleConfig","plural": "openshiftwebconsoleconfigs","singular": "openshiftwebconsoleconfig"}}}`,
expectedEtcdPath: "/registry/apiextensions.k8s.io/customresourcedefinitions/openshiftwebconsoleconfigs.webconsole.operator.openshift.io",
},
// --
}
// Be very careful when whitelisting an object as ephemeral.
// Doing so removes the safety we gain from this test by skipping that object.
var ephemeralWhiteList = createEphemeralWhiteList(
// k8s.io/kubernetes/pkg/api/v1
gvk("", "v1", "Binding"), // annotation on pod, not stored in etcd
gvk("", "v1", "RangeAllocation"), // stored in various places in etcd but cannot be directly created
gvk("", "v1", "ComponentStatus"), // status info not stored in etcd
gvk("", "v1", "SerializedReference"), // used for serilization, not stored in etcd
gvk("", "v1", "PodStatusResult"), // wrapper object not stored in etcd
// --
// k8s.io/kubernetes/pkg/apis/authentication/v1beta1
gvk("authentication.k8s.io", "v1beta1", "TokenReview"), // not stored in etcd
// --
// k8s.io/kubernetes/pkg/apis/authentication/v1
gvk("authentication.k8s.io", "v1", "TokenReview"), // not stored in etcd
gvk("authentication.k8s.io", "v1", "TokenRequest"), // not stored in etcd
// --
// k8s.io/kubernetes/pkg/apis/authorization/v1beta1
// SRR objects that are not stored in etcd
gvk("authorization.k8s.io", "v1beta1", "SelfSubjectRulesReview"),
// SAR objects that are not stored in etcd
gvk("authorization.k8s.io", "v1beta1", "SelfSubjectAccessReview"),
gvk("authorization.k8s.io", "v1beta1", "LocalSubjectAccessReview"),
gvk("authorization.k8s.io", "v1beta1", "SubjectAccessReview"),
// --
// k8s.io/kubernetes/pkg/apis/authorization/v1
// SRR objects that are not stored in etcd
gvk("authorization.k8s.io", "v1", "SelfSubjectRulesReview"),
// SAR objects that are not stored in etcd
gvk("authorization.k8s.io", "v1", "SelfSubjectAccessReview"),
gvk("authorization.k8s.io", "v1", "LocalSubjectAccessReview"),
gvk("authorization.k8s.io", "v1", "SubjectAccessReview"),
// --
// k8s.io/kubernetes/pkg/apis/autoscaling/v1
gvk("autoscaling", "v1", "Scale"), // not stored in etcd, part of kapiv1.ReplicationController
// --
// k8s.io/kubernetes/pkg/apis/apps/v1beta1
gvk("apps", "v1beta1", "Scale"), // not stored in etcd, part of kapiv1.ReplicationController
gvk("apps", "v1beta1", "DeploymentRollback"), // used to rollback deployment, not stored in etcd
// --
// k8s.io/kubernetes/pkg/apis/apps/v1beta2
gvk("apps", "v1beta2", "Scale"), // not stored in etcd, part of kapiv1.ReplicationController
// --
// k8s.io/kubernetes/pkg/apis/batch/v1beta1
gvk("batch", "v1beta1", "JobTemplate"), // not stored in etcd
// --
// k8s.io/kubernetes/pkg/apis/batch/v2alpha1
gvk("batch", "v2alpha1", "JobTemplate"), // not stored in etcd
// --
// k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1
gvk("componentconfig", "v1alpha1", "KubeSchedulerConfiguration"), // not stored in etcd
// --
// k8s.io/kubernetes/pkg/apis/extensions/v1beta1
gvk("extensions", "v1beta1", "DeploymentRollback"), // used to rollback deployment, not stored in etcd
gvk("extensions", "v1beta1", "ReplicationControllerDummy"), // not stored in etcd
gvk("extensions", "v1beta1", "Scale"), // not stored in etcd, part of kapiv1.ReplicationController
// --
// k8s.io/kubernetes/pkg/apis/imagepolicy/v1alpha1
gvk("imagepolicy.k8s.io", "v1alpha1", "ImageReview"), // not stored in etcd
// --
// k8s.io/kubernetes/pkg/apis/policy/v1beta1
gvk("policy", "v1beta1", "Eviction"), // not stored in etcd, deals with evicting kapiv1.Pod
// --
// k8s.io/kubernetes/pkg/apis/admission/v1beta1
gvk("admission.k8s.io", "v1beta1", "AdmissionReview"), // not stored in etcd, call out to webhooks.
// --
)
// Only add kinds to this list when there is no way to create the object
var kindWhiteList = sets.NewString(
// k8s.io/kubernetes/pkg/api/v1
"DeleteOptions",
"ExportOptions",
"ListOptions",
"CreateOptions",
"UpdateOptions",
"NodeProxyOptions",
"PodAttachOptions",
"PodExecOptions",
"PodLogOptions",
"PodProxyOptions",
"ServiceProxyOptions",
"GetOptions",
"APIGroup",
"PodPortForwardOptions",
"APIVersions",
// --
// k8s.io/kubernetes/pkg/watch/versioned
"WatchEvent",
// --
// k8s.io/apimachinery/pkg/apis/meta/v1
"Status",
// --
)
// Only add kinds to this list when this a virtual resource with get and create verbs that doesn't actually
// store into it's kind. We've used this downstream for mappings before.
var kindWhiteList = sets.NewString()
// namespace used for all tests, do not change this
const testNamespace = "etcdstoragepathtestnamespace"
@ -568,67 +478,73 @@ func TestEtcdStoragePath(t *testing.T) {
certDir, _ := ioutil.TempDir("", "test-integration-etcd")
defer os.RemoveAll(certDir)
client, kvClient, mapper := startRealMasterOrDie(t, certDir)
clientConfig, kvClient := startRealMasterOrDie(t, certDir)
defer func() {
dumpEtcdKVOnFailure(t, kvClient)
}()
client := &allClient{dynamicClient: dynamic.NewForConfigOrDie(clientConfig)}
kubeClient := clientset.NewForConfigOrDie(clientConfig)
if _, err := kubeClient.CoreV1().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}); err != nil {
t.Fatal(err)
}
discoveryClient := cacheddiscovery.NewMemCacheClient(kubeClient.Discovery())
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
restMapper.Reset()
resourcesToPersist := []resourceToPersist{}
serverResources, err := kubeClient.Discovery().ServerResources()
if err != nil {
t.Fatal(err)
}
resourcesToPersist = append(resourcesToPersist, getResourcesToPersist(serverResources, false, t)...)
kindSeen := sets.NewString()
pathSeen := map[string][]schema.GroupVersionResource{}
etcdSeen := map[schema.GroupVersionResource]empty{}
ephemeralSeen := map[schema.GroupVersionKind]empty{}
cohabitatingResources := map[string]map[schema.GroupVersionKind]empty{}
for gvk, apiType := range legacyscheme.Scheme.AllKnownTypes() {
// we do not care about internal objects or lists // TODO make sure this is always true
if gvk.Version == runtime.APIVersionInternal || strings.HasSuffix(apiType.Name(), "List") {
continue
}
for _, resourceToPersist := range resourcesToPersist {
t.Run(resourceToPersist.gvr.String(), func(t *testing.T) {
gvk := resourceToPersist.gvk
gvResource := resourceToPersist.gvr
kind := gvk.Kind
kind := gvk.Kind
pkgPath := apiType.PkgPath()
if kindWhiteList.Has(kind) {
kindSeen.Insert(kind)
continue
}
_, isEphemeral := ephemeralWhiteList[gvk]
if isEphemeral {
ephemeralSeen[gvk] = empty{}
continue
}
mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
t.Errorf("unexpected error getting mapping for %s from %s with GVK %s: %v", kind, pkgPath, gvk, err)
continue
}
etcdSeen[mapping.Resource] = empty{}
testData, hasTest := etcdStorageData[mapping.Resource]
if !hasTest {
t.Errorf("no test data for %s from %s. Please add a test for your new type to etcdStorageData.", kind, pkgPath)
continue
}
if len(testData.expectedEtcdPath) == 0 {
t.Errorf("empty test data for %s from %s", kind, pkgPath)
continue
}
shouldCreate := len(testData.stub) != 0 // try to create only if we have a stub
var input *metaObject
if shouldCreate {
if input, err = jsonToMetaObject([]byte(testData.stub)); err != nil || input.isEmpty() {
t.Errorf("invalid test data for %s from %s: %v", kind, pkgPath, err)
continue
mapping := &meta.RESTMapping{
Resource: resourceToPersist.gvr,
GroupVersionKind: resourceToPersist.gvk,
Scope: meta.RESTScopeRoot,
}
if resourceToPersist.namespaced {
mapping.Scope = meta.RESTScopeNamespace
}
if kindWhiteList.Has(kind) {
kindSeen.Insert(kind)
t.Skip("whitelisted")
}
etcdSeen[gvResource] = empty{}
testData, hasTest := etcdStorageData[gvResource]
if !hasTest {
t.Fatalf("no test data for %s. Please add a test for your new type to etcdStorageData.", gvResource)
}
if len(testData.expectedEtcdPath) == 0 {
t.Fatalf("empty test data for %s", gvResource)
}
shouldCreate := len(testData.stub) != 0 // try to create only if we have a stub
var input *metaObject
if shouldCreate {
if input, err = jsonToMetaObject([]byte(testData.stub)); err != nil || input.isEmpty() {
t.Fatalf("invalid test data for %s: %v", gvResource, err)
}
}
}
func() { // forces defer to run per iteration of the for loop
all := &[]cleanupData{}
defer func() {
if !t.Failed() { // do not cleanup if test has already failed since we may need things in the etcd dump
@ -638,54 +554,46 @@ func TestEtcdStoragePath(t *testing.T) {
}
}()
if err := client.createPrerequisites(mapper, testNamespace, testData.prerequisites, all); err != nil {
t.Errorf("failed to create prerequisites for %s from %s: %#v", kind, pkgPath, err)
return
if err := client.createPrerequisites(restMapper, testNamespace, testData.prerequisites, all); err != nil {
t.Fatalf("failed to create prerequisites for %s: %#v", gvResource, err)
}
if shouldCreate { // do not try to create items with no stub
if err := client.create(testData.stub, testNamespace, mapping, all); err != nil {
t.Errorf("failed to create stub for %s from %s: %#v", kind, pkgPath, err)
return
t.Fatalf("failed to create stub for %s: %#v", gvResource, err)
}
}
output, err := getFromEtcd(kvClient, testData.expectedEtcdPath)
if err != nil {
t.Errorf("failed to get from etcd for %s from %s: %#v", kind, pkgPath, err)
return
t.Fatalf("failed to get from etcd for %s: %#v", gvResource, err)
}
expectedGVK := gvk
if testData.expectedGVK != nil {
if gvk == *testData.expectedGVK {
t.Errorf("GVK override %s for %s from %s is unnecessary or something was changed incorrectly", testData.expectedGVK, kind, pkgPath)
t.Errorf("GVK override %s for %s is unnecessary or something was changed incorrectly", testData.expectedGVK, gvk)
}
expectedGVK = *testData.expectedGVK
}
actualGVK := output.getGVK()
if actualGVK != expectedGVK {
t.Errorf("GVK for %s from %s does not match, expected %s got %s", kind, pkgPath, expectedGVK, actualGVK)
t.Errorf("GVK for %s does not match, expected %s got %s", kind, expectedGVK, actualGVK)
}
if !apiequality.Semantic.DeepDerivative(input, output) {
t.Errorf("Test stub for %s from %s does not match: %s", kind, pkgPath, diff.ObjectGoPrintDiff(input, output))
t.Errorf("Test stub for %s does not match: %s", kind, diff.ObjectGoPrintDiff(input, output))
}
addGVKToEtcdBucket(cohabitatingResources, actualGVK, getEtcdBucket(testData.expectedEtcdPath))
pathSeen[testData.expectedEtcdPath] = append(pathSeen[testData.expectedEtcdPath], mapping.Resource)
}()
})
}
if inEtcdData, inEtcdSeen := diffMaps(etcdStorageData, etcdSeen); len(inEtcdData) != 0 || len(inEtcdSeen) != 0 {
t.Errorf("etcd data does not match the types we saw:\nin etcd data but not seen:\n%s\nseen but not in etcd data:\n%s", inEtcdData, inEtcdSeen)
}
if inEphemeralWhiteList, inEphemeralSeen := diffMaps(ephemeralWhiteList, ephemeralSeen); len(inEphemeralWhiteList) != 0 || len(inEphemeralSeen) != 0 {
t.Errorf("ephemeral whitelist does not match the types we saw:\nin ephemeral whitelist but not seen:\n%s\nseen but not in ephemeral whitelist:\n%s", inEphemeralWhiteList, inEphemeralSeen)
}
if inKindData, inKindSeen := diffMaps(kindWhiteList, kindSeen); len(inKindData) != 0 || len(inKindSeen) != 0 {
t.Errorf("kind whitelist data does not match the types we saw:\nin kind whitelist but not seen:\n%s\nseen but not in kind whitelist:\n%s", inKindData, inKindSeen)
}
@ -711,14 +619,37 @@ func TestEtcdStoragePath(t *testing.T) {
}
}
func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV, meta.RESTMapper) {
func startRealMasterOrDie(t *testing.T, certDir string) (*restclient.Config, clientv3.KV) {
_, defaultServiceClusterIPRange, err := net.ParseCIDR("10.0.0.0/24")
if err != nil {
t.Fatal(err)
}
kubeClientConfigValue := atomic.Value{}
storageConfigValue := atomic.Value{}
listener, _, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
kubeAPIServerOptions := options.NewServerRunOptions()
kubeAPIServerOptions.InsecureServing.BindPort = 0
kubeAPIServerOptions.SecureServing.Listener = listener
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURL()}
kubeAPIServerOptions.Etcd.DefaultStorageMediaType = runtime.ContentTypeJSON // force json we can easily interpret the result in etcd
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
kubeAPIServerOptions.Authorization.Modes = []string{"RBAC"}
kubeAPIServerOptions.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
completedOptions, err := app.Complete(kubeAPIServerOptions)
if err != nil {
t.Fatal(err)
}
kubeAPIServerOptions.APIEnablement.RuntimeConfig.Set("api/all=true")
kubeAPIServer, err := app.CreateServerChain(completedOptions, wait.NeverStop)
if err != nil {
t.Fatal(err)
}
kubeClientConfig := restclient.CopyConfig(kubeAPIServer.LoopbackClientConfig)
go func() {
// Catch panics that occur in this go routine so we get a comprehensible failure
@ -728,66 +659,17 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV
}
}()
listener, _, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
kubeAPIServerOptions := options.NewServerRunOptions()
kubeAPIServerOptions.SecureServing.Listener = listener
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURL()}
kubeAPIServerOptions.Etcd.DefaultStorageMediaType = runtime.ContentTypeJSON // TODO use protobuf?
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
kubeAPIServerOptions.Authorization.Modes = []string{"RBAC"}
kubeAPIServerOptions.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
completedOptions, err := app.Complete(kubeAPIServerOptions)
if err != nil {
t.Fatal(err)
}
tunneler, proxyTransport, err := app.CreateNodeDialer(completedOptions)
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, admissionPostStartHook, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig.ExtraConfig.APIResourceConfigSource = &allResourceSource{} // force enable all resources
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate(), sharedInformers, versionedInformers, admissionPostStartHook)
if err != nil {
t.Fatal(err)
}
kubeClientConfigValue.Store(kubeAPIServerConfig.GenericConfig.LoopbackClientConfig)
storageConfigValue.Store(kubeAPIServerOptions.Etcd.StorageConfig)
if err := kubeAPIServer.GenericAPIServer.PrepareRun().Run(wait.NeverStop); err != nil {
if err := kubeAPIServer.PrepareRun().Run(wait.NeverStop); err != nil {
t.Fatal(err)
}
}()
lastHealth := ""
if err := wait.PollImmediate(time.Second, time.Minute, func() (done bool, err error) {
obj := kubeClientConfigValue.Load()
if obj == nil {
return false, nil
}
kubeClientConfig := kubeClientConfigValue.Load().(*restclient.Config)
// make a copy so we can mutate it to set GroupVersion and NegotiatedSerializer
cfg := *kubeClientConfig
cfg.ContentConfig.GroupVersion = &schema.GroupVersion{}
cfg.ContentConfig.NegotiatedSerializer = legacyscheme.Codecs
privilegedClient, err := restclient.RESTClientFor(&cfg)
if err != nil {
// this happens because we race the API server start
t.Log(err)
return false, nil
}
// wait for the server to be healthy
result := privilegedClient.Get().AbsPath("/healthz").Do()
result := clientset.NewForConfigOrDie(kubeClientConfig).RESTClient().Get().AbsPath("/healthz").Do()
content, _ := result.Raw()
lastHealth = string(content)
if errResult := result.Error(); errResult != nil {
t.Log(errResult)
return false, nil
@ -796,32 +678,20 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV
result.StatusCode(&status)
return status == http.StatusOK, nil
}); err != nil {
t.Log(lastHealth)
t.Fatal(err)
}
kubeClientConfig := kubeClientConfigValue.Load().(*restclient.Config)
storageConfig := storageConfigValue.Load().(storagebackend.Config)
// this test makes lots of requests, don't be slow
kubeClientConfig.QPS = 99999
kubeClientConfig.Burst = 9999
kubeClient := clientset.NewForConfigOrDie(kubeClientConfig)
if _, err := kubeClient.CoreV1().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}); err != nil {
t.Fatal(err)
}
client, err := newClient(*kubeClientConfig)
kvClient, err := integration.GetEtcdKVClient(kubeAPIServerOptions.Etcd.StorageConfig)
if err != nil {
t.Fatal(err)
}
kvClient, err := integration.GetEtcdKVClient(storageConfig)
if err != nil {
t.Fatal(err)
}
discoveryClient := cacheddiscovery.NewMemCacheClient(kubeClient.Discovery())
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
restMapper.Reset()
return client, kvClient, restMapper
return kubeClientConfig, kvClient
}
func dumpEtcdKVOnFailure(t *testing.T, kvClient clientv3.KV) {
@ -888,33 +758,18 @@ type prerequisite struct {
type empty struct{}
type cleanupData struct {
obj runtime.Object
mapping *meta.RESTMapping
obj *unstructured.Unstructured
resource schema.GroupVersionResource
}
func gvr(g, v, r string) schema.GroupVersionResource {
return schema.GroupVersionResource{Group: g, Version: v, Resource: r}
}
func gvk(g, v, k string) schema.GroupVersionKind {
return schema.GroupVersionKind{Group: g, Version: v, Kind: k}
}
func gvkP(g, v, k string) *schema.GroupVersionKind {
return &schema.GroupVersionKind{Group: g, Version: v, Kind: k}
}
func createEphemeralWhiteList(gvks ...schema.GroupVersionKind) map[schema.GroupVersionKind]empty {
ephemeral := map[schema.GroupVersionKind]empty{}
for _, gvKind := range gvks {
if _, ok := ephemeral[gvKind]; ok {
panic("invalid ephemeral whitelist contains duplicate keys")
}
ephemeral[gvKind] = empty{}
}
return ephemeral
}
func jsonToMetaObject(stub []byte) (*metaObject, error) {
obj := &metaObject{}
if err := json.Unmarshal(stub, obj); err != nil {
@ -938,67 +793,38 @@ func keyStringer(i interface{}) string {
}
type allClient struct {
client *http.Client
config *restclient.Config
backoff restclient.BackoffManager
}
func (c *allClient) verb(verb string, gvk schema.GroupVersionKind) (*restclient.Request, error) {
apiPath := "/apis"
if gvk.Group == kapi.GroupName {
apiPath = "/api"
}
baseURL, versionedAPIPath, err := restclient.DefaultServerURL(c.config.Host, apiPath, gvk.GroupVersion(), true)
if err != nil {
return nil, err
}
contentConfig := c.config.ContentConfig
gv := gvk.GroupVersion()
contentConfig.GroupVersion = &gv
serializers, err := createSerializers(contentConfig)
if err != nil {
return nil, err
}
return restclient.NewRequest(c.client, verb, baseURL, versionedAPIPath, contentConfig, *serializers, c.backoff, c.config.RateLimiter, 0), nil
dynamicClient dynamic.Interface
}
func (c *allClient) create(stub, ns string, mapping *meta.RESTMapping, all *[]cleanupData) error {
req, err := c.verb("POST", mapping.GroupVersionKind)
// we don't require GVK on the data we provide, so we fill it in here. We could, but that seems extraneous.
typeMetaAdder := map[string]interface{}{}
err := json.Unmarshal([]byte(stub), &typeMetaAdder)
if err != nil {
return err
}
namespaced := mapping.Scope.Name() == meta.RESTScopeNameNamespace
output, err := req.NamespaceIfScoped(ns, namespaced).Resource(mapping.Resource.Resource).Body(strings.NewReader(stub)).Do().Get()
if err != nil {
return err
}
*all = append(*all, cleanupData{output, mapping})
return nil
}
typeMetaAdder["apiVersion"] = mapping.GroupVersionKind.GroupVersion().String()
typeMetaAdder["kind"] = mapping.GroupVersionKind.Kind
func (c *allClient) destroy(obj runtime.Object, mapping *meta.RESTMapping) error {
req, err := c.verb("DELETE", mapping.GroupVersionKind)
if mapping.Scope == meta.RESTScopeRoot {
ns = ""
}
obj := &unstructured.Unstructured{Object: typeMetaAdder}
actual, err := c.dynamicClient.Resource(mapping.Resource).Namespace(ns).Create(obj)
if err != nil {
return err
}
namespaced := mapping.Scope.Name() == meta.RESTScopeNameNamespace
name, err := meta.NewAccessor().Name(obj)
if err != nil {
return err
}
ns, err := meta.NewAccessor().Namespace(obj)
if err != nil {
return err
}
return req.NamespaceIfScoped(ns, namespaced).Resource(mapping.Resource.Resource).Name(name).Do().Error()
*all = append(*all, cleanupData{actual, mapping.Resource})
return nil
}
func (c *allClient) cleanup(all *[]cleanupData) error {
for i := len(*all) - 1; i >= 0; i-- { // delete in reverse order in case creation order mattered
obj := (*all)[i].obj
mapping := (*all)[i].mapping
gvr := (*all)[i].resource
if err := c.destroy(obj, mapping); err != nil {
if err := c.dynamicClient.Resource(gvr).Namespace(obj.GetNamespace()).Delete(obj.GetName(), nil); err != nil {
return err
}
}
@ -1022,81 +848,6 @@ func (c *allClient) createPrerequisites(mapper meta.RESTMapper, ns string, prere
return nil
}
func newClient(config restclient.Config) (*allClient, error) {
config.ContentConfig.NegotiatedSerializer = legacyscheme.Codecs
config.ContentConfig.ContentType = "application/json"
config.Timeout = 30 * time.Second
config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(3, 10)
transport, err := restclient.TransportFor(&config)
if err != nil {
return nil, err
}
client := &http.Client{
Transport: transport,
Timeout: config.Timeout,
}
backoff := &restclient.URLBackoff{
Backoff: flowcontrol.NewBackOff(1*time.Second, 10*time.Second),
}
return &allClient{
client: client,
config: &config,
backoff: backoff,
}, nil
}
// copied from restclient
func createSerializers(config restclient.ContentConfig) (*restclient.Serializers, error) {
mediaTypes := config.NegotiatedSerializer.SupportedMediaTypes()
contentType := config.ContentType
mediaType, _, err := mime.ParseMediaType(contentType)
if err != nil {
return nil, fmt.Errorf("the content type specified in the client configuration is not recognized: %v", err)
}
info, ok := runtime.SerializerInfoForMediaType(mediaTypes, mediaType)
if !ok {
if len(contentType) != 0 || len(mediaTypes) == 0 {
return nil, fmt.Errorf("no serializers registered for %s", contentType)
}
info = mediaTypes[0]
}
internalGV := schema.GroupVersions{
{
Group: config.GroupVersion.Group,
Version: runtime.APIVersionInternal,
},
// always include the legacy group as a decoding target to handle non-error `Status` return types
{
Group: "",
Version: runtime.APIVersionInternal,
},
}
s := &restclient.Serializers{
Encoder: config.NegotiatedSerializer.EncoderForVersion(info.Serializer, *config.GroupVersion),
Decoder: config.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV),
RenegotiatedDecoder: func(contentType string, params map[string]string) (runtime.Decoder, error) {
info, ok := runtime.SerializerInfoForMediaType(mediaTypes, contentType)
if !ok {
return nil, fmt.Errorf("serializer for %s not registered", contentType)
}
return config.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), nil
},
}
if info.StreamSerializer != nil {
s.StreamingSerializer = info.StreamSerializer.Serializer
s.Framer = info.StreamSerializer.Framer
}
return s, nil
}
func getFromEtcd(keys clientv3.KV, path string) (*metaObject, error) {
response, err := keys.Get(context.Background(), path)
if err != nil {
@ -1137,7 +888,57 @@ func diffMapKeys(a, b interface{}, stringer func(interface{}) string) []string {
return ret
}
type allResourceSource struct{}
type resourceToPersist struct {
gvk schema.GroupVersionKind
gvr schema.GroupVersionResource
golangType reflect.Type
namespaced bool
}
func (*allResourceSource) AnyVersionForGroupEnabled(group string) bool { return true }
func (*allResourceSource) VersionEnabled(version schema.GroupVersion) bool { return true }
func getResourcesToPersist(serverResources []*metav1.APIResourceList, isOAPI bool, t *testing.T) []resourceToPersist {
resourcesToPersist := []resourceToPersist{}
for _, discoveryGroup := range serverResources {
for _, discoveryResource := range discoveryGroup.APIResources {
// this is a subresource, skip it
if strings.Contains(discoveryResource.Name, "/") {
continue
}
hasCreate := false
hasGet := false
for _, verb := range discoveryResource.Verbs {
if string(verb) == "get" {
hasGet = true
}
if string(verb) == "create" {
hasCreate = true
}
}
if !(hasCreate && hasGet) {
continue
}
resourceGV, err := schema.ParseGroupVersion(discoveryGroup.GroupVersion)
if err != nil {
t.Fatal(err)
}
gvk := resourceGV.WithKind(discoveryResource.Kind)
if len(discoveryResource.Group) > 0 || len(discoveryResource.Version) > 0 {
gvk = schema.GroupVersionKind{
Group: discoveryResource.Group,
Version: discoveryResource.Version,
Kind: discoveryResource.Kind,
}
}
gvr := resourceGV.WithResource(discoveryResource.Name)
resourcesToPersist = append(resourcesToPersist, resourceToPersist{
gvk: gvk,
gvr: gvr,
namespaced: discoveryResource.Namespaced,
})
}
}
return resourcesToPersist
}