pkg/storage/etcd: cut off pkg/api scheme

pull/6/head
Dr. Stefan Schimanski 2017-02-01 13:53:43 +01:00
parent 01ac994541
commit 2f9fa55c6f
19 changed files with 259 additions and 187 deletions

View File

@ -63,7 +63,7 @@ type DiscoveryServerOptions struct {
// NewCommandStartMaster provides a CLI handler for 'start master' command
func NewCommandStartDiscoveryServer(out, err io.Writer) *cobra.Command {
o := &DiscoveryServerOptions{
Etcd: genericoptions.NewEtcdOptions(),
Etcd: genericoptions.NewEtcdOptions(api.Scheme),
SecureServing: genericoptions.NewSecureServingOptions(),
Authentication: genericoptions.NewDelegatingAuthenticationOptions(),
Authorization: genericoptions.NewDelegatingAuthorizationOptions(),

View File

@ -65,7 +65,7 @@ type ServerRunOptions struct {
func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{
GenericServerRunOptions: genericoptions.NewServerRunOptions(),
Etcd: genericoptions.NewEtcdOptions(),
Etcd: genericoptions.NewEtcdOptions(api.Scheme),
SecureServing: genericoptions.NewSecureServingOptions(),
InsecureServing: genericoptions.NewInsecureServingOptions(),
Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(),

View File

@ -49,6 +49,7 @@ func newStorageFactory() genericapiserver.StorageFactory {
config := storagebackend.Config{
Prefix: genericoptions.DefaultEtcdPathPrefix,
ServerList: []string{"http://127.0.0.1:2379"},
Copier: api.Scheme,
}
storageFactory := genericapiserver.NewDefaultStorageFactory(config, "application/json", api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig())
@ -67,7 +68,7 @@ type ServerRunOptions struct {
func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{
GenericServerRunOptions: genericoptions.NewServerRunOptions(),
Etcd: genericoptions.NewEtcdOptions(),
Etcd: genericoptions.NewEtcdOptions(api.Scheme),
SecureServing: genericoptions.NewSecureServingOptions(),
InsecureServing: genericoptions.NewInsecureServingOptions(),
Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(),

View File

@ -21,6 +21,7 @@ import (
"time"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/kubernetes/pkg/api"
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
// add the kubernetes feature gates
@ -47,7 +48,7 @@ type ServerRunOptions struct {
func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{
GenericServerRunOptions: genericoptions.NewServerRunOptions(),
Etcd: genericoptions.NewEtcdOptions(),
Etcd: genericoptions.NewEtcdOptions(api.Scheme),
SecureServing: genericoptions.NewSecureServingOptions(),
InsecureServing: genericoptions.NewInsecureServingOptions(),
Authentication: kubeoptions.NewBuiltInAuthenticationOptions().WithAll(),

View File

@ -1235,7 +1235,7 @@ func TestStoreWatch(t *testing.T) {
func newTestGenericStoreRegistry(t *testing.T, hasCacheEnabled bool) (factory.DestroyFunc, *Store) {
podPrefix := "/pods"
server, sc := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)
server, sc := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, api.Scheme)
strategy := &testRESTStrategy{api.Scheme, names.SimpleNameGenerator, true, false, true}
sc.Codec = testapi.Default.StorageCodec()

View File

@ -75,7 +75,7 @@ func init() {
// setUp is a convience function for setting up for (most) tests.
func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
etcdServer, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)
etcdServer, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, scheme)
config := NewConfig()
config.PublicAddress = net.ParseIP("192.168.10.4")

View File

@ -113,6 +113,7 @@ func TestUpdateEtcdOverrides(t *testing.T) {
defaultConfig := storagebackend.Config{
Prefix: options.DefaultEtcdPathPrefix,
ServerList: defaultEtcdLocation,
Copier: scheme,
}
storageFactory := NewDefaultStorageFactory(defaultConfig, "", api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig())
storageFactory.SetEtcdLocation(test.resource, test.servers)

View File

@ -59,7 +59,7 @@ import (
// setUp is a convience function for setting up for (most) tests.
func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
server, storageConfig := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)
server, storageConfig := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, api.Scheme)
config := &Config{
GenericConfig: genericapiserver.NewConfig(),

View File

@ -38,7 +38,7 @@ import (
)
func NewEtcdStorage(t *testing.T, group string) (*storagebackend.Config, *etcdtesting.EtcdTestServer) {
server, config := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)
server, config := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, api.Scheme)
config.Codec = testapi.Groups[group].StorageCodec()
return config, server
}

View File

@ -36,19 +36,18 @@ import (
"k8s.io/apiserver/pkg/storage/etcd/metrics"
utilcache "k8s.io/apiserver/pkg/util/cache"
utiltrace "k8s.io/apiserver/pkg/util/trace"
"k8s.io/kubernetes/pkg/api"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
)
// Creates a new storage interface from the client
// TODO: deprecate in favor of storage.Config abstraction over time
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int) storage.Interface {
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int, copier runtime.ObjectCopier) storage.Interface {
return &etcdHelper{
etcdMembersAPI: etcd.NewMembersAPI(client),
etcdKeysAPI: etcd.NewKeysAPI(client),
codec: codec,
versioner: APIObjectVersioner{},
copier: api.Scheme,
copier: copier,
pathPrefix: path.Join("/", prefix),
quorum: quorum,
cache: utilcache.NewCache(cacheSize),

View File

@ -25,26 +25,30 @@ import (
etcd "github.com/coreos/etcd/client"
"golang.org/x/net/context"
apitesting "k8s.io/apimachinery/pkg/api/testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
apitesting "k8s.io/kubernetes/pkg/api/testing"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
storagetesting "k8s.io/kubernetes/pkg/storage/testing"
storagetests "k8s.io/kubernetes/pkg/storage/tests"
)
func testScheme(t *testing.T) (*runtime.Scheme, runtime.Codec) {
func testScheme(t *testing.T) (*runtime.Scheme, serializer.CodecFactory) {
scheme := runtime.NewScheme()
scheme.Log(t)
scheme.AddKnownTypes(api.Registry.GroupOrDie(api.GroupName).GroupVersion, &storagetesting.TestResource{})
scheme.AddKnownTypes(testapi.Default.InternalGroupVersion(), &storagetesting.TestResource{})
scheme.AddKnownTypes(schema.GroupVersion{Version: runtime.APIVersionInternal}, &storagetesting.TestResource{})
scheme.AddKnownTypes(schema.GroupVersion{Version: runtime.APIVersionInternal}, &storagetesting.TestResource{})
example.AddToScheme(scheme)
examplev1.AddToScheme(scheme)
if err := scheme.AddConversionFuncs(
func(in *storagetesting.TestResource, out *storagetesting.TestResource, s conversion.Scope) error {
*out = *in
@ -57,17 +61,17 @@ func testScheme(t *testing.T) (*runtime.Scheme, runtime.Codec) {
); err != nil {
panic(err)
}
codec := serializer.NewCodecFactory(scheme).LegacyCodec(api.Registry.GroupOrDie(api.GroupName).GroupVersion)
return scheme, codec
codecs := serializer.NewCodecFactory(scheme)
return scheme, codecs
}
func newEtcdHelper(client etcd.Client, codec runtime.Codec, prefix string) etcdHelper {
return *NewEtcdStorage(client, codec, prefix, false, etcdtest.DeserializationCacheSize).(*etcdHelper)
func newEtcdHelper(client etcd.Client, scheme *runtime.Scheme, codec runtime.Codec, prefix string) etcdHelper {
return *NewEtcdStorage(client, codec, prefix, false, etcdtest.DeserializationCacheSize, scheme).(*etcdHelper)
}
// Returns an encoded version of api.Pod with the given name.
func getEncodedPod(name string) string {
pod, _ := runtime.Encode(testapi.Default.Codec(), &api.Pod{
// Returns an encoded version of example.Pod with the given name.
func getEncodedPod(name string, codec runtime.Codec) string {
pod, _ := runtime.Encode(codec, &examplev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: name},
})
return string(pod)
@ -81,9 +85,9 @@ func createObj(t *testing.T, helper etcdHelper, name string, obj, out runtime.Ob
return err
}
func createPodList(t *testing.T, helper etcdHelper, list *api.PodList) error {
func createPodList(t *testing.T, helper etcdHelper, list *example.PodList) error {
for i := range list.Items {
returnedObj := &api.Pod{}
returnedObj := &example.Pod{}
err := createObj(t, helper, list.Items[i].Name, &list.Items[i], returnedObj, 0)
if err != nil {
return err
@ -94,29 +98,31 @@ func createPodList(t *testing.T, helper etcdHelper, list *api.PodList) error {
}
func TestList(t *testing.T) {
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix())
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
list := api.PodList{
Items: []api.Pod{
list := example.PodList{
Items: []example.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "bar"},
Spec: apitesting.DeepEqualSafePodSpec(),
Spec: storagetests.DeepEqualSafePodSpec(),
},
{
ObjectMeta: metav1.ObjectMeta{Name: "baz"},
Spec: apitesting.DeepEqualSafePodSpec(),
Spec: storagetests.DeepEqualSafePodSpec(),
},
{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: apitesting.DeepEqualSafePodSpec(),
Spec: storagetests.DeepEqualSafePodSpec(),
},
},
}
createPodList(t, helper, &list)
var got api.PodList
var got example.PodList
// TODO: a sorted filter function could be applied such implied
// ordering on the returned list doesn't matter.
err := helper.List(context.TODO(), "/", "", storage.Everything, &got)
@ -130,23 +136,25 @@ func TestList(t *testing.T) {
}
func TestListFiltered(t *testing.T) {
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix())
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
list := api.PodList{
Items: []api.Pod{
list := example.PodList{
Items: []example.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "bar"},
Spec: apitesting.DeepEqualSafePodSpec(),
Spec: storagetests.DeepEqualSafePodSpec(),
},
{
ObjectMeta: metav1.ObjectMeta{Name: "baz"},
Spec: apitesting.DeepEqualSafePodSpec(),
Spec: storagetests.DeepEqualSafePodSpec(),
},
{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: apitesting.DeepEqualSafePodSpec(),
Spec: storagetests.DeepEqualSafePodSpec(),
},
},
}
@ -157,11 +165,11 @@ func TestListFiltered(t *testing.T) {
Label: labels.Everything(),
Field: fields.SelectorFromSet(fields.Set{"metadata.name": "bar"}),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod := obj.(*api.Pod)
pod := obj.(*example.Pod)
return labels.Set(pod.Labels), fields.Set{"metadata.name": pod.Name}, nil
},
}
var got api.PodList
var got example.PodList
err := helper.List(context.TODO(), "/", "", p, &got)
if err != nil {
t.Errorf("Unexpected error %v", err)
@ -174,31 +182,33 @@ func TestListFiltered(t *testing.T) {
// TestListAcrossDirectories ensures that the client excludes directories and flattens tree-response - simulates cross-namespace query
func TestListAcrossDirectories(t *testing.T) {
scheme, codecs := testScheme(t)
server := etcdtesting.NewEtcdTestClientServer(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
defer server.Terminate(t)
roothelper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix())
helper1 := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()+"/dir1")
helper2 := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()+"/dir2")
roothelper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
helper1 := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()+"/dir1")
helper2 := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()+"/dir2")
list := api.PodList{
Items: []api.Pod{
list := example.PodList{
Items: []example.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "baz"},
Spec: apitesting.DeepEqualSafePodSpec(),
Spec: storagetests.DeepEqualSafePodSpec(),
},
{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: apitesting.DeepEqualSafePodSpec(),
Spec: storagetests.DeepEqualSafePodSpec(),
},
{
ObjectMeta: metav1.ObjectMeta{Name: "bar"},
Spec: apitesting.DeepEqualSafePodSpec(),
Spec: storagetests.DeepEqualSafePodSpec(),
},
},
}
returnedObj := &api.Pod{}
returnedObj := &example.Pod{}
// create the 1st 2 elements in one directory
createObj(t, helper1, list.Items[0].Name, &list.Items[0], returnedObj, 0)
list.Items[0] = *returnedObj
@ -208,7 +218,7 @@ func TestListAcrossDirectories(t *testing.T) {
createObj(t, helper2, list.Items[2].Name, &list.Items[2], returnedObj, 0)
list.Items[2] = *returnedObj
var got api.PodList
var got example.PodList
err := roothelper.List(context.TODO(), "/", "", storage.Everything, &got)
if err != nil {
t.Errorf("Unexpected error %v", err)
@ -219,15 +229,17 @@ func TestListAcrossDirectories(t *testing.T) {
}
func TestGet(t *testing.T) {
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := "/some/key"
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix())
expect := api.Pod{
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
expect := example.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: apitesting.DeepEqualSafePodSpec(),
Spec: storagetests.DeepEqualSafePodSpec(),
}
var got api.Pod
var got example.Pod
if err := helper.Create(context.TODO(), key, &expect, &got, 0); err != nil {
t.Errorf("Unexpected error %#v", err)
}
@ -241,12 +253,14 @@ func TestGet(t *testing.T) {
}
func TestGetNotFoundErr(t *testing.T) {
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: "v1"})
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
boguskey := "/some/boguskey"
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix())
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
var got api.Pod
var got example.Pod
err := helper.Get(context.TODO(), boguskey, "", &got, false)
if !storage.IsNotFound(err) {
t.Errorf("Unexpected reponse on key=%v, err=%v", boguskey, err)
@ -254,16 +268,18 @@ func TestGetNotFoundErr(t *testing.T) {
}
func TestCreate(t *testing.T) {
obj := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix())
returnedObj := &api.Pod{}
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
returnedObj := &example.Pod{}
err := helper.Create(context.TODO(), "/some/key", obj, returnedObj, 5)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
_, err = runtime.Encode(testapi.Default.Codec(), obj)
_, err = runtime.Encode(codec, obj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
@ -271,7 +287,7 @@ func TestCreate(t *testing.T) {
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
_, err = runtime.Encode(testapi.Default.Codec(), returnedObj)
_, err = runtime.Encode(codec, returnedObj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
@ -281,10 +297,12 @@ func TestCreate(t *testing.T) {
}
func TestCreateNilOutParam(t *testing.T) {
obj := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix())
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
err := helper.Create(context.TODO(), "/some/key", obj, nil, 5)
if err != nil {
t.Errorf("Unexpected error %#v", err)
@ -292,11 +310,12 @@ func TestCreateNilOutParam(t *testing.T) {
}
func TestGuaranteedUpdate(t *testing.T) {
_, codec := testScheme(t)
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := "/some/key"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
@ -337,11 +356,12 @@ func TestGuaranteedUpdate(t *testing.T) {
}
func TestGuaranteedUpdateNoChange(t *testing.T) {
_, codec := testScheme(t)
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := "/some/key"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
@ -367,11 +387,12 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
}
func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
_, codec := testScheme(t)
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := "/some/key"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
// Create a new node.
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
@ -394,11 +415,12 @@ func TestGuaranteedUpdateKeyNotFound(t *testing.T) {
}
func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
_, codec := testScheme(t)
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, schema.GroupVersion{Version: runtime.APIVersionInternal})
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := "/some/key"
helper := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
const concurrency = 10
var wgDone sync.WaitGroup
@ -443,13 +465,15 @@ func TestGuaranteedUpdate_CreateCollision(t *testing.T) {
}
func TestGuaranteedUpdateUIDMismatch(t *testing.T) {
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
prefix := path.Join("/", etcdtest.PathPrefix())
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), prefix)
helper := newEtcdHelper(server.Client, scheme, codec, prefix)
obj := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}
podPtr := &api.Pod{}
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}
podPtr := &example.Pod{}
err := helper.Create(context.TODO(), "/some/key", obj, podPtr, 0)
if err != nil {
t.Fatalf("Unexpected error %#v", err)
@ -463,13 +487,15 @@ func TestGuaranteedUpdateUIDMismatch(t *testing.T) {
}
func TestDeleteUIDMismatch(t *testing.T) {
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
prefix := path.Join("/", etcdtest.PathPrefix())
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), prefix)
helper := newEtcdHelper(server.Client, scheme, codec, prefix)
obj := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}
podPtr := &api.Pod{}
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}
podPtr := &example.Pod{}
err := helper.Create(context.TODO(), "/some/key", obj, podPtr, 0)
if err != nil {
t.Fatalf("Unexpected error %#v", err)
@ -503,23 +529,25 @@ func (f *fakeDeleteKeysAPI) Get(ctx context.Context, key string, opts *etcd.GetO
// deletion yet. Etcd will fail the deletion and report the conflict. etcdHelper
// should retry until there is no conflict.
func TestDeleteWithRetry(t *testing.T) {
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
prefix := path.Join("/", etcdtest.PathPrefix())
obj := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}
// fakeGet returns a large ModifiedIndex to emulate the case that another
// party has updated the object.
fakeGet := func(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) {
data, _ := runtime.Encode(testapi.Default.Codec(), obj)
data, _ := runtime.Encode(codec, obj)
return &etcd.Response{Node: &etcd.Node{Value: string(data), ModifiedIndex: 99}}, nil
}
expectedRetries := 3
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), prefix)
helper := newEtcdHelper(server.Client, scheme, codec, prefix)
fake := &fakeDeleteKeysAPI{KeysAPI: helper.etcdKeysAPI, fakeGetCap: expectedRetries, fakeGetFunc: fakeGet}
helper.etcdKeysAPI = fake
returnedObj := &api.Pod{}
returnedObj := &example.Pod{}
err := helper.Create(context.TODO(), "/some/key", obj, returnedObj, 0)
if err != nil {
t.Errorf("Unexpected error %#v", err)

View File

@ -21,12 +21,14 @@ import (
"testing"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apitesting "k8s.io/apimachinery/pkg/api/testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
@ -49,11 +51,12 @@ func (f *fakeEtcdCache) addToCache(index uint64, obj runtime.Object) {
var _ etcdCache = &fakeEtcdCache{}
func TestWatchInterpretations(t *testing.T) {
codec := testapi.Default.Codec()
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
// Declare some pods to make the test cases compact.
podFoo := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
podBar := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
podBaz := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz"}}
podFoo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
podBaz := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz"}}
// All of these test cases will be run with the firstLetterIsB Filter.
table := map[string]struct {
@ -126,7 +129,7 @@ func TestWatchInterpretations(t *testing.T) {
},
}
firstLetterIsB := func(obj runtime.Object) bool {
return obj.(*api.Pod).Name[0] == 'b'
return obj.(*example.Pod).Name[0] == 'b'
}
for name, item := range table {
for _, action := range item.actions {
@ -168,7 +171,8 @@ func TestWatchInterpretations(t *testing.T) {
}
func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
_, codec := testScheme(t)
_, codecs := testScheme(t)
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
@ -181,7 +185,8 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
}
func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
_, codec := testScheme(t)
_, codecs := testScheme(t)
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{})
@ -196,7 +201,8 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
}
func TestWatchInterpretation_ResponseBadData(t *testing.T) {
_, codec := testScheme(t)
_, codecs := testScheme(t)
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, false, nil, storage.SimpleFilter(storage.Everything), codec, versioner, nil, &fakeEtcdCache{})
@ -220,9 +226,10 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
}
func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
codec := testapi.Default.Codec()
_, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
filter := func(obj runtime.Object) bool {
return obj.(*api.Pod).Name != "bar"
return obj.(*example.Pod).Name != "bar"
}
w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, &fakeEtcdCache{})
@ -231,8 +238,8 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
eventChan <- e
}
fooPod := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
barPod := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
fooPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
barPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
fooBytes, err := runtime.Encode(codec, fooPod)
if err != nil {
t.Fatalf("Encode failed: %v", err)
@ -279,7 +286,7 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
t.Errorf("#%d: event type want=Deleted, get=%s", i, ev.Type)
return
}
rv := ev.Object.(*api.Pod).ResourceVersion
rv := ev.Object.(*example.Pod).ResourceVersion
if rv != tt.expRV {
t.Errorf("#%d: resource version want=%s, get=%s", i, tt.expRV, rv)
}
@ -288,11 +295,12 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
}
func TestWatch(t *testing.T) {
codec := testapi.Default.Codec()
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := "/some/key"
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
h := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
if err != nil {
@ -301,8 +309,8 @@ func TestWatch(t *testing.T) {
// watching is explicitly closed below.
// Test normal case
pod := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
returnObj := &api.Pod{}
pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
returnObj := &example.Pod{}
err = h.Create(context.TODO(), key, pod, returnObj, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@ -330,36 +338,25 @@ func TestWatch(t *testing.T) {
}
}
func emptySubsets() []api.EndpointSubset {
return []api.EndpointSubset{}
}
func makeSubsets(ip string, port int) []api.EndpointSubset {
return []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: ip}},
Ports: []api.EndpointPort{{Port: int32(port)}},
}}
}
func TestWatchEtcdState(t *testing.T) {
codec := testapi.Default.Codec()
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
key := "/somekey/foo"
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
h := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watching.Stop()
endpoint := &api.Endpoints{
pod := &example.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Subsets: emptySubsets(),
}
err = h.Create(context.TODO(), key, endpoint, endpoint, 0)
err = h.Create(context.TODO(), key, pod, pod, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -369,20 +366,21 @@ func TestWatchEtcdState(t *testing.T) {
t.Errorf("Unexpected event %#v", event)
}
subset := makeSubsets("127.0.0.1", 9000)
endpoint.Subsets = subset
endpoint.ResourceVersion = ""
pod.ResourceVersion = ""
pod.Status = example.PodStatus{
Phase: example.PodPhase("Running"),
}
// CAS the previous value
updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
newObj, err := api.Scheme.DeepCopy(endpoint)
newObj, err := scheme.DeepCopy(pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
return nil, nil, err
}
return newObj.(*api.Endpoints), nil, nil
return newObj.(*example.Pod), nil, nil
}
err = h.GuaranteedUpdate(context.TODO(), key, &api.Endpoints{}, false, nil, updateFn)
err = h.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, false, nil, updateFn)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -392,20 +390,21 @@ func TestWatchEtcdState(t *testing.T) {
t.Errorf("Unexpected event %#v", event)
}
if e, a := endpoint, event.Object; !apiequality.Semantic.DeepDerivative(e, a) {
if e, a := pod, event.Object; !apiequality.Semantic.DeepDerivative(e, a) {
t.Errorf("Unexpected error: expected %#v, got %#v", e, a)
}
}
func TestWatchFromZeroIndex(t *testing.T) {
codec := testapi.Default.Codec()
pod := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
key := "/somekey/foo"
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
h := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
// set before the watch and verify events
err := h.Create(context.TODO(), key, pod, pod, 0)
@ -428,11 +427,11 @@ func TestWatchFromZeroIndex(t *testing.T) {
// check for concatenation on watch event with CAS
updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
pod := input.(*api.Pod)
pod := input.(*example.Pod)
pod.Name = "bar"
return pod, nil, nil
}
err = h.GuaranteedUpdate(context.TODO(), key, &api.Pod{}, false, nil, updateFn)
err = h.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, false, nil, updateFn)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -451,11 +450,11 @@ func TestWatchFromZeroIndex(t *testing.T) {
pod.Name = "baz"
updateFn = func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
pod := input.(*api.Pod)
pod := input.(*example.Pod)
pod.Name = "baz"
return pod, nil, nil
}
err = h.GuaranteedUpdate(context.TODO(), key, &api.Pod{}, false, nil, updateFn)
err = h.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, false, nil, updateFn)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -471,11 +470,12 @@ func TestWatchFromZeroIndex(t *testing.T) {
}
func TestWatchListFromZeroIndex(t *testing.T) {
codec := testapi.Default.Codec()
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
prefix := "/some/key"
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
h := newEtcdHelper(server.Client, codec, prefix)
h := newEtcdHelper(server.Client, scheme, codec, prefix)
watching, err := h.WatchList(context.TODO(), "/", "0", storage.Everything)
if err != nil {
@ -484,7 +484,7 @@ func TestWatchListFromZeroIndex(t *testing.T) {
defer watching.Stop()
// creates foo which should trigger the WatchList for "/"
pod := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
err = h.Create(context.TODO(), pod.Name, pod, pod, 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@ -501,12 +501,13 @@ func TestWatchListFromZeroIndex(t *testing.T) {
}
func TestWatchListIgnoresRootKey(t *testing.T) {
codec := testapi.Default.Codec()
pod := &api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
scheme, codecs := testScheme(t)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
key := "/some/key"
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
h := newEtcdHelper(server.Client, codec, key)
h := newEtcdHelper(server.Client, scheme, codec, key)
watching, err := h.WatchList(context.TODO(), key, "0", storage.Everything)
if err != nil {
@ -532,11 +533,12 @@ func TestWatchListIgnoresRootKey(t *testing.T) {
}
func TestWatchPurposefulShutdown(t *testing.T) {
_, codec := testScheme(t)
scheme, codecs := testScheme(t)
codec := codecs.LegacyCodec(schema.GroupVersion{Version: "v1"})
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
key := "/some/key"
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
h := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
// Test purposeful shutdown
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)

View File

@ -27,6 +27,7 @@ import (
"testing"
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
@ -310,7 +311,7 @@ func NewUnsecuredEtcdTestClientServer(t *testing.T) *EtcdTestServer {
}
// NewEtcd3TestClientServer creates a new client and server for testing
func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storagebackend.Config) {
func NewUnsecuredEtcd3TestClientServer(t *testing.T, scheme *runtime.Scheme) (*EtcdTestServer, *storagebackend.Config) {
server := &EtcdTestServer{
v3Cluster: integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}),
}
@ -320,6 +321,7 @@ func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storageb
Prefix: etcdtest.PathPrefix(),
ServerList: server.V3Client.Endpoints(),
DeserializationCacheSize: etcdtest.DeserializationCacheSize,
Copier: scheme,
}
return server, config
}

View File

@ -39,7 +39,7 @@ func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
if err != nil {
return nil, nil, err
}
s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize)
s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize, c.Copier)
return s, tr.CloseIdleConnections, nil
}

View File

@ -27,6 +27,7 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
apitesting "k8s.io/apimachinery/pkg/api/testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
@ -34,9 +35,9 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/v1"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
@ -44,23 +45,24 @@ import (
"golang.org/x/net/context"
"k8s.io/apimachinery/pkg/runtime/serializer"
_ "k8s.io/client-go/pkg/api/install"
)
func DeepEqualSafePodSpec() api.PodSpec {
grace := int64(30)
return api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
TerminationGracePeriodSeconds: &grace,
SecurityContext: &api.PodSecurityContext{},
SchedulerName: api.DefaultSchedulerName,
}
var (
scheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(scheme)
)
func init() {
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
example.AddToScheme(scheme)
examplev1.AddToScheme(scheme)
}
// GetAttrs returns labels and fields of a given object for filtering purposes.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
pod, ok := obj.(*api.Pod)
pod, ok := obj.(*example.Pod)
if !ok {
return nil, nil, fmt.Errorf("not a pod")
}
@ -69,7 +71,7 @@ func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
// PodToSelectableFields returns a field set that represents the object
// TODO: fields are not labels, and the validation rules for them do not apply.
func PodToSelectableFields(pod *api.Pod) fields.Set {
func PodToSelectableFields(pod *example.Pod) fields.Set {
// The purpose of allocation with a given number of elements is to reduce
// amount of allocations needed to create the fields.Set. If you add any
// field here or the number of object-meta related fields changes, this should
@ -89,9 +91,9 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha
return source
}
func newEtcdTestStorage(t *testing.T, codec runtime.Codec, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) {
server, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(server.V3Client, codec, prefix)
func newEtcdTestStorage(t *testing.T, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) {
server, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, scheme)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix)
return server, storage
}
@ -101,39 +103,39 @@ func newTestCacher(s storage.Interface, cap int) *storage.Cacher {
CacheCapacity: cap,
Storage: s,
Versioner: etcdstorage.APIObjectVersioner{},
Copier: api.Scheme,
Type: &api.Pod{},
Copier: scheme,
Type: &example.Pod{},
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
GetAttrsFunc: GetAttrs,
NewListFunc: func() runtime.Object { return &api.PodList{} },
Codec: api.Codecs.LegacyCodec(v1.SchemeGroupVersion),
NewListFunc: func() runtime.Object { return &example.PodList{} },
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
}
return storage.NewCacherFromConfig(config)
}
func makeTestPod(name string) *api.Pod {
return &api.Pod{
func makeTestPod(name string) *example.Pod {
return &example.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
Spec: DeepEqualSafePodSpec(),
}
}
func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod {
func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *example.Pod {
updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
newObj, err := api.Scheme.DeepCopy(obj)
newObj, err := scheme.DeepCopy(obj)
if err != nil {
t.Errorf("unexpected error: %v", err)
return nil, nil, err
}
return newObj.(*api.Pod), nil, nil
return newObj.(*example.Pod), nil, nil
}
key := "pods/" + obj.Namespace + "/" + obj.Name
if err := s.GuaranteedUpdate(context.TODO(), key, &api.Pod{}, old == nil, nil, updateFn); err != nil {
if err := s.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, old == nil, nil, updateFn); err != nil {
t.Errorf("unexpected error: %v", err)
}
obj.ResourceVersion = ""
result := &api.Pod{}
result := &example.Pod{}
if err := s.Get(context.TODO(), key, "", result, false); err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -141,7 +143,7 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod {
}
func TestGet(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
@ -150,7 +152,7 @@ func TestGet(t *testing.T) {
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
// We pass the ResourceVersion from the above Create() operation.
result := &api.Pod{}
result := &example.Pod{}
if err := cacher.Get(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, result, true); err != nil {
t.Errorf("Unexpected error: %v", err)
}
@ -161,7 +163,7 @@ func TestGet(t *testing.T) {
if err := cacher.Get(context.TODO(), "pods/ns/bar", fooCreated.ResourceVersion, result, true); err != nil {
t.Errorf("Unexpected error: %v", err)
}
emptyPod := api.Pod{}
emptyPod := example.Pod{}
if e, a := emptyPod, *result; !reflect.DeepEqual(e, a) {
t.Errorf("Expected: %#v, got: %#v", e, a)
}
@ -172,7 +174,7 @@ func TestGet(t *testing.T) {
}
func TestList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
@ -196,20 +198,20 @@ func TestList(t *testing.T) {
podFooNS2.Namespace += "2"
updatePod(t, etcdStorage, podFooNS2, nil)
deleted := api.Pod{}
deleted := example.Pod{}
if err := etcdStorage.Delete(context.TODO(), "pods/ns/bar", &deleted, nil); err != nil {
t.Errorf("Unexpected error: %v", err)
}
// We first List directly from etcd by passing empty resourceVersion,
// to get the current etcd resourceVersion.
rvResult := &api.PodList{}
rvResult := &example.PodList{}
if err := cacher.List(context.TODO(), "pods/ns", "", storage.Everything, rvResult); err != nil {
t.Errorf("Unexpected error: %v", err)
}
deletedPodRV := rvResult.ListMeta.ResourceVersion
result := &api.PodList{}
result := &example.PodList{}
// We pass the current etcd ResourceVersion received from the above List() operation,
// since there is not easy way to get ResourceVersion of barPod deletion operation.
if err := cacher.List(context.TODO(), "pods/ns", deletedPodRV, storage.Everything, result); err != nil {
@ -237,7 +239,7 @@ func TestList(t *testing.T) {
t.Errorf("Unexpected namespace: %s", item.Namespace)
}
var expected *api.Pod
var expected *example.Pod
switch item.Name {
case "foo":
expected = podFooPrime
@ -253,7 +255,7 @@ func TestList(t *testing.T) {
}
func TestInfiniteList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
@ -268,7 +270,7 @@ func TestInfiniteList(t *testing.T) {
}
listRV := strconv.Itoa(int(rv + 10))
result := &api.PodList{}
result := &example.PodList{}
err = cacher.List(context.TODO(), "pods/ns", listRV, storage.Everything, result)
if !errors.IsTimeout(err) {
t.Errorf("Unexpected error: %v", err)
@ -307,7 +309,7 @@ func (self *injectListError) List(ctx context.Context, key string, resourceVersi
}
func TestWatch(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
// Inject one list error to make sure we test the relist case.
etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
defer server.Terminate(t)
@ -384,7 +386,7 @@ func TestWatch(t *testing.T) {
}
func TestWatcherTimeout(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
@ -426,7 +428,7 @@ func TestWatcherTimeout(t *testing.T) {
}
func TestFiltering(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
@ -458,7 +460,7 @@ func TestFiltering(t *testing.T) {
fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered)
_ = updatePod(t, etcdStorage, podFooPrime, fooUnfiltered)
deleted := api.Pod{}
deleted := example.Pod{}
if err := etcdStorage.Delete(context.TODO(), "pods/ns/foo", &deleted, nil); err != nil {
t.Errorf("Unexpected error: %v", err)
}
@ -488,7 +490,7 @@ func TestFiltering(t *testing.T) {
}
func TestStartingResourceVersion(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
@ -520,7 +522,7 @@ func TestStartingResourceVersion(t *testing.T) {
select {
case e := <-watcher.ResultChan():
pod := e.Object.(*api.Pod)
pod := e.Object.(*example.Pod)
podRV, err := storage.ParseWatchResourceVersion(pod.ResourceVersion)
if err != nil {
t.Fatalf("unexpected error: %v", err)
@ -536,7 +538,7 @@ func TestStartingResourceVersion(t *testing.T) {
}
func TestRandomWatchDeliver(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix())
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
@ -569,7 +571,7 @@ func TestRandomWatchDeliver(t *testing.T) {
if !ok {
break
}
if a, e := event.Object.(*api.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a {
if a, e := event.Object.(*example.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a {
t.Errorf("Unexpected object watched: %s, expected %s", a, e)
}
watched++

View File

@ -0,0 +1,32 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tests
import (
"k8s.io/apiserver/pkg/apis/example"
_ "k8s.io/client-go/pkg/api/install"
)
func DeepEqualSafePodSpec() example.PodSpec {
grace := int64(30)
return example.PodSpec{
RestartPolicy: "Always",
TerminationGracePeriodSeconds: &grace,
SchedulerName: "default-scheduler",
}
}

View File

@ -21,6 +21,7 @@ import (
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/storage/storagebackend"
)
@ -34,13 +35,14 @@ type EtcdOptions struct {
EtcdServersOverrides []string
}
func NewEtcdOptions() *EtcdOptions {
func NewEtcdOptions(scheme *runtime.Scheme) *EtcdOptions {
return &EtcdOptions{
StorageConfig: storagebackend.Config{
Prefix: DefaultEtcdPathPrefix,
// Default cache size to 0 - if unset, its size will be set based on target
// memory usage.
DeserializationCacheSize: 0,
Copier: scheme,
},
}
}

View File

@ -44,4 +44,5 @@ type Config struct {
DeserializationCacheSize int
Codec runtime.Codec
Copier runtime.ObjectCopier
}

View File

@ -311,6 +311,7 @@ func NewMasterConfig() *master.Config {
// prefix code, so please don't change without ensuring
// sufficient coverage in other ways.
Prefix: uuid.New(),
Copier: api.Scheme,
}
info, _ := runtime.SerializerInfoForMediaType(api.Codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)