diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/BUILD b/staging/src/k8s.io/apimachinery/pkg/runtime/BUILD index 6b52ceff0f..1b3350b7f2 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/BUILD +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/BUILD @@ -9,6 +9,7 @@ load( go_test( name = "go_default_test", srcs = [ + "codec_test.go", "conversion_test.go", "converter_test.go", "embedded_test.go", diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/codec.go b/staging/src/k8s.io/apimachinery/pkg/runtime/codec.go index 6b859b2889..284e32bc3c 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/codec.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/codec.go @@ -283,6 +283,7 @@ var _ GroupVersioner = multiGroupVersioner{} type multiGroupVersioner struct { target schema.GroupVersion acceptedGroupKinds []schema.GroupKind + coerce bool } // NewMultiGroupVersioner returns the provided group version for any kind that matches one of the provided group kinds. @@ -294,6 +295,22 @@ func NewMultiGroupVersioner(gv schema.GroupVersion, groupKinds ...schema.GroupKi return multiGroupVersioner{target: gv, acceptedGroupKinds: groupKinds} } +// NewCoercingMultiGroupVersioner returns the provided group version for any incoming kind. +// Incoming kinds that match the provided groupKinds are preferred. +// Kind may be empty in the provided group kind, in which case any kind will match. +// Examples: +// gv=mygroup/__internal, groupKinds=mygroup/Foo, anothergroup/Bar +// KindForGroupVersionKinds(yetanother/v1/Baz, anothergroup/v1/Bar) -> mygroup/__internal/Bar (matched preferred group/kind) +// +// gv=mygroup/__internal, groupKinds=mygroup, anothergroup +// KindForGroupVersionKinds(yetanother/v1/Baz, anothergroup/v1/Bar) -> mygroup/__internal/Bar (matched preferred group) +// +// gv=mygroup/__internal, groupKinds=mygroup, anothergroup +// KindForGroupVersionKinds(yetanother/v1/Baz, yetanother/v1/Bar) -> mygroup/__internal/Baz (no preferred group/kind match, uses first kind in list) +func NewCoercingMultiGroupVersioner(gv schema.GroupVersion, groupKinds ...schema.GroupKind) GroupVersioner { + return multiGroupVersioner{target: gv, acceptedGroupKinds: groupKinds, coerce: true} +} + // KindForGroupVersionKinds returns the target group version if any kind matches any of the original group kinds. It will // use the originating kind where possible. func (v multiGroupVersioner) KindForGroupVersionKinds(kinds []schema.GroupVersionKind) (schema.GroupVersionKind, bool) { @@ -308,5 +325,8 @@ func (v multiGroupVersioner) KindForGroupVersionKinds(kinds []schema.GroupVersio return v.target.WithKind(src.Kind), true } } + if v.coerce && len(kinds) > 0 { + return v.target.WithKind(kinds[0].Kind), true + } return schema.GroupVersionKind{}, false } diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/codec_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/codec_test.go new file mode 100644 index 0000000000..be169f781a --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/codec_test.go @@ -0,0 +1,78 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runtime + +import ( + "testing" + + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func gv(group, version string) schema.GroupVersion { + return schema.GroupVersion{Group: group, Version: version} +} +func gvk(group, version, kind string) schema.GroupVersionKind { + return schema.GroupVersionKind{Group: group, Version: version, Kind: kind} +} +func gk(group, kind string) schema.GroupKind { + return schema.GroupKind{Group: group, Kind: kind} +} + +func TestCoercingMultiGroupVersioner(t *testing.T) { + testcases := []struct { + name string + target schema.GroupVersion + preferredKinds []schema.GroupKind + kinds []schema.GroupVersionKind + expectKind schema.GroupVersionKind + }{ + { + name: "matched preferred group/kind", + target: gv("mygroup", "__internal"), + preferredKinds: []schema.GroupKind{gk("mygroup", "Foo"), gk("anothergroup", "Bar")}, + kinds: []schema.GroupVersionKind{gvk("yetanother", "v1", "Baz"), gvk("anothergroup", "v1", "Bar")}, + expectKind: gvk("mygroup", "__internal", "Bar"), + }, + { + name: "matched preferred group", + target: gv("mygroup", "__internal"), + preferredKinds: []schema.GroupKind{gk("mygroup", ""), gk("anothergroup", "")}, + kinds: []schema.GroupVersionKind{gvk("yetanother", "v1", "Baz"), gvk("anothergroup", "v1", "Bar")}, + expectKind: gvk("mygroup", "__internal", "Bar"), + }, + { + name: "no preferred group/kind match, uses first kind in list", + target: gv("mygroup", "__internal"), + preferredKinds: []schema.GroupKind{gk("mygroup", ""), gk("anothergroup", "")}, + kinds: []schema.GroupVersionKind{gvk("yetanother", "v1", "Baz"), gvk("yetanother", "v1", "Bar")}, + expectKind: gvk("mygroup", "__internal", "Baz"), + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + v := NewCoercingMultiGroupVersioner(tc.target, tc.preferredKinds...) + kind, ok := v.KindForGroupVersionKinds(tc.kinds) + if !ok { + t.Error("got no kind") + } + if kind != tc.expectKind { + t.Errorf("expected %#v, got %#v", tc.expectKind, kind) + } + }) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_codec.go b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_codec.go index 113ac573cf..e2f91bf13d 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/storage/storage_codec.go +++ b/staging/src/k8s.io/apiserver/pkg/server/storage/storage_codec.go @@ -85,7 +85,7 @@ func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, error) { ) decoder := opts.StorageSerializer.DecoderToVersion( recognizer.NewDecoder(decoders...), - runtime.NewMultiGroupVersioner( + runtime.NewCoercingMultiGroupVersioner( opts.MemoryVersion, schema.GroupKind{Group: opts.MemoryVersion.Group}, schema.GroupKind{Group: opts.StorageVersion.Group}, diff --git a/test/integration/etcd/BUILD b/test/integration/etcd/BUILD index 886566f7ff..700a438d66 100644 --- a/test/integration/etcd/BUILD +++ b/test/integration/etcd/BUILD @@ -6,6 +6,7 @@ go_test( name = "go_default_test", size = "large", srcs = [ + "etcd_cross_group_test.go", "etcd_storage_path_test.go", "main_test.go", ], @@ -24,6 +25,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/client-go/dynamic:go_default_library", "//test/integration/framework:go_default_library", "//vendor/github.com/coreos/etcd/clientv3:go_default_library", diff --git a/test/integration/etcd/etcd_cross_group_test.go b/test/integration/etcd/etcd_cross_group_test.go new file mode 100644 index 0000000000..d801aea7c7 --- /dev/null +++ b/test/integration/etcd/etcd_cross_group_test.go @@ -0,0 +1,184 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "context" + "encoding/json" + "testing" + "time" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" + "k8s.io/kubernetes/cmd/kube-apiserver/app/options" +) + +// TestCrossGroupStorage tests to make sure that all objects stored in an expected location in etcd can be converted/read. +func TestCrossGroupStorage(t *testing.T) { + master := StartRealMasterOrDie(t, func(opts *options.ServerRunOptions) { + // force enable all resources so we can check storage. + // TODO: drop these once we stop allowing them to be served. + opts.APIEnablement.RuntimeConfig["extensions/v1beta1/deployments"] = "true" + opts.APIEnablement.RuntimeConfig["extensions/v1beta1/daemonsets"] = "true" + opts.APIEnablement.RuntimeConfig["extensions/v1beta1/replicasets"] = "true" + opts.APIEnablement.RuntimeConfig["extensions/v1beta1/podsecuritypolicies"] = "true" + opts.APIEnablement.RuntimeConfig["extensions/v1beta1/networkpolicies"] = "true" + }) + defer master.Cleanup() + + etcdStorageData := GetEtcdStorageData() + + crossGroupResources := map[schema.GroupVersionKind][]Resource{} + + master.Client.CoreV1().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}) + + // Group by persisted GVK + for _, resourceToPersist := range master.Resources { + gvk := resourceToPersist.Mapping.GroupVersionKind + data, exists := etcdStorageData[resourceToPersist.Mapping.Resource] + if !exists { + continue + } + storageGVK := gvk + if data.ExpectedGVK != nil { + storageGVK = *data.ExpectedGVK + } + crossGroupResources[storageGVK] = append(crossGroupResources[storageGVK], resourceToPersist) + } + + // Clear any without cross-group sources + for gvk, resources := range crossGroupResources { + groups := sets.NewString() + for _, resource := range resources { + groups.Insert(resource.Mapping.GroupVersionKind.Group) + } + if len(groups) < 2 { + delete(crossGroupResources, gvk) + } + } + + if len(crossGroupResources) == 0 { + // Sanity check + t.Fatal("no cross-group resources found") + } + + // Test all potential cross-group sources can be watched and fetched from all other sources + for gvk, resources := range crossGroupResources { + t.Run(gvk.String(), func(t *testing.T) { + // use the first one to create the initial object + resource := resources[0] + + // compute namespace + ns := "" + if resource.Mapping.Scope.Name() == meta.RESTScopeNameNamespace { + ns = testNamespace + } + + data := etcdStorageData[resource.Mapping.Resource] + // create object + resourceClient, obj, err := JSONToUnstructured(data.Stub, ns, resource.Mapping, master.Dynamic) + if err != nil { + t.Fatal(err) + } + actual, err := resourceClient.Create(obj, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + name := actual.GetName() + + // Set up clients, versioned data, and watches for all versions + var ( + clients = map[schema.GroupVersionResource]dynamic.ResourceInterface{} + versionedData = map[schema.GroupVersionResource]*unstructured.Unstructured{} + watches = map[schema.GroupVersionResource]watch.Interface{} + ) + for _, resource := range resources { + clients[resource.Mapping.Resource] = master.Dynamic.Resource(resource.Mapping.Resource).Namespace(ns) + versionedData[resource.Mapping.Resource], err = clients[resource.Mapping.Resource].Get(name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("error finding resource via %s: %v", resource.Mapping.Resource.GroupVersion().String(), err) + } + watches[resource.Mapping.Resource], err = clients[resource.Mapping.Resource].Watch(metav1.ListOptions{ResourceVersion: actual.GetResourceVersion()}) + if err != nil { + t.Fatalf("error opening watch via %s: %v", resource.Mapping.Resource.GroupVersion().String(), err) + } + } + + for _, resource := range resources { + // clear out the things cleared in etcd + versioned := versionedData[resource.Mapping.Resource] + versioned.SetResourceVersion("") + versioned.SetSelfLink("") + versionedJSON, err := versioned.MarshalJSON() + if err != nil { + t.Error(err) + continue + } + + // Update in etcd + if _, err := master.KV.Put(context.Background(), data.ExpectedEtcdPath, string(versionedJSON)); err != nil { + t.Error(err) + continue + } + t.Logf("wrote %s to etcd", resource.Mapping.Resource.GroupVersion().String()) + + // Ensure everyone gets a watch event with the right version + for watchResource, watcher := range watches { + select { + case event, ok := <-watcher.ResultChan(): + if !ok { + t.Fatalf("watch of %s closed in response to persisting %s", watchResource.GroupVersion().String(), resource.Mapping.Resource.GroupVersion().String()) + } + if event.Type != watch.Modified { + eventJSON, _ := json.Marshal(event) + t.Errorf("unexpected watch event sent to watch of %s in response to persisting %s: %s", watchResource.GroupVersion().String(), resource.Mapping.Resource.GroupVersion().String(), string(eventJSON)) + continue + } + if event.Object.GetObjectKind().GroupVersionKind().GroupVersion() != watchResource.GroupVersion() { + t.Errorf("unexpected group version object sent to watch of %s in response to persisting %s: %#v", watchResource.GroupVersion().String(), resource.Mapping.Resource.GroupVersion().String(), event.Object) + continue + } + t.Logf(" received event for %s", watchResource.GroupVersion().String()) + case <-time.After(30 * time.Second): + t.Errorf("timed out waiting for watch event for %s in response to persisting %s", watchResource.GroupVersion().String(), resource.Mapping.Resource.GroupVersion().String()) + continue + } + } + + // Ensure everyone can do a direct get and gets the right version + for clientResource, client := range clients { + obj, err := client.Get(name, metav1.GetOptions{}) + if err != nil { + t.Errorf("error looking up %s after persisting %s", clientResource.GroupVersion().String(), resource.Mapping.Resource.GroupVersion().String()) + continue + } + if obj.GetObjectKind().GroupVersionKind().GroupVersion() != clientResource.GroupVersion() { + t.Errorf("unexpected group version retrieved from %s after persisting %s: %#v", clientResource.GroupVersion().String(), resource.Mapping.Resource.GroupVersion().String(), obj) + continue + } + t.Logf(" fetched object for %s", clientResource.GroupVersion().String()) + } + } + }) + } +}