mirror of https://github.com/k3s-io/k3s
Merge pull request #73482 from liggitt/cross-group-watch
Always select the in-memory group/version as a target when decoding from storagepull/564/head
commit
fb96afb194
|
@ -9,6 +9,7 @@ load(
|
|||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"codec_test.go",
|
||||
"conversion_test.go",
|
||||
"converter_test.go",
|
||||
"embedded_test.go",
|
||||
|
|
|
@ -283,6 +283,7 @@ var _ GroupVersioner = multiGroupVersioner{}
|
|||
type multiGroupVersioner struct {
|
||||
target schema.GroupVersion
|
||||
acceptedGroupKinds []schema.GroupKind
|
||||
coerce bool
|
||||
}
|
||||
|
||||
// NewMultiGroupVersioner returns the provided group version for any kind that matches one of the provided group kinds.
|
||||
|
@ -294,6 +295,22 @@ func NewMultiGroupVersioner(gv schema.GroupVersion, groupKinds ...schema.GroupKi
|
|||
return multiGroupVersioner{target: gv, acceptedGroupKinds: groupKinds}
|
||||
}
|
||||
|
||||
// NewCoercingMultiGroupVersioner returns the provided group version for any incoming kind.
|
||||
// Incoming kinds that match the provided groupKinds are preferred.
|
||||
// Kind may be empty in the provided group kind, in which case any kind will match.
|
||||
// Examples:
|
||||
// gv=mygroup/__internal, groupKinds=mygroup/Foo, anothergroup/Bar
|
||||
// KindForGroupVersionKinds(yetanother/v1/Baz, anothergroup/v1/Bar) -> mygroup/__internal/Bar (matched preferred group/kind)
|
||||
//
|
||||
// gv=mygroup/__internal, groupKinds=mygroup, anothergroup
|
||||
// KindForGroupVersionKinds(yetanother/v1/Baz, anothergroup/v1/Bar) -> mygroup/__internal/Bar (matched preferred group)
|
||||
//
|
||||
// gv=mygroup/__internal, groupKinds=mygroup, anothergroup
|
||||
// KindForGroupVersionKinds(yetanother/v1/Baz, yetanother/v1/Bar) -> mygroup/__internal/Baz (no preferred group/kind match, uses first kind in list)
|
||||
func NewCoercingMultiGroupVersioner(gv schema.GroupVersion, groupKinds ...schema.GroupKind) GroupVersioner {
|
||||
return multiGroupVersioner{target: gv, acceptedGroupKinds: groupKinds, coerce: true}
|
||||
}
|
||||
|
||||
// KindForGroupVersionKinds returns the target group version if any kind matches any of the original group kinds. It will
|
||||
// use the originating kind where possible.
|
||||
func (v multiGroupVersioner) KindForGroupVersionKinds(kinds []schema.GroupVersionKind) (schema.GroupVersionKind, bool) {
|
||||
|
@ -308,5 +325,8 @@ func (v multiGroupVersioner) KindForGroupVersionKinds(kinds []schema.GroupVersio
|
|||
return v.target.WithKind(src.Kind), true
|
||||
}
|
||||
}
|
||||
if v.coerce && len(kinds) > 0 {
|
||||
return v.target.WithKind(kinds[0].Kind), true
|
||||
}
|
||||
return schema.GroupVersionKind{}, false
|
||||
}
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package runtime
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
)
|
||||
|
||||
func gv(group, version string) schema.GroupVersion {
|
||||
return schema.GroupVersion{Group: group, Version: version}
|
||||
}
|
||||
func gvk(group, version, kind string) schema.GroupVersionKind {
|
||||
return schema.GroupVersionKind{Group: group, Version: version, Kind: kind}
|
||||
}
|
||||
func gk(group, kind string) schema.GroupKind {
|
||||
return schema.GroupKind{Group: group, Kind: kind}
|
||||
}
|
||||
|
||||
func TestCoercingMultiGroupVersioner(t *testing.T) {
|
||||
testcases := []struct {
|
||||
name string
|
||||
target schema.GroupVersion
|
||||
preferredKinds []schema.GroupKind
|
||||
kinds []schema.GroupVersionKind
|
||||
expectKind schema.GroupVersionKind
|
||||
}{
|
||||
{
|
||||
name: "matched preferred group/kind",
|
||||
target: gv("mygroup", "__internal"),
|
||||
preferredKinds: []schema.GroupKind{gk("mygroup", "Foo"), gk("anothergroup", "Bar")},
|
||||
kinds: []schema.GroupVersionKind{gvk("yetanother", "v1", "Baz"), gvk("anothergroup", "v1", "Bar")},
|
||||
expectKind: gvk("mygroup", "__internal", "Bar"),
|
||||
},
|
||||
{
|
||||
name: "matched preferred group",
|
||||
target: gv("mygroup", "__internal"),
|
||||
preferredKinds: []schema.GroupKind{gk("mygroup", ""), gk("anothergroup", "")},
|
||||
kinds: []schema.GroupVersionKind{gvk("yetanother", "v1", "Baz"), gvk("anothergroup", "v1", "Bar")},
|
||||
expectKind: gvk("mygroup", "__internal", "Bar"),
|
||||
},
|
||||
{
|
||||
name: "no preferred group/kind match, uses first kind in list",
|
||||
target: gv("mygroup", "__internal"),
|
||||
preferredKinds: []schema.GroupKind{gk("mygroup", ""), gk("anothergroup", "")},
|
||||
kinds: []schema.GroupVersionKind{gvk("yetanother", "v1", "Baz"), gvk("yetanother", "v1", "Bar")},
|
||||
expectKind: gvk("mygroup", "__internal", "Baz"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testcases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
v := NewCoercingMultiGroupVersioner(tc.target, tc.preferredKinds...)
|
||||
kind, ok := v.KindForGroupVersionKinds(tc.kinds)
|
||||
if !ok {
|
||||
t.Error("got no kind")
|
||||
}
|
||||
if kind != tc.expectKind {
|
||||
t.Errorf("expected %#v, got %#v", tc.expectKind, kind)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -85,7 +85,7 @@ func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, error) {
|
|||
)
|
||||
decoder := opts.StorageSerializer.DecoderToVersion(
|
||||
recognizer.NewDecoder(decoders...),
|
||||
runtime.NewMultiGroupVersioner(
|
||||
runtime.NewCoercingMultiGroupVersioner(
|
||||
opts.MemoryVersion,
|
||||
schema.GroupKind{Group: opts.MemoryVersion.Group},
|
||||
schema.GroupKind{Group: opts.StorageVersion.Group},
|
||||
|
|
|
@ -6,6 +6,7 @@ go_test(
|
|||
name = "go_default_test",
|
||||
size = "large",
|
||||
srcs = [
|
||||
"etcd_cross_group_test.go",
|
||||
"etcd_storage_path_test.go",
|
||||
"main_test.go",
|
||||
],
|
||||
|
@ -24,6 +25,7 @@ go_test(
|
|||
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
|
||||
"//test/integration/framework:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
|
||||
|
|
|
@ -0,0 +1,184 @@
|
|||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
|
||||
)
|
||||
|
||||
// TestCrossGroupStorage tests to make sure that all objects stored in an expected location in etcd can be converted/read.
|
||||
func TestCrossGroupStorage(t *testing.T) {
|
||||
master := StartRealMasterOrDie(t, func(opts *options.ServerRunOptions) {
|
||||
// force enable all resources so we can check storage.
|
||||
// TODO: drop these once we stop allowing them to be served.
|
||||
opts.APIEnablement.RuntimeConfig["extensions/v1beta1/deployments"] = "true"
|
||||
opts.APIEnablement.RuntimeConfig["extensions/v1beta1/daemonsets"] = "true"
|
||||
opts.APIEnablement.RuntimeConfig["extensions/v1beta1/replicasets"] = "true"
|
||||
opts.APIEnablement.RuntimeConfig["extensions/v1beta1/podsecuritypolicies"] = "true"
|
||||
opts.APIEnablement.RuntimeConfig["extensions/v1beta1/networkpolicies"] = "true"
|
||||
})
|
||||
defer master.Cleanup()
|
||||
|
||||
etcdStorageData := GetEtcdStorageData()
|
||||
|
||||
crossGroupResources := map[schema.GroupVersionKind][]Resource{}
|
||||
|
||||
master.Client.CoreV1().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}})
|
||||
|
||||
// Group by persisted GVK
|
||||
for _, resourceToPersist := range master.Resources {
|
||||
gvk := resourceToPersist.Mapping.GroupVersionKind
|
||||
data, exists := etcdStorageData[resourceToPersist.Mapping.Resource]
|
||||
if !exists {
|
||||
continue
|
||||
}
|
||||
storageGVK := gvk
|
||||
if data.ExpectedGVK != nil {
|
||||
storageGVK = *data.ExpectedGVK
|
||||
}
|
||||
crossGroupResources[storageGVK] = append(crossGroupResources[storageGVK], resourceToPersist)
|
||||
}
|
||||
|
||||
// Clear any without cross-group sources
|
||||
for gvk, resources := range crossGroupResources {
|
||||
groups := sets.NewString()
|
||||
for _, resource := range resources {
|
||||
groups.Insert(resource.Mapping.GroupVersionKind.Group)
|
||||
}
|
||||
if len(groups) < 2 {
|
||||
delete(crossGroupResources, gvk)
|
||||
}
|
||||
}
|
||||
|
||||
if len(crossGroupResources) == 0 {
|
||||
// Sanity check
|
||||
t.Fatal("no cross-group resources found")
|
||||
}
|
||||
|
||||
// Test all potential cross-group sources can be watched and fetched from all other sources
|
||||
for gvk, resources := range crossGroupResources {
|
||||
t.Run(gvk.String(), func(t *testing.T) {
|
||||
// use the first one to create the initial object
|
||||
resource := resources[0]
|
||||
|
||||
// compute namespace
|
||||
ns := ""
|
||||
if resource.Mapping.Scope.Name() == meta.RESTScopeNameNamespace {
|
||||
ns = testNamespace
|
||||
}
|
||||
|
||||
data := etcdStorageData[resource.Mapping.Resource]
|
||||
// create object
|
||||
resourceClient, obj, err := JSONToUnstructured(data.Stub, ns, resource.Mapping, master.Dynamic)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
actual, err := resourceClient.Create(obj, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
name := actual.GetName()
|
||||
|
||||
// Set up clients, versioned data, and watches for all versions
|
||||
var (
|
||||
clients = map[schema.GroupVersionResource]dynamic.ResourceInterface{}
|
||||
versionedData = map[schema.GroupVersionResource]*unstructured.Unstructured{}
|
||||
watches = map[schema.GroupVersionResource]watch.Interface{}
|
||||
)
|
||||
for _, resource := range resources {
|
||||
clients[resource.Mapping.Resource] = master.Dynamic.Resource(resource.Mapping.Resource).Namespace(ns)
|
||||
versionedData[resource.Mapping.Resource], err = clients[resource.Mapping.Resource].Get(name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("error finding resource via %s: %v", resource.Mapping.Resource.GroupVersion().String(), err)
|
||||
}
|
||||
watches[resource.Mapping.Resource], err = clients[resource.Mapping.Resource].Watch(metav1.ListOptions{ResourceVersion: actual.GetResourceVersion()})
|
||||
if err != nil {
|
||||
t.Fatalf("error opening watch via %s: %v", resource.Mapping.Resource.GroupVersion().String(), err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, resource := range resources {
|
||||
// clear out the things cleared in etcd
|
||||
versioned := versionedData[resource.Mapping.Resource]
|
||||
versioned.SetResourceVersion("")
|
||||
versioned.SetSelfLink("")
|
||||
versionedJSON, err := versioned.MarshalJSON()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Update in etcd
|
||||
if _, err := master.KV.Put(context.Background(), data.ExpectedEtcdPath, string(versionedJSON)); err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
t.Logf("wrote %s to etcd", resource.Mapping.Resource.GroupVersion().String())
|
||||
|
||||
// Ensure everyone gets a watch event with the right version
|
||||
for watchResource, watcher := range watches {
|
||||
select {
|
||||
case event, ok := <-watcher.ResultChan():
|
||||
if !ok {
|
||||
t.Fatalf("watch of %s closed in response to persisting %s", watchResource.GroupVersion().String(), resource.Mapping.Resource.GroupVersion().String())
|
||||
}
|
||||
if event.Type != watch.Modified {
|
||||
eventJSON, _ := json.Marshal(event)
|
||||
t.Errorf("unexpected watch event sent to watch of %s in response to persisting %s: %s", watchResource.GroupVersion().String(), resource.Mapping.Resource.GroupVersion().String(), string(eventJSON))
|
||||
continue
|
||||
}
|
||||
if event.Object.GetObjectKind().GroupVersionKind().GroupVersion() != watchResource.GroupVersion() {
|
||||
t.Errorf("unexpected group version object sent to watch of %s in response to persisting %s: %#v", watchResource.GroupVersion().String(), resource.Mapping.Resource.GroupVersion().String(), event.Object)
|
||||
continue
|
||||
}
|
||||
t.Logf(" received event for %s", watchResource.GroupVersion().String())
|
||||
case <-time.After(30 * time.Second):
|
||||
t.Errorf("timed out waiting for watch event for %s in response to persisting %s", watchResource.GroupVersion().String(), resource.Mapping.Resource.GroupVersion().String())
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure everyone can do a direct get and gets the right version
|
||||
for clientResource, client := range clients {
|
||||
obj, err := client.Get(name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Errorf("error looking up %s after persisting %s", clientResource.GroupVersion().String(), resource.Mapping.Resource.GroupVersion().String())
|
||||
continue
|
||||
}
|
||||
if obj.GetObjectKind().GroupVersionKind().GroupVersion() != clientResource.GroupVersion() {
|
||||
t.Errorf("unexpected group version retrieved from %s after persisting %s: %#v", clientResource.GroupVersion().String(), resource.Mapping.Resource.GroupVersion().String(), obj)
|
||||
continue
|
||||
}
|
||||
t.Logf(" fetched object for %s", clientResource.GroupVersion().String())
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue