diff --git a/cmd/kube-aggregator/pkg/cmd/server/start.go b/cmd/kube-aggregator/pkg/cmd/server/start.go index e6d933056f..3a8cc74462 100644 --- a/cmd/kube-aggregator/pkg/cmd/server/start.go +++ b/cmd/kube-aggregator/pkg/cmd/server/start.go @@ -63,7 +63,7 @@ type DiscoveryServerOptions struct { // NewCommandStartMaster provides a CLI handler for 'start master' command func NewCommandStartDiscoveryServer(out, err io.Writer) *cobra.Command { o := &DiscoveryServerOptions{ - Etcd: genericoptions.NewEtcdOptions(), + Etcd: genericoptions.NewEtcdOptions(api.Scheme), SecureServing: genericoptions.NewSecureServingOptions(), Authentication: genericoptions.NewDelegatingAuthenticationOptions(), Authorization: genericoptions.NewDelegatingAuthorizationOptions(), diff --git a/cmd/kube-apiserver/app/options/options.go b/cmd/kube-apiserver/app/options/options.go index 78cca364a6..66072c25fe 100644 --- a/cmd/kube-apiserver/app/options/options.go +++ b/cmd/kube-apiserver/app/options/options.go @@ -65,7 +65,7 @@ type ServerRunOptions struct { func NewServerRunOptions() *ServerRunOptions { s := ServerRunOptions{ GenericServerRunOptions: genericoptions.NewServerRunOptions(), - Etcd: genericoptions.NewEtcdOptions(), + Etcd: genericoptions.NewEtcdOptions(api.Scheme), SecureServing: genericoptions.NewSecureServingOptions(), InsecureServing: genericoptions.NewInsecureServingOptions(), Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(), diff --git a/examples/apiserver/apiserver.go b/examples/apiserver/apiserver.go index 91f2519a95..012898afca 100644 --- a/examples/apiserver/apiserver.go +++ b/examples/apiserver/apiserver.go @@ -49,6 +49,7 @@ func newStorageFactory() genericapiserver.StorageFactory { config := storagebackend.Config{ Prefix: genericoptions.DefaultEtcdPathPrefix, ServerList: []string{"http://127.0.0.1:2379"}, + Copier: api.Scheme, } storageFactory := genericapiserver.NewDefaultStorageFactory(config, "application/json", api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig()) @@ -67,7 +68,7 @@ type ServerRunOptions struct { func NewServerRunOptions() *ServerRunOptions { s := ServerRunOptions{ GenericServerRunOptions: genericoptions.NewServerRunOptions(), - Etcd: genericoptions.NewEtcdOptions(), + Etcd: genericoptions.NewEtcdOptions(api.Scheme), SecureServing: genericoptions.NewSecureServingOptions(), InsecureServing: genericoptions.NewInsecureServingOptions(), Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(), diff --git a/federation/cmd/federation-apiserver/app/options/options.go b/federation/cmd/federation-apiserver/app/options/options.go index b719b1a842..bb6139cf7d 100644 --- a/federation/cmd/federation-apiserver/app/options/options.go +++ b/federation/cmd/federation-apiserver/app/options/options.go @@ -21,6 +21,7 @@ import ( "time" genericoptions "k8s.io/apiserver/pkg/server/options" + "k8s.io/kubernetes/pkg/api" kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options" // add the kubernetes feature gates @@ -47,7 +48,7 @@ type ServerRunOptions struct { func NewServerRunOptions() *ServerRunOptions { s := ServerRunOptions{ GenericServerRunOptions: genericoptions.NewServerRunOptions(), - Etcd: genericoptions.NewEtcdOptions(), + Etcd: genericoptions.NewEtcdOptions(api.Scheme), SecureServing: genericoptions.NewSecureServingOptions(), InsecureServing: genericoptions.NewInsecureServingOptions(), Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(), diff --git a/pkg/genericapiserver/registry/generic/registry/store_test.go b/pkg/genericapiserver/registry/generic/registry/store_test.go index cba3101f9b..edddc483df 100644 --- a/pkg/genericapiserver/registry/generic/registry/store_test.go +++ b/pkg/genericapiserver/registry/generic/registry/store_test.go @@ -1235,7 +1235,7 @@ func TestStoreWatch(t *testing.T) { func newTestGenericStoreRegistry(t *testing.T, hasCacheEnabled bool) (factory.DestroyFunc, *Store) { podPrefix := "/pods" - server, sc := etcdtesting.NewUnsecuredEtcd3TestClientServer(t) + server, sc := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, api.Scheme) strategy := &testRESTStrategy{api.Scheme, names.SimpleNameGenerator, true, false, true} sc.Codec = testapi.Default.StorageCodec() diff --git a/pkg/genericapiserver/server/genericapiserver_test.go b/pkg/genericapiserver/server/genericapiserver_test.go index 5f951b42d7..d3a1d34302 100644 --- a/pkg/genericapiserver/server/genericapiserver_test.go +++ b/pkg/genericapiserver/server/genericapiserver_test.go @@ -75,7 +75,7 @@ func init() { // setUp is a convience function for setting up for (most) tests. func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertions) { - etcdServer, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t) + etcdServer, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, scheme) config := NewConfig() config.PublicAddress = net.ParseIP("192.168.10.4") diff --git a/pkg/genericapiserver/server/storage_factory_test.go b/pkg/genericapiserver/server/storage_factory_test.go index eb3d572814..d4fa24e322 100644 --- a/pkg/genericapiserver/server/storage_factory_test.go +++ b/pkg/genericapiserver/server/storage_factory_test.go @@ -113,6 +113,7 @@ func TestUpdateEtcdOverrides(t *testing.T) { defaultConfig := storagebackend.Config{ Prefix: options.DefaultEtcdPathPrefix, ServerList: defaultEtcdLocation, + Copier: scheme, } storageFactory := NewDefaultStorageFactory(defaultConfig, "", api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig()) storageFactory.SetEtcdLocation(test.resource, test.servers) diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index 12f443ad54..af04f2ac13 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -59,7 +59,7 @@ import ( // setUp is a convience function for setting up for (most) tests. func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) { - server, storageConfig := etcdtesting.NewUnsecuredEtcd3TestClientServer(t) + server, storageConfig := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, api.Scheme) config := &Config{ GenericConfig: genericapiserver.NewConfig(), diff --git a/pkg/registry/registrytest/etcd.go b/pkg/registry/registrytest/etcd.go index d76f89736d..15c5f3f8f2 100644 --- a/pkg/registry/registrytest/etcd.go +++ b/pkg/registry/registrytest/etcd.go @@ -38,7 +38,7 @@ import ( ) func NewEtcdStorage(t *testing.T, group string) (*storagebackend.Config, *etcdtesting.EtcdTestServer) { - server, config := etcdtesting.NewUnsecuredEtcd3TestClientServer(t) + server, config := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, api.Scheme) config.Codec = testapi.Groups[group].StorageCodec() return config, server } diff --git a/pkg/storage/etcd/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go index 6953a8d66e..08944b0c0a 100644 --- a/pkg/storage/etcd/etcd_helper.go +++ b/pkg/storage/etcd/etcd_helper.go @@ -36,19 +36,18 @@ import ( "k8s.io/apiserver/pkg/storage/etcd/metrics" utilcache "k8s.io/apiserver/pkg/util/cache" utiltrace "k8s.io/apiserver/pkg/util/trace" - "k8s.io/kubernetes/pkg/api" etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" ) // Creates a new storage interface from the client // TODO: deprecate in favor of storage.Config abstraction over time -func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int) storage.Interface { +func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int, copier runtime.ObjectCopier) storage.Interface { return &etcdHelper{ etcdMembersAPI: etcd.NewMembersAPI(client), etcdKeysAPI: etcd.NewKeysAPI(client), codec: codec, versioner: APIObjectVersioner{}, - copier: api.Scheme, + copier: copier, pathPrefix: path.Join("/", prefix), quorum: quorum, cache: utilcache.NewCache(cacheSize), diff --git a/pkg/storage/etcd/etcd_helper_test.go b/pkg/storage/etcd/etcd_helper_test.go index f4dbcf9370..e2793f0ab3 100644 --- a/pkg/storage/etcd/etcd_helper_test.go +++ b/pkg/storage/etcd/etcd_helper_test.go @@ -25,26 +25,30 @@ import ( etcd "github.com/coreos/etcd/client" "golang.org/x/net/context" + apitesting "k8s.io/apimachinery/pkg/api/testing" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apiserver/pkg/apis/example" + examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/storage" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/testapi" - apitesting "k8s.io/kubernetes/pkg/api/testing" "k8s.io/kubernetes/pkg/storage/etcd/etcdtest" etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing" storagetesting "k8s.io/kubernetes/pkg/storage/testing" + storagetests "k8s.io/kubernetes/pkg/storage/tests" ) -func testScheme(t *testing.T) (*runtime.Scheme, runtime.Codec) { +func testScheme(t *testing.T) (*runtime.Scheme, serializer.CodecFactory) { scheme := runtime.NewScheme() scheme.Log(t) - scheme.AddKnownTypes(api.Registry.GroupOrDie(api.GroupName).GroupVersion, &storagetesting.TestResource{}) - scheme.AddKnownTypes(testapi.Default.InternalGroupVersion(), &storagetesting.TestResource{}) + scheme.AddKnownTypes(schema.GroupVersion{Version: runtime.APIVersionInternal}, &storagetesting.TestResource{}) + scheme.AddKnownTypes(schema.GroupVersion{Version: runtime.APIVersionInternal}, &storagetesting.TestResource{}) + example.AddToScheme(scheme) + examplev1.AddToScheme(scheme) if err := scheme.AddConversionFuncs( func(in *storagetesting.TestResource, out *storagetesting.TestResource, s conversion.Scope) error { *out = *in @@ -57,17 +61,17 @@ func testScheme(t *testing.T) (*runtime.Scheme, runtime.Codec) { ); err != nil { panic(err) } - codec := serializer.NewCodecFactory(scheme).LegacyCodec(api.Registry.GroupOrDie(api.GroupName).GroupVersion) - return scheme, codec + codecs := serializer.NewCodecFactory(scheme) + return scheme, codecs } -func newEtcdHelper(client etcd.Client, codec runtime.Codec, prefix string) etcdHelper { - return *NewEtcdStorage(client, codec, prefix, false, etcdtest.DeserializationCacheSize).(*etcdHelper) +func newEtcdHelper(client etcd.Client, scheme *runtime.Scheme, codec runtime.Codec, prefix string) etcdHelper { + return *NewEtcdStorage(client, codec, prefix, false, etcdtest.DeserializationCacheSize, scheme).(*etcdHelper) } -// Returns an encoded version of api.Pod with the given name. -func getEncodedPod(name string) string { - pod, _ := runtime.Encode(testapi.Default.Codec(), &api.Pod{ +// Returns an encoded version of example.Pod with the given name. +func getEncodedPod(name string, codec runtime.Codec) string { + pod, _ := runtime.Encode(codec, &examplev1.Pod{ ObjectMeta: metav1.ObjectMeta{Name: name}, }) return string(pod) @@ -81,9 +85,9 @@ func createObj(t *testing.T, helper etcdHelper, name string, obj, out runtime.Ob return err } -func createPodList(t *testing.T, helper etcdHelper, list *api.PodList) error { +func createPodList(t *testing.T, helper etcdHelper, list *example.PodList) error { for i := range list.Items { - returnedObj := &api.Pod{} + returnedObj := &example.Pod{} err := createObj(t, helper, list.Items[i].Name, &list.Items[i], returnedObj, 0) if err != nil { return err @@ -94,29 +98,31 @@ func createPodList(t *testing.T, helper etcdHelper, list *api.PodList) error { } func TestList(t *testing.T) { + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) - helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()) + helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) - list := api.PodList{ - Items: []api.Pod{ + list := example.PodList{ + Items: []example.Pod{ { ObjectMeta: metav1.ObjectMeta{Name: "bar"}, - Spec: apitesting.DeepEqualSafePodSpec(), + Spec: storagetests.DeepEqualSafePodSpec(), }, { ObjectMeta: metav1.ObjectMeta{Name: "baz"}, - Spec: apitesting.DeepEqualSafePodSpec(), + Spec: storagetests.DeepEqualSafePodSpec(), }, { ObjectMeta: metav1.ObjectMeta{Name: "foo"}, - Spec: apitesting.DeepEqualSafePodSpec(), + Spec: storagetests.DeepEqualSafePodSpec(), }, }, } createPodList(t, helper, &list) - var got api.PodList + var got example.PodList // TODO: a sorted filter function could be applied such implied // ordering on the returned list doesn't matter. err := helper.List(context.TODO(), "/", "", storage.Everything, &got) @@ -130,23 +136,25 @@ func TestList(t *testing.T) { } func TestListFiltered(t *testing.T) { + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) - helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()) + helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) - list := api.PodList{ - Items: []api.Pod{ + list := example.PodList{ + Items: []example.Pod{ { ObjectMeta: metav1.ObjectMeta{Name: "bar"}, - Spec: apitesting.DeepEqualSafePodSpec(), + Spec: storagetests.DeepEqualSafePodSpec(), }, { ObjectMeta: metav1.ObjectMeta{Name: "baz"}, - Spec: apitesting.DeepEqualSafePodSpec(), + Spec: storagetests.DeepEqualSafePodSpec(), }, { ObjectMeta: metav1.ObjectMeta{Name: "foo"}, - Spec: apitesting.DeepEqualSafePodSpec(), + Spec: storagetests.DeepEqualSafePodSpec(), }, }, } @@ -157,11 +165,11 @@ func TestListFiltered(t *testing.T) { Label: labels.Everything(), Field: fields.SelectorFromSet(fields.Set{"metadata.name": "bar"}), GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { - pod := obj.(*api.Pod) + pod := obj.(*example.Pod) return labels.Set(pod.Labels), fields.Set{"metadata.name": pod.Name}, nil }, } - var got api.PodList + var got example.PodList err := helper.List(context.TODO(), "/", "", p, &got) if err != nil { t.Errorf("Unexpected error %v", err) @@ -174,31 +182,33 @@ func TestListFiltered(t *testing.T) { // TestListAcrossDirectories ensures that the client excludes directories and flattens tree-response - simulates cross-namespace query func TestListAcrossDirectories(t *testing.T) { + scheme, codecs := testScheme(t) server := etcdtesting.NewEtcdTestClientServer(t) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) defer server.Terminate(t) - roothelper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()) - helper1 := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()+"/dir1") - helper2 := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()+"/dir2") + roothelper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) + helper1 := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()+"/dir1") + helper2 := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()+"/dir2") - list := api.PodList{ - Items: []api.Pod{ + list := example.PodList{ + Items: []example.Pod{ { ObjectMeta: metav1.ObjectMeta{Name: "baz"}, - Spec: apitesting.DeepEqualSafePodSpec(), + Spec: storagetests.DeepEqualSafePodSpec(), }, { ObjectMeta: metav1.ObjectMeta{Name: "foo"}, - Spec: apitesting.DeepEqualSafePodSpec(), + Spec: storagetests.DeepEqualSafePodSpec(), }, { ObjectMeta: metav1.ObjectMeta{Name: "bar"}, - Spec: apitesting.DeepEqualSafePodSpec(), + Spec: storagetests.DeepEqualSafePodSpec(), }, }, } - returnedObj := &api.Pod{} + returnedObj := &example.Pod{} // create the 1st 2 elements in one directory createObj(t, helper1, list.Items[0].Name, &list.Items[0], returnedObj, 0) list.Items[0] = *returnedObj @@ -208,7 +218,7 @@ func TestListAcrossDirectories(t *testing.T) { createObj(t, helper2, list.Items[2].Name, &list.Items[2], returnedObj, 0) list.Items[2] = *returnedObj - var got api.PodList + var got example.PodList err := roothelper.List(context.TODO(), "/", "", storage.Everything, &got) if err != nil { t.Errorf("Unexpected error %v", err) @@ -219,15 +229,17 @@ func TestListAcrossDirectories(t *testing.T) { } func TestGet(t *testing.T) { + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) key := "/some/key" - helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()) - expect := api.Pod{ + helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) + expect := example.Pod{ ObjectMeta: metav1.ObjectMeta{Name: "foo"}, - Spec: apitesting.DeepEqualSafePodSpec(), + Spec: storagetests.DeepEqualSafePodSpec(), } - var got api.Pod + var got example.Pod if err := helper.Create(context.TODO(), key, &expect, &got, 0); err != nil { t.Errorf("Unexpected error %#v", err) } @@ -241,12 +253,14 @@ func TestGet(t *testing.T) { } func TestGetNotFoundErr(t *testing.T) { + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: "v1"}) server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) boguskey := "/some/boguskey" - helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()) + helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) - var got api.Pod + var got example.Pod err := helper.Get(context.TODO(), boguskey, "", &got, false) if !storage.IsNotFound(err) { t.Errorf("Unexpected reponse on key=%v, err=%v", boguskey, err) @@ -254,16 +268,18 @@ func TestGetNotFoundErr(t *testing.T) { } func TestCreate(t *testing.T) { - obj := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) - helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()) - returnedObj := &api.Pod{} + helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) + returnedObj := &example.Pod{} err := helper.Create(context.TODO(), "/some/key", obj, returnedObj, 5) if err != nil { t.Errorf("Unexpected error %#v", err) } - _, err = runtime.Encode(testapi.Default.Codec(), obj) + _, err = runtime.Encode(codec, obj) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -271,7 +287,7 @@ func TestCreate(t *testing.T) { if err != nil { t.Errorf("Unexpected error %#v", err) } - _, err = runtime.Encode(testapi.Default.Codec(), returnedObj) + _, err = runtime.Encode(codec, returnedObj) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -281,10 +297,12 @@ func TestCreate(t *testing.T) { } func TestCreateNilOutParam(t *testing.T) { - obj := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) - helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()) + helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) err := helper.Create(context.TODO(), "/some/key", obj, nil, 5) if err != nil { t.Errorf("Unexpected error %#v", err) @@ -292,11 +310,12 @@ func TestCreateNilOutParam(t *testing.T) { } func TestGuaranteedUpdate(t *testing.T) { - _, codec := testScheme(t) + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal}) server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) key := "/some/key" - helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) + helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1} err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { @@ -337,11 +356,12 @@ func TestGuaranteedUpdate(t *testing.T) { } func TestGuaranteedUpdateNoChange(t *testing.T) { - _, codec := testScheme(t) + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal}) server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) key := "/some/key" - helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) + helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1} err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { @@ -367,11 +387,12 @@ func TestGuaranteedUpdateNoChange(t *testing.T) { } func TestGuaranteedUpdateKeyNotFound(t *testing.T) { - _, codec := testScheme(t) + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal}) server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) key := "/some/key" - helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) + helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) // Create a new node. obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1} @@ -394,11 +415,12 @@ func TestGuaranteedUpdateKeyNotFound(t *testing.T) { } func TestGuaranteedUpdate_CreateCollision(t *testing.T) { - _, codec := testScheme(t) + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal}) server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) key := "/some/key" - helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) + helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) const concurrency = 10 var wgDone sync.WaitGroup @@ -443,13 +465,15 @@ func TestGuaranteedUpdate_CreateCollision(t *testing.T) { } func TestGuaranteedUpdateUIDMismatch(t *testing.T) { + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) prefix := path.Join("/", etcdtest.PathPrefix()) - helper := newEtcdHelper(server.Client, testapi.Default.Codec(), prefix) + helper := newEtcdHelper(server.Client, scheme, codec, prefix) - obj := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}} - podPtr := &api.Pod{} + obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}} + podPtr := &example.Pod{} err := helper.Create(context.TODO(), "/some/key", obj, podPtr, 0) if err != nil { t.Fatalf("Unexpected error %#v", err) @@ -463,13 +487,15 @@ func TestGuaranteedUpdateUIDMismatch(t *testing.T) { } func TestDeleteUIDMismatch(t *testing.T) { + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) prefix := path.Join("/", etcdtest.PathPrefix()) - helper := newEtcdHelper(server.Client, testapi.Default.Codec(), prefix) + helper := newEtcdHelper(server.Client, scheme, codec, prefix) - obj := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}} - podPtr := &api.Pod{} + obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}} + podPtr := &example.Pod{} err := helper.Create(context.TODO(), "/some/key", obj, podPtr, 0) if err != nil { t.Fatalf("Unexpected error %#v", err) @@ -503,23 +529,25 @@ func (f *fakeDeleteKeysAPI) Get(ctx context.Context, key string, opts *etcd.GetO // deletion yet. Etcd will fail the deletion and report the conflict. etcdHelper // should retry until there is no conflict. func TestDeleteWithRetry(t *testing.T) { + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) prefix := path.Join("/", etcdtest.PathPrefix()) - obj := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}} + obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}} // fakeGet returns a large ModifiedIndex to emulate the case that another // party has updated the object. fakeGet := func(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) { - data, _ := runtime.Encode(testapi.Default.Codec(), obj) + data, _ := runtime.Encode(codec, obj) return &etcd.Response{Node: &etcd.Node{Value: string(data), ModifiedIndex: 99}}, nil } expectedRetries := 3 - helper := newEtcdHelper(server.Client, testapi.Default.Codec(), prefix) + helper := newEtcdHelper(server.Client, scheme, codec, prefix) fake := &fakeDeleteKeysAPI{KeysAPI: helper.etcdKeysAPI, fakeGetCap: expectedRetries, fakeGetFunc: fakeGet} helper.etcdKeysAPI = fake - returnedObj := &api.Pod{} + returnedObj := &example.Pod{} err := helper.Create(context.TODO(), "/some/key", obj, returnedObj, 0) if err != nil { t.Errorf("Unexpected error %#v", err) diff --git a/pkg/storage/etcd/etcd_watcher_test.go b/pkg/storage/etcd/etcd_watcher_test.go index 2dca3a2414..cfdb5a7abe 100644 --- a/pkg/storage/etcd/etcd_watcher_test.go +++ b/pkg/storage/etcd/etcd_watcher_test.go @@ -21,12 +21,14 @@ import ( "testing" apiequality "k8s.io/apimachinery/pkg/api/equality" + apitesting "k8s.io/apimachinery/pkg/api/testing" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/apis/example" + examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/storage" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/storage/etcd/etcdtest" etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing" @@ -49,11 +51,12 @@ func (f *fakeEtcdCache) addToCache(index uint64, obj runtime.Object) { var _ etcdCache = &fakeEtcdCache{} func TestWatchInterpretations(t *testing.T) { - codec := testapi.Default.Codec() + _, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) // Declare some pods to make the test cases compact. - podFoo := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} - podBar := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} - podBaz := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz"}} + podFoo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} + podBaz := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz"}} // All of these test cases will be run with the firstLetterIsB Filter. table := map[string]struct { @@ -126,7 +129,7 @@ func TestWatchInterpretations(t *testing.T) { }, } firstLetterIsB := func(obj runtime.Object) bool { - return obj.(*api.Pod).Name[0] == 'b' + return obj.(*example.Pod).Name[0] == 'b' } for name, item := range table { for _, action := range item.actions { @@ -168,7 +171,8 @@ func TestWatchInterpretations(t *testing.T) { } func TestWatchInterpretation_ResponseNotSet(t *testing.T) { - _, codec := testScheme(t) + _, codecs := testScheme(t) + codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{}) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) @@ -181,7 +185,8 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) { } func TestWatchInterpretation_ResponseNoNode(t *testing.T) { - _, codec := testScheme(t) + _, codecs := testScheme(t) + codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) actions := []string{"create", "set", "compareAndSwap", "delete"} for _, action := range actions { w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{}) @@ -196,7 +201,8 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) { } func TestWatchInterpretation_ResponseBadData(t *testing.T) { - _, codec := testScheme(t) + _, codecs := testScheme(t) + codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) actions := []string{"create", "set", "compareAndSwap", "delete"} for _, action := range actions { w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{}) @@ -220,9 +226,10 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) { } func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) { - codec := testapi.Default.Codec() + _, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) filter := func(obj runtime.Object) bool { - return obj.(*api.Pod).Name != "bar" + return obj.(*example.Pod).Name != "bar" } w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, &fakeEtcdCache{}) @@ -231,8 +238,8 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) { eventChan <- e } - fooPod := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} - barPod := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} + fooPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + barPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} fooBytes, err := runtime.Encode(codec, fooPod) if err != nil { t.Fatalf("Encode failed: %v", err) @@ -279,7 +286,7 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) { t.Errorf("#%d: event type want=Deleted, get=%s", i, ev.Type) return } - rv := ev.Object.(*api.Pod).ResourceVersion + rv := ev.Object.(*example.Pod).ResourceVersion if rv != tt.expRV { t.Errorf("#%d: resource version want=%s, get=%s", i, tt.expRV, rv) } @@ -288,11 +295,12 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) { } func TestWatch(t *testing.T) { - codec := testapi.Default.Codec() + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) key := "/some/key" - h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) + h := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) watching, err := h.Watch(context.TODO(), key, "0", storage.Everything) if err != nil { @@ -301,8 +309,8 @@ func TestWatch(t *testing.T) { // watching is explicitly closed below. // Test normal case - pod := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} - returnObj := &api.Pod{} + pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + returnObj := &example.Pod{} err = h.Create(context.TODO(), key, pod, returnObj, 0) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -330,36 +338,25 @@ func TestWatch(t *testing.T) { } } -func emptySubsets() []api.EndpointSubset { - return []api.EndpointSubset{} -} - -func makeSubsets(ip string, port int) []api.EndpointSubset { - return []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: ip}}, - Ports: []api.EndpointPort{{Port: int32(port)}}, - }} -} - func TestWatchEtcdState(t *testing.T) { - codec := testapi.Default.Codec() + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) key := "/somekey/foo" server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) - h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) + h := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) watching, err := h.Watch(context.TODO(), key, "0", storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer watching.Stop() - endpoint := &api.Endpoints{ + pod := &example.Pod{ ObjectMeta: metav1.ObjectMeta{Name: "foo"}, - Subsets: emptySubsets(), } - err = h.Create(context.TODO(), key, endpoint, endpoint, 0) + err = h.Create(context.TODO(), key, pod, pod, 0) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -369,20 +366,21 @@ func TestWatchEtcdState(t *testing.T) { t.Errorf("Unexpected event %#v", event) } - subset := makeSubsets("127.0.0.1", 9000) - endpoint.Subsets = subset - endpoint.ResourceVersion = "" + pod.ResourceVersion = "" + pod.Status = example.PodStatus{ + Phase: example.PodPhase("Running"), + } // CAS the previous value updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { - newObj, err := api.Scheme.DeepCopy(endpoint) + newObj, err := scheme.DeepCopy(pod) if err != nil { t.Errorf("unexpected error: %v", err) return nil, nil, err } - return newObj.(*api.Endpoints), nil, nil + return newObj.(*example.Pod), nil, nil } - err = h.GuaranteedUpdate(context.TODO(), key, &api.Endpoints{}, false, nil, updateFn) + err = h.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, false, nil, updateFn) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -392,20 +390,21 @@ func TestWatchEtcdState(t *testing.T) { t.Errorf("Unexpected event %#v", event) } - if e, a := endpoint, event.Object; !apiequality.Semantic.DeepDerivative(e, a) { + if e, a := pod, event.Object; !apiequality.Semantic.DeepDerivative(e, a) { t.Errorf("Unexpected error: expected %#v, got %#v", e, a) } } func TestWatchFromZeroIndex(t *testing.T) { - codec := testapi.Default.Codec() - pod := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} key := "/somekey/foo" server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) - h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) + h := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) // set before the watch and verify events err := h.Create(context.TODO(), key, pod, pod, 0) @@ -428,11 +427,11 @@ func TestWatchFromZeroIndex(t *testing.T) { // check for concatenation on watch event with CAS updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { - pod := input.(*api.Pod) + pod := input.(*example.Pod) pod.Name = "bar" return pod, nil, nil } - err = h.GuaranteedUpdate(context.TODO(), key, &api.Pod{}, false, nil, updateFn) + err = h.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, false, nil, updateFn) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -451,11 +450,11 @@ func TestWatchFromZeroIndex(t *testing.T) { pod.Name = "baz" updateFn = func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { - pod := input.(*api.Pod) + pod := input.(*example.Pod) pod.Name = "baz" return pod, nil, nil } - err = h.GuaranteedUpdate(context.TODO(), key, &api.Pod{}, false, nil, updateFn) + err = h.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, false, nil, updateFn) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -471,11 +470,12 @@ func TestWatchFromZeroIndex(t *testing.T) { } func TestWatchListFromZeroIndex(t *testing.T) { - codec := testapi.Default.Codec() + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) prefix := "/some/key" server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) - h := newEtcdHelper(server.Client, codec, prefix) + h := newEtcdHelper(server.Client, scheme, codec, prefix) watching, err := h.WatchList(context.TODO(), "/", "0", storage.Everything) if err != nil { @@ -484,7 +484,7 @@ func TestWatchListFromZeroIndex(t *testing.T) { defer watching.Stop() // creates foo which should trigger the WatchList for "/" - pod := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} err = h.Create(context.TODO(), pod.Name, pod, pod, 0) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -501,12 +501,13 @@ func TestWatchListFromZeroIndex(t *testing.T) { } func TestWatchListIgnoresRootKey(t *testing.T) { - codec := testapi.Default.Codec() - pod := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + scheme, codecs := testScheme(t) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} key := "/some/key" server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) - h := newEtcdHelper(server.Client, codec, key) + h := newEtcdHelper(server.Client, scheme, codec, key) watching, err := h.WatchList(context.TODO(), key, "0", storage.Everything) if err != nil { @@ -532,11 +533,12 @@ func TestWatchListIgnoresRootKey(t *testing.T) { } func TestWatchPurposefulShutdown(t *testing.T) { - _, codec := testScheme(t) + scheme, codecs := testScheme(t) + codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"}) server := etcdtesting.NewEtcdTestClientServer(t) defer server.Terminate(t) key := "/some/key" - h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) + h := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) // Test purposeful shutdown watching, err := h.Watch(context.TODO(), key, "0", storage.Everything) diff --git a/pkg/storage/etcd/testing/utils.go b/pkg/storage/etcd/testing/utils.go index 68facc01e9..9e570243db 100644 --- a/pkg/storage/etcd/testing/utils.go +++ b/pkg/storage/etcd/testing/utils.go @@ -27,6 +27,7 @@ import ( "testing" "time" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/kubernetes/pkg/storage/etcd/etcdtest" @@ -310,7 +311,7 @@ func NewUnsecuredEtcdTestClientServer(t *testing.T) *EtcdTestServer { } // NewEtcd3TestClientServer creates a new client and server for testing -func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storagebackend.Config) { +func NewUnsecuredEtcd3TestClientServer(t *testing.T, scheme *runtime.Scheme) (*EtcdTestServer, *storagebackend.Config) { server := &EtcdTestServer{ v3Cluster: integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}), } @@ -320,6 +321,7 @@ func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storageb Prefix: etcdtest.PathPrefix(), ServerList: server.V3Client.Endpoints(), DeserializationCacheSize: etcdtest.DeserializationCacheSize, + Copier: scheme, } return server, config } diff --git a/pkg/storage/storagebackend/factory/etcd2.go b/pkg/storage/storagebackend/factory/etcd2.go index cbaffa957e..c6e5f52c43 100644 --- a/pkg/storage/storagebackend/factory/etcd2.go +++ b/pkg/storage/storagebackend/factory/etcd2.go @@ -39,7 +39,7 @@ func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e if err != nil { return nil, nil, err } - s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize) + s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize, c.Copier) return s, tr.CloseIdleConnections, nil } diff --git a/pkg/storage/tests/cacher_test.go b/pkg/storage/tests/cacher_test.go index d07982671d..7cebc6e070 100644 --- a/pkg/storage/tests/cacher_test.go +++ b/pkg/storage/tests/cacher_test.go @@ -27,6 +27,7 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" + apitesting "k8s.io/apimachinery/pkg/api/testing" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -34,9 +35,9 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/apis/example" + examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/storage" - "k8s.io/client-go/pkg/api" - "k8s.io/client-go/pkg/api/v1" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" "k8s.io/kubernetes/pkg/storage/etcd/etcdtest" etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing" @@ -44,23 +45,24 @@ import ( "golang.org/x/net/context" + "k8s.io/apimachinery/pkg/runtime/serializer" _ "k8s.io/client-go/pkg/api/install" ) -func DeepEqualSafePodSpec() api.PodSpec { - grace := int64(30) - return api.PodSpec{ - RestartPolicy: api.RestartPolicyAlways, - DNSPolicy: api.DNSClusterFirst, - TerminationGracePeriodSeconds: &grace, - SecurityContext: &api.PodSecurityContext{}, - SchedulerName: api.DefaultSchedulerName, - } +var ( + scheme = runtime.NewScheme() + codecs = serializer.NewCodecFactory(scheme) +) + +func init() { + metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion) + example.AddToScheme(scheme) + examplev1.AddToScheme(scheme) } // GetAttrs returns labels and fields of a given object for filtering purposes. func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { - pod, ok := obj.(*api.Pod) + pod, ok := obj.(*example.Pod) if !ok { return nil, nil, fmt.Errorf("not a pod") } @@ -69,7 +71,7 @@ func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { // PodToSelectableFields returns a field set that represents the object // TODO: fields are not labels, and the validation rules for them do not apply. -func PodToSelectableFields(pod *api.Pod) fields.Set { +func PodToSelectableFields(pod *example.Pod) fields.Set { // The purpose of allocation with a given number of elements is to reduce // amount of allocations needed to create the fields.Set. If you add any // field here or the number of object-meta related fields changes, this should @@ -89,9 +91,9 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha return source } -func newEtcdTestStorage(t *testing.T, codec runtime.Codec, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) { - server, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t) - storage := etcd3.New(server.V3Client, codec, prefix) +func newEtcdTestStorage(t *testing.T, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) { + server, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, scheme) + storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix) return server, storage } @@ -101,39 +103,39 @@ func newTestCacher(s storage.Interface, cap int) *storage.Cacher { CacheCapacity: cap, Storage: s, Versioner: etcdstorage.APIObjectVersioner{}, - Copier: api.Scheme, - Type: &api.Pod{}, + Copier: scheme, + Type: &example.Pod{}, ResourcePrefix: prefix, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, GetAttrsFunc: GetAttrs, - NewListFunc: func() runtime.Object { return &api.PodList{} }, - Codec: api.Codecs.LegacyCodec(v1.SchemeGroupVersion), + NewListFunc: func() runtime.Object { return &example.PodList{} }, + Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), } return storage.NewCacherFromConfig(config) } -func makeTestPod(name string) *api.Pod { - return &api.Pod{ +func makeTestPod(name string) *example.Pod { + return &example.Pod{ ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name}, Spec: DeepEqualSafePodSpec(), } } -func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod { +func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *example.Pod { updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { - newObj, err := api.Scheme.DeepCopy(obj) + newObj, err := scheme.DeepCopy(obj) if err != nil { t.Errorf("unexpected error: %v", err) return nil, nil, err } - return newObj.(*api.Pod), nil, nil + return newObj.(*example.Pod), nil, nil } key := "pods/" + obj.Namespace + "/" + obj.Name - if err := s.GuaranteedUpdate(context.TODO(), key, &api.Pod{}, old == nil, nil, updateFn); err != nil { + if err := s.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, old == nil, nil, updateFn); err != nil { t.Errorf("unexpected error: %v", err) } obj.ResourceVersion = "" - result := &api.Pod{} + result := &example.Pod{} if err := s.Get(context.TODO(), key, "", result, false); err != nil { t.Errorf("unexpected error: %v", err) } @@ -141,7 +143,7 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod { } func TestGet(t *testing.T) { - server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix()) + server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) defer server.Terminate(t) cacher := newTestCacher(etcdStorage, 10) defer cacher.Stop() @@ -150,7 +152,7 @@ func TestGet(t *testing.T) { fooCreated := updatePod(t, etcdStorage, podFoo, nil) // We pass the ResourceVersion from the above Create() operation. - result := &api.Pod{} + result := &example.Pod{} if err := cacher.Get(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, result, true); err != nil { t.Errorf("Unexpected error: %v", err) } @@ -161,7 +163,7 @@ func TestGet(t *testing.T) { if err := cacher.Get(context.TODO(), "pods/ns/bar", fooCreated.ResourceVersion, result, true); err != nil { t.Errorf("Unexpected error: %v", err) } - emptyPod := api.Pod{} + emptyPod := example.Pod{} if e, a := emptyPod, *result; !reflect.DeepEqual(e, a) { t.Errorf("Expected: %#v, got: %#v", e, a) } @@ -172,7 +174,7 @@ func TestGet(t *testing.T) { } func TestList(t *testing.T) { - server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix()) + server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) defer server.Terminate(t) cacher := newTestCacher(etcdStorage, 10) defer cacher.Stop() @@ -196,20 +198,20 @@ func TestList(t *testing.T) { podFooNS2.Namespace += "2" updatePod(t, etcdStorage, podFooNS2, nil) - deleted := api.Pod{} + deleted := example.Pod{} if err := etcdStorage.Delete(context.TODO(), "pods/ns/bar", &deleted, nil); err != nil { t.Errorf("Unexpected error: %v", err) } // We first List directly from etcd by passing empty resourceVersion, // to get the current etcd resourceVersion. - rvResult := &api.PodList{} + rvResult := &example.PodList{} if err := cacher.List(context.TODO(), "pods/ns", "", storage.Everything, rvResult); err != nil { t.Errorf("Unexpected error: %v", err) } deletedPodRV := rvResult.ListMeta.ResourceVersion - result := &api.PodList{} + result := &example.PodList{} // We pass the current etcd ResourceVersion received from the above List() operation, // since there is not easy way to get ResourceVersion of barPod deletion operation. if err := cacher.List(context.TODO(), "pods/ns", deletedPodRV, storage.Everything, result); err != nil { @@ -237,7 +239,7 @@ func TestList(t *testing.T) { t.Errorf("Unexpected namespace: %s", item.Namespace) } - var expected *api.Pod + var expected *example.Pod switch item.Name { case "foo": expected = podFooPrime @@ -253,7 +255,7 @@ func TestList(t *testing.T) { } func TestInfiniteList(t *testing.T) { - server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix()) + server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) defer server.Terminate(t) cacher := newTestCacher(etcdStorage, 10) defer cacher.Stop() @@ -268,7 +270,7 @@ func TestInfiniteList(t *testing.T) { } listRV := strconv.Itoa(int(rv + 10)) - result := &api.PodList{} + result := &example.PodList{} err = cacher.List(context.TODO(), "pods/ns", listRV, storage.Everything, result) if !errors.IsTimeout(err) { t.Errorf("Unexpected error: %v", err) @@ -307,7 +309,7 @@ func (self *injectListError) List(ctx context.Context, key string, resourceVersi } func TestWatch(t *testing.T) { - server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix()) + server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) // Inject one list error to make sure we test the relist case. etcdStorage = &injectListError{errors: 1, Interface: etcdStorage} defer server.Terminate(t) @@ -384,7 +386,7 @@ func TestWatch(t *testing.T) { } func TestWatcherTimeout(t *testing.T) { - server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix()) + server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) defer server.Terminate(t) cacher := newTestCacher(etcdStorage, 10) defer cacher.Stop() @@ -426,7 +428,7 @@ func TestWatcherTimeout(t *testing.T) { } func TestFiltering(t *testing.T) { - server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix()) + server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) defer server.Terminate(t) cacher := newTestCacher(etcdStorage, 10) defer cacher.Stop() @@ -458,7 +460,7 @@ func TestFiltering(t *testing.T) { fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered) _ = updatePod(t, etcdStorage, podFooPrime, fooUnfiltered) - deleted := api.Pod{} + deleted := example.Pod{} if err := etcdStorage.Delete(context.TODO(), "pods/ns/foo", &deleted, nil); err != nil { t.Errorf("Unexpected error: %v", err) } @@ -488,7 +490,7 @@ func TestFiltering(t *testing.T) { } func TestStartingResourceVersion(t *testing.T) { - server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix()) + server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) defer server.Terminate(t) cacher := newTestCacher(etcdStorage, 10) defer cacher.Stop() @@ -520,7 +522,7 @@ func TestStartingResourceVersion(t *testing.T) { select { case e := <-watcher.ResultChan(): - pod := e.Object.(*api.Pod) + pod := e.Object.(*example.Pod) podRV, err := storage.ParseWatchResourceVersion(pod.ResourceVersion) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -536,7 +538,7 @@ func TestStartingResourceVersion(t *testing.T) { } func TestRandomWatchDeliver(t *testing.T) { - server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix()) + server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) defer server.Terminate(t) cacher := newTestCacher(etcdStorage, 10) defer cacher.Stop() @@ -569,7 +571,7 @@ func TestRandomWatchDeliver(t *testing.T) { if !ok { break } - if a, e := event.Object.(*api.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a { + if a, e := event.Object.(*example.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a { t.Errorf("Unexpected object watched: %s, expected %s", a, e) } watched++ diff --git a/pkg/storage/tests/utils.go b/pkg/storage/tests/utils.go new file mode 100644 index 0000000000..f1a5d95b80 --- /dev/null +++ b/pkg/storage/tests/utils.go @@ -0,0 +1,32 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tests + +import ( + "k8s.io/apiserver/pkg/apis/example" + + _ "k8s.io/client-go/pkg/api/install" +) + +func DeepEqualSafePodSpec() example.PodSpec { + grace := int64(30) + return example.PodSpec{ + RestartPolicy: "Always", + TerminationGracePeriodSeconds: &grace, + SchedulerName: "default-scheduler", + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go index 4cbdbece05..ae885eeda2 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go @@ -21,6 +21,7 @@ import ( "github.com/spf13/pflag" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/storage/storagebackend" ) @@ -34,13 +35,14 @@ type EtcdOptions struct { EtcdServersOverrides []string } -func NewEtcdOptions() *EtcdOptions { +func NewEtcdOptions(scheme *runtime.Scheme) *EtcdOptions { return &EtcdOptions{ StorageConfig: storagebackend.Config{ Prefix: DefaultEtcdPathPrefix, // Default cache size to 0 - if unset, its size will be set based on target // memory usage. DeserializationCacheSize: 0, + Copier: scheme, }, } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go index 470a99e87d..e1cdd8244f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go @@ -44,4 +44,5 @@ type Config struct { DeserializationCacheSize int Codec runtime.Codec + Copier runtime.ObjectCopier } diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index b21100ad68..efbb91f994 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -311,6 +311,7 @@ func NewMasterConfig() *master.Config { // prefix code, so please don't change without ensuring // sufficient coverage in other ways. Prefix: uuid.New(), + Copier: api.Scheme, } info, _ := runtime.SerializerInfoForMediaType(api.Codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)