From 9fc0e1e98d791b79901ced941e52c87b70fba372 Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Wed, 24 Aug 2016 16:35:21 -0700 Subject: [PATCH] return destroy func to clean up internal resources of storage --- examples/apiserver/rest/reststorage.go | 2 +- federation/registry/cluster/etcd/etcd.go | 2 +- pkg/master/master_test.go | 19 ++- pkg/registry/certificates/etcd/etcd.go | 2 +- pkg/registry/clusterrole/etcd/etcd.go | 2 +- pkg/registry/clusterrolebinding/etcd/etcd.go | 2 +- pkg/registry/configmap/etcd/etcd.go | 2 +- pkg/registry/controller/etcd/etcd.go | 2 +- pkg/registry/daemonset/etcd/etcd.go | 2 +- pkg/registry/deployment/etcd/etcd.go | 2 +- pkg/registry/endpoint/etcd/etcd.go | 2 +- pkg/registry/event/etcd/etcd.go | 2 +- .../experimental/controller/etcd/etcd_test.go | 19 ++- .../generic/registry/storage_factory.go | 12 +- pkg/registry/generic/registry/store_test.go | 111 ++++++++++++------ pkg/registry/generic/storage_decorator.go | 10 +- .../horizontalpodautoscaler/etcd/etcd.go | 2 +- pkg/registry/ingress/etcd/etcd.go | 2 +- pkg/registry/job/etcd/etcd.go | 2 +- pkg/registry/limitrange/etcd/etcd.go | 2 +- pkg/registry/namespace/etcd/etcd.go | 2 +- pkg/registry/networkpolicy/etcd/etcd.go | 2 +- pkg/registry/node/etcd/etcd.go | 2 +- pkg/registry/persistentvolume/etcd/etcd.go | 2 +- .../persistentvolumeclaim/etcd/etcd.go | 2 +- pkg/registry/petset/etcd/etcd.go | 2 +- pkg/registry/pod/etcd/etcd.go | 2 +- pkg/registry/pod/rest/log_test.go | 4 +- pkg/registry/poddisruptionbudget/etcd/etcd.go | 2 +- pkg/registry/podsecuritypolicy/etcd/etcd.go | 2 +- pkg/registry/podtemplate/etcd/etcd.go | 2 +- pkg/registry/replicaset/etcd/etcd.go | 2 +- pkg/registry/resourcequota/etcd/etcd.go | 2 +- pkg/registry/role/etcd/etcd.go | 2 +- pkg/registry/rolebinding/etcd/etcd.go | 2 +- pkg/registry/scheduledjob/etcd/etcd.go | 2 +- pkg/registry/secret/etcd/etcd.go | 2 +- pkg/registry/service/allocator/etcd/etcd.go | 2 +- pkg/registry/service/etcd/etcd.go | 2 +- .../service/ipallocator/etcd/etcd_test.go | 27 +++-- pkg/registry/serviceaccount/etcd/etcd.go | 2 +- pkg/registry/storageclass/etcd/etcd.go | 2 +- pkg/registry/thirdpartyresource/etcd/etcd.go | 2 +- .../thirdpartyresourcedata/etcd/etcd.go | 2 +- pkg/storage/storagebackend/factory/etcd2.go | 9 +- pkg/storage/storagebackend/factory/etcd3.go | 15 ++- pkg/storage/storagebackend/factory/factory.go | 4 +- .../storagebackend/factory/tls_test.go | 3 +- 48 files changed, 194 insertions(+), 113 deletions(-) diff --git a/examples/apiserver/rest/reststorage.go b/examples/apiserver/rest/reststorage.go index 68a54582ee..1a6d47e4fc 100644 --- a/examples/apiserver/rest/reststorage.go +++ b/examples/apiserver/rest/reststorage.go @@ -40,7 +40,7 @@ func NewREST(config *storagebackend.Config, storageDecorator generic.StorageDeco newListFunc := func() runtime.Object { return &testgroup.TestTypeList{} } // Usually you should reuse your RESTCreateStrategy. strategy := &NotNamespaceScoped{} - storageInterface := storageDecorator( + storageInterface, _ := storageDecorator( config, 100, &testgroup.TestType{}, prefix, strategy, newListFunc, storage.NoTriggerPublisher) store := ®istry.Store{ NewFunc: func() runtime.Object { return &testgroup.TestType{} }, diff --git a/federation/registry/cluster/etcd/etcd.go b/federation/registry/cluster/etcd/etcd.go index f84e10b805..2e699511d5 100644 --- a/federation/registry/cluster/etcd/etcd.go +++ b/federation/registry/cluster/etcd/etcd.go @@ -49,7 +49,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &federation.ClusterList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, 100, &federation.Cluster{}, diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index 320406830a..1699931857 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -713,8 +713,10 @@ func testInstallThirdPartyAPIListVersion(t *testing.T, version string) { }) if test.items != nil { + s, destroyFunc := generic.NewRawStorage(master.thirdPartyStorageConfig) + defer destroyFunc() err := createThirdPartyList( - generic.NewRawStorage(master.thirdPartyStorageConfig), + s, fmt.Sprintf("/ThirdPartyResourceData/%s/%s/default", group, plural.Resource), test.items) if !assert.NoError(err, test.test) { @@ -837,7 +839,8 @@ func testInstallThirdPartyAPIGetVersion(t *testing.T, version string) { SomeField: "test field", OtherField: 10, } - s := generic.NewRawStorage(master.thirdPartyStorageConfig) + s, destroyFunc := generic.NewRawStorage(master.thirdPartyStorageConfig) + defer destroyFunc() if !assert.NoError(createThirdPartyObject(s, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) { t.FailNow() return @@ -914,7 +917,8 @@ func testInstallThirdPartyAPIPostForVersion(t *testing.T, version string) { } thirdPartyObj := extensions.ThirdPartyResourceData{} - s := generic.NewRawStorage(master.thirdPartyStorageConfig) + s, destroyFunc := generic.NewRawStorage(master.thirdPartyStorageConfig) + defer destroyFunc() err = s.Get(context.TODO(), etcdtest.AddPrefix("/ThirdPartyResourceData/company.com/foos/default/test"), &thirdPartyObj, false) if !assert.NoError(err) { t.FailNow() @@ -950,7 +954,8 @@ func testInstallThirdPartyAPIDeleteVersion(t *testing.T, version string) { SomeField: "test field", OtherField: 10, } - s := generic.NewRawStorage(master.thirdPartyStorageConfig) + s, destroyFunc := generic.NewRawStorage(master.thirdPartyStorageConfig) + defer destroyFunc() if !assert.NoError(createThirdPartyObject(s, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) { t.FailNow() return @@ -1058,7 +1063,8 @@ func testInstallThirdPartyResourceRemove(t *testing.T, version string) { SomeField: "test field", OtherField: 10, } - s := generic.NewRawStorage(master.thirdPartyStorageConfig) + s, destroyFunc := generic.NewRawStorage(master.thirdPartyStorageConfig) + defer destroyFunc() if !assert.NoError(createThirdPartyObject(s, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) { t.FailNow() return @@ -1110,11 +1116,12 @@ func testInstallThirdPartyResourceRemove(t *testing.T, version string) { } for _, key := range expectedDeletedKeys { thirdPartyObj := extensions.ThirdPartyResourceData{} - s := generic.NewRawStorage(master.thirdPartyStorageConfig) + s, destroyFunc := generic.NewRawStorage(master.thirdPartyStorageConfig) err := s.Get(context.TODO(), key, &thirdPartyObj, false) if !storage.IsNotFound(err) { t.Errorf("expected deletion didn't happen: %v", err) } + destroyFunc() } installed := master.ListThirdPartyResources() if len(installed) != 0 { diff --git a/pkg/registry/certificates/etcd/etcd.go b/pkg/registry/certificates/etcd/etcd.go index 041430b877..2dbeb09aaa 100644 --- a/pkg/registry/certificates/etcd/etcd.go +++ b/pkg/registry/certificates/etcd/etcd.go @@ -40,7 +40,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *ApprovalREST) { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &certificates.CertificateSigningRequestList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.CertificateSigningRequests), &certificates.CertificateSigningRequest{}, diff --git a/pkg/registry/clusterrole/etcd/etcd.go b/pkg/registry/clusterrole/etcd/etcd.go index 67a7310063..f3b4e16f4d 100644 --- a/pkg/registry/clusterrole/etcd/etcd.go +++ b/pkg/registry/clusterrole/etcd/etcd.go @@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) *REST { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &rbac.ClusterRoleList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.ClusterRoles), &rbac.ClusterRole{}, diff --git a/pkg/registry/clusterrolebinding/etcd/etcd.go b/pkg/registry/clusterrolebinding/etcd/etcd.go index c2040bc9d6..4d2b3cd82e 100644 --- a/pkg/registry/clusterrolebinding/etcd/etcd.go +++ b/pkg/registry/clusterrolebinding/etcd/etcd.go @@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) *REST { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &rbac.ClusterRoleBindingList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.ClusterRoleBindings), &rbac.ClusterRoleBinding{}, diff --git a/pkg/registry/configmap/etcd/etcd.go b/pkg/registry/configmap/etcd/etcd.go index 43a697d2d0..f3adfa9f3e 100644 --- a/pkg/registry/configmap/etcd/etcd.go +++ b/pkg/registry/configmap/etcd/etcd.go @@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) *REST { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &api.ConfigMapList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.ConfigMaps), &api.ConfigMap{}, diff --git a/pkg/registry/controller/etcd/etcd.go b/pkg/registry/controller/etcd/etcd.go index 2434e9c138..41a3b320cc 100644 --- a/pkg/registry/controller/etcd/etcd.go +++ b/pkg/registry/controller/etcd/etcd.go @@ -62,7 +62,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &api.ReplicationControllerList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.Controllers), &api.ReplicationController{}, diff --git a/pkg/registry/daemonset/etcd/etcd.go b/pkg/registry/daemonset/etcd/etcd.go index d7e66604da..0c1d3a89e7 100644 --- a/pkg/registry/daemonset/etcd/etcd.go +++ b/pkg/registry/daemonset/etcd/etcd.go @@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &extensions.DaemonSetList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.Daemonsets), &extensions.DaemonSet{}, diff --git a/pkg/registry/deployment/etcd/etcd.go b/pkg/registry/deployment/etcd/etcd.go index 2c3e52ff81..502d681a94 100644 --- a/pkg/registry/deployment/etcd/etcd.go +++ b/pkg/registry/deployment/etcd/etcd.go @@ -62,7 +62,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *RollbackREST) { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &extensions.DeploymentList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.Deployments), &extensions.Deployment{}, diff --git a/pkg/registry/endpoint/etcd/etcd.go b/pkg/registry/endpoint/etcd/etcd.go index fc6a7eb789..9baf0e7472 100644 --- a/pkg/registry/endpoint/etcd/etcd.go +++ b/pkg/registry/endpoint/etcd/etcd.go @@ -35,7 +35,7 @@ func NewREST(opts generic.RESTOptions) *REST { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &api.EndpointsList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.Endpoints), &api.Endpoints{}, diff --git a/pkg/registry/event/etcd/etcd.go b/pkg/registry/event/etcd/etcd.go index 2797052d4f..ee5f650639 100644 --- a/pkg/registry/event/etcd/etcd.go +++ b/pkg/registry/event/etcd/etcd.go @@ -34,7 +34,7 @@ func NewREST(opts generic.RESTOptions, ttl uint64) *REST { // We explicitly do NOT do any decoration here - switching on Cacher // for events will lead to too high memory consumption. - storageInterface := generic.NewRawStorage(opts.StorageConfig) + storageInterface, _ := generic.NewRawStorage(opts.StorageConfig) store := ®istry.Store{ NewFunc: func() runtime.Object { return &api.Event{} }, diff --git a/pkg/registry/experimental/controller/etcd/etcd_test.go b/pkg/registry/experimental/controller/etcd/etcd_test.go index f60b87c982..a96b55f4ec 100644 --- a/pkg/registry/experimental/controller/etcd/etcd_test.go +++ b/pkg/registry/experimental/controller/etcd/etcd_test.go @@ -30,10 +30,11 @@ import ( etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing" ) -func newStorage(t *testing.T) (*ScaleREST, *etcdtesting.EtcdTestServer, storage.Interface) { +func newStorage(t *testing.T) (*ScaleREST, *etcdtesting.EtcdTestServer, storage.Interface, func()) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "controllers"} - return NewStorage(restOptions).Scale, server, generic.NewRawStorage(etcdStorage) + s, destroyFunc := generic.NewRawStorage(etcdStorage) + return NewStorage(restOptions).Scale, server, s, destroyFunc } var validPodTemplate = api.PodTemplate{ @@ -82,8 +83,11 @@ var validScale = extensions.Scale{ } func TestGet(t *testing.T) { - storage, server, si := newStorage(t) - defer server.Terminate(t) + storage, server, si, destroyFunc := newStorage(t) + defer func() { + destroyFunc() + server.Terminate(t) + }() ctx := api.WithNamespace(api.NewContext(), "test") key := etcdtest.AddPrefix("/controllers/test/foo") @@ -101,8 +105,11 @@ func TestGet(t *testing.T) { } func TestUpdate(t *testing.T) { - storage, server, si := newStorage(t) - defer server.Terminate(t) + storage, server, si, destroyFunc := newStorage(t) + defer func() { + destroyFunc() + server.Terminate(t) + }() ctx := api.WithNamespace(api.NewContext(), "test") key := etcdtest.AddPrefix("/controllers/test/foo") diff --git a/pkg/registry/generic/registry/storage_factory.go b/pkg/registry/generic/registry/storage_factory.go index e5a1fd0237..53cb46d3bc 100644 --- a/pkg/registry/generic/registry/storage_factory.go +++ b/pkg/registry/generic/registry/storage_factory.go @@ -33,13 +33,14 @@ func StorageWithCacher( resourcePrefix string, scopeStrategy rest.NamespaceScopedStrategy, newListFunc func() runtime.Object, - triggerFunc storage.TriggerPublisherFunc) storage.Interface { + triggerFunc storage.TriggerPublisherFunc) (storage.Interface, func()) { + s, d := generic.NewRawStorage(storageConfig) // TODO: we would change this later to make storage always have cacher and hide low level KV layer inside. // Currently it has two layers of same storage interface -- cacher and low level kv. cacherConfig := storage.CacherConfig{ CacheCapacity: capacity, - Storage: generic.NewRawStorage(storageConfig), + Storage: s, Versioner: etcdstorage.APIObjectVersioner{}, Type: objectType, ResourcePrefix: resourcePrefix, @@ -56,6 +57,11 @@ func StorageWithCacher( return storage.NoNamespaceKeyFunc(resourcePrefix, obj) } } + cacher := storage.NewCacherFromConfig(cacherConfig) + destroyFunc := func() { + cacher.Stop() + d() + } - return storage.NewCacherFromConfig(cacherConfig) + return cacher, destroyFunc } diff --git a/pkg/registry/generic/registry/store_test.go b/pkg/registry/generic/registry/store_test.go index 65bf2b2e7d..131b22ac09 100644 --- a/pkg/registry/generic/registry/store_test.go +++ b/pkg/registry/generic/registry/store_test.go @@ -88,7 +88,7 @@ func (t *testRESTStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Obje } func (t *testRESTStrategy) Canonicalize(obj runtime.Object) {} -func NewTestGenericStoreRegistry(t *testing.T) (*etcdtesting.EtcdTestServer, *Store) { +func NewTestGenericStoreRegistry(t *testing.T) (*etcdtesting.EtcdTestServer, func(), *Store) { return newTestGenericStoreRegistry(t, false) } @@ -173,7 +173,7 @@ func TestStoreList(t *testing.T) { if item.context != nil { ctx = item.context } - server, registry := NewTestGenericStoreRegistry(t) + server, destroyFunc, registry := NewTestGenericStoreRegistry(t) if item.in != nil { if err := storagetesting.CreateList("/pods", registry.Storage, item.in); err != nil { @@ -191,6 +191,7 @@ func TestStoreList(t *testing.T) { if e, a := item.out, list; !api.Semantic.DeepDerivative(e, a) { t.Errorf("%v: Expected %#v, got %#v", name, e, a) } + destroyFunc() server.Terminate(t) } } @@ -208,8 +209,11 @@ func TestStoreListResourceVersion(t *testing.T) { } ctx := api.WithNamespace(api.NewContext(), "test") - server, registry := newTestGenericStoreRegistry(t, true) - defer server.Terminate(t) + server, destroyFunc, registry := newTestGenericStoreRegistry(t, true) + defer func() { + destroyFunc() + server.Terminate(t) + }() obj, err := registry.Create(ctx, fooPod) if err != nil { @@ -270,8 +274,11 @@ func TestStoreCreate(t *testing.T) { } testContext := api.WithNamespace(api.NewContext(), "test") - server, registry := NewTestGenericStoreRegistry(t) - defer server.Terminate(t) + server, destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer func() { + destroyFunc() + server.Terminate(t) + }() // create the object objA, err := registry.Create(testContext, podA) @@ -330,8 +337,11 @@ func TestStoreUpdate(t *testing.T) { } testContext := api.WithNamespace(api.NewContext(), "test") - server, registry := NewTestGenericStoreRegistry(t) - defer server.Terminate(t) + server, destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer func() { + destroyFunc() + server.Terminate(t) + }() // Test1 try to update a non-existing node _, _, err := registry.Update(testContext, podA.Name, rest.DefaultUpdatedObjectInfo(podA, api.Scheme)) @@ -368,8 +378,11 @@ func TestStoreUpdate(t *testing.T) { } func TestNoOpUpdates(t *testing.T) { - server, registry := NewTestGenericStoreRegistry(t) - defer server.Terminate(t) + server, destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer func() { + destroyFunc() + server.Terminate(t) + }() newPod := func() *api.Pod { return &api.Pod{ @@ -450,8 +463,11 @@ func TestStoreCustomExport(t *testing.T) { Spec: api.PodSpec{NodeName: "machine"}, } - server, registry := NewTestGenericStoreRegistry(t) - defer server.Terminate(t) + server, destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer func() { + destroyFunc() + server.Terminate(t) + }() registry.ExportStrategy = testPodExport{} @@ -496,8 +512,11 @@ func TestStoreBasicExport(t *testing.T) { Status: api.PodStatus{HostIP: "1.2.3.4"}, } - server, registry := NewTestGenericStoreRegistry(t) - defer server.Terminate(t) + server, destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer func() { + destroyFunc() + server.Terminate(t) + }() testContext := api.WithNamespace(api.NewContext(), "test") registry.UpdateStrategy.(*testRESTStrategy).allowCreateOnUpdate = true @@ -528,8 +547,11 @@ func TestStoreGet(t *testing.T) { } testContext := api.WithNamespace(api.NewContext(), "test") - server, registry := NewTestGenericStoreRegistry(t) - defer server.Terminate(t) + server, destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer func() { + destroyFunc() + server.Terminate(t) + }() _, err := registry.Get(testContext, podA.Name) if !errors.IsNotFound(err) { @@ -549,8 +571,11 @@ func TestStoreDelete(t *testing.T) { } testContext := api.WithNamespace(api.NewContext(), "test") - server, registry := NewTestGenericStoreRegistry(t) - defer server.Terminate(t) + server, destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer func() { + destroyFunc() + server.Terminate(t) + }() // test failure condition _, err := registry.Delete(testContext, podA.Name, nil) @@ -587,9 +612,11 @@ func TestStoreHandleFinalizers(t *testing.T) { } testContext := api.WithNamespace(api.NewContext(), "test") - server, registry := NewTestGenericStoreRegistry(t) - defer server.Terminate(t) - + server, destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer func() { + destroyFunc() + server.Terminate(t) + }() // create pod _, err := registry.Create(testContext, podWithFinalizer) if err != nil { @@ -884,8 +911,11 @@ func TestStoreDeleteWithOrphanDependents(t *testing.T) { } testContext := api.WithNamespace(api.NewContext(), "test") - server, registry := NewTestGenericStoreRegistry(t) - defer server.Terminate(t) + server, destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer func() { + destroyFunc() + server.Terminate(t) + }() for _, tc := range testcases { registry.DeleteStrategy = tc.strategy @@ -931,8 +961,11 @@ func TestStoreDeleteCollection(t *testing.T) { podB := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}} testContext := api.WithNamespace(api.NewContext(), "test") - server, registry := NewTestGenericStoreRegistry(t) - defer server.Terminate(t) + server, destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer func() { + destroyFunc() + server.Terminate(t) + }() if _, err := registry.Create(testContext, podA); err != nil { t.Errorf("Unexpected error: %v", err) @@ -960,8 +993,11 @@ func TestStoreDeleteCollection(t *testing.T) { } func TestStoreDeleteCollectionNotFound(t *testing.T) { - server, registry := NewTestGenericStoreRegistry(t) - defer server.Terminate(t) + server, destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer func() { + destroyFunc() + server.Terminate(t) + }() testContext := api.WithNamespace(api.NewContext(), "test") @@ -1006,8 +1042,11 @@ func TestStoreDeleteCollectionWithWatch(t *testing.T) { podA := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} testContext := api.WithNamespace(api.NewContext(), "test") - server, registry := NewTestGenericStoreRegistry(t) - defer server.Terminate(t) + server, destroyFunc, registry := NewTestGenericStoreRegistry(t) + defer func() { + destroyFunc() + server.Terminate(t) + }() objCreated, err := registry.Create(testContext, podA) if err != nil { @@ -1073,7 +1112,7 @@ func TestStoreWatch(t *testing.T) { Spec: api.PodSpec{NodeName: "machine"}, } - server, registry := NewTestGenericStoreRegistry(t) + server, destroyFunc, registry := NewTestGenericStoreRegistry(t) wi, err := registry.WatchPredicate(ctx, m.selectPred, "0") if err != nil { t.Errorf("%v: unexpected error: %v", name, err) @@ -1091,17 +1130,19 @@ func TestStoreWatch(t *testing.T) { } wi.Stop() } - + destroyFunc() server.Terminate(t) } } -func newTestGenericStoreRegistry(t *testing.T, hasCacheEnabled bool) (*etcdtesting.EtcdTestServer, *Store) { +func newTestGenericStoreRegistry(t *testing.T, hasCacheEnabled bool) (*etcdtesting.EtcdTestServer, func(), *Store) { podPrefix := "/pods" server := etcdtesting.NewEtcdTestClientServer(t) strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true} + codec := testapi.Default.StorageCodec() s := etcdstorage.NewEtcdStorage(server.Client, codec, etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize) + destroyFunc := func() {} if hasCacheEnabled { config := storage.CacherConfig{ CacheCapacity: 10, @@ -1113,10 +1154,12 @@ func newTestGenericStoreRegistry(t *testing.T, hasCacheEnabled bool) (*etcdtesti NewListFunc: func() runtime.Object { return &api.PodList{} }, Codec: codec, } - s = storage.NewCacherFromConfig(config) + cacher := storage.NewCacherFromConfig(config) + s = cacher + destroyFunc = cacher.Stop } - return server, &Store{ + return server, destroyFunc, &Store{ NewFunc: func() runtime.Object { return &api.Pod{} }, NewListFunc: func() runtime.Object { return &api.PodList{} }, QualifiedResource: api.Resource("pods"), diff --git a/pkg/registry/generic/storage_decorator.go b/pkg/registry/generic/storage_decorator.go index 5d6f48ce30..9012c49520 100644 --- a/pkg/registry/generic/storage_decorator.go +++ b/pkg/registry/generic/storage_decorator.go @@ -34,7 +34,7 @@ type StorageDecorator func( resourcePrefix string, scopeStrategy rest.NamespaceScopedStrategy, newListFunc func() runtime.Object, - trigger storage.TriggerPublisherFunc) storage.Interface + trigger storage.TriggerPublisherFunc) (storage.Interface, func()) // Returns given 'storageInterface' without any decoration. func UndecoratedStorage( @@ -44,17 +44,17 @@ func UndecoratedStorage( resourcePrefix string, scopeStrategy rest.NamespaceScopedStrategy, newListFunc func() runtime.Object, - trigger storage.TriggerPublisherFunc) storage.Interface { + trigger storage.TriggerPublisherFunc) (storage.Interface, func()) { return NewRawStorage(config) } // NewRawStorage creates the low level kv storage. This is a work-around for current // two layer of same storage interface. // TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method. -func NewRawStorage(config *storagebackend.Config) storage.Interface { - s, err := factory.Create(*config) +func NewRawStorage(config *storagebackend.Config) (storage.Interface, func()) { + s, d, err := factory.Create(*config) if err != nil { glog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err) } - return s + return s, d } diff --git a/pkg/registry/horizontalpodautoscaler/etcd/etcd.go b/pkg/registry/horizontalpodautoscaler/etcd/etcd.go index cfd410043f..b9b238bdc1 100644 --- a/pkg/registry/horizontalpodautoscaler/etcd/etcd.go +++ b/pkg/registry/horizontalpodautoscaler/etcd/etcd.go @@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &autoscaling.HorizontalPodAutoscalerList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.HorizontalPodAutoscalers), &autoscaling.HorizontalPodAutoscaler{}, diff --git a/pkg/registry/ingress/etcd/etcd.go b/pkg/registry/ingress/etcd/etcd.go index 657a529114..afa05ca0d4 100644 --- a/pkg/registry/ingress/etcd/etcd.go +++ b/pkg/registry/ingress/etcd/etcd.go @@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &extensions.IngressList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.Ingress), &extensions.Ingress{}, diff --git a/pkg/registry/job/etcd/etcd.go b/pkg/registry/job/etcd/etcd.go index 42aca024c2..fe7f300b62 100644 --- a/pkg/registry/job/etcd/etcd.go +++ b/pkg/registry/job/etcd/etcd.go @@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &batch.JobList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.Jobs), &batch.Job{}, diff --git a/pkg/registry/limitrange/etcd/etcd.go b/pkg/registry/limitrange/etcd/etcd.go index b2a8ed2e44..b39343e6c0 100644 --- a/pkg/registry/limitrange/etcd/etcd.go +++ b/pkg/registry/limitrange/etcd/etcd.go @@ -35,7 +35,7 @@ func NewREST(opts generic.RESTOptions) *REST { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &api.LimitRangeList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.LimitRanges), &api.LimitRange{}, diff --git a/pkg/registry/namespace/etcd/etcd.go b/pkg/registry/namespace/etcd/etcd.go index ea2a81e432..3adc1eca3d 100644 --- a/pkg/registry/namespace/etcd/etcd.go +++ b/pkg/registry/namespace/etcd/etcd.go @@ -53,7 +53,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *FinalizeREST) { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &api.NamespaceList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.Namespaces), &api.Namespace{}, diff --git a/pkg/registry/networkpolicy/etcd/etcd.go b/pkg/registry/networkpolicy/etcd/etcd.go index 9bcec95672..135e6eee20 100644 --- a/pkg/registry/networkpolicy/etcd/etcd.go +++ b/pkg/registry/networkpolicy/etcd/etcd.go @@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) *REST { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &extensionsapi.NetworkPolicyList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.NetworkPolicys), &extensionsapi.NetworkPolicy{}, diff --git a/pkg/registry/node/etcd/etcd.go b/pkg/registry/node/etcd/etcd.go index d42c944c97..4f215f2e2a 100644 --- a/pkg/registry/node/etcd/etcd.go +++ b/pkg/registry/node/etcd/etcd.go @@ -69,7 +69,7 @@ func NewStorage(opts generic.RESTOptions, connection client.ConnectionInfoGetter prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &api.NodeList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.Nodes), &api.Node{}, diff --git a/pkg/registry/persistentvolume/etcd/etcd.go b/pkg/registry/persistentvolume/etcd/etcd.go index 5e14c65aad..63fd220c2d 100644 --- a/pkg/registry/persistentvolume/etcd/etcd.go +++ b/pkg/registry/persistentvolume/etcd/etcd.go @@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &api.PersistentVolumeList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.PersistentVolumes), &api.PersistentVolume{}, diff --git a/pkg/registry/persistentvolumeclaim/etcd/etcd.go b/pkg/registry/persistentvolumeclaim/etcd/etcd.go index d0fda8bb2d..8637bd5a4f 100644 --- a/pkg/registry/persistentvolumeclaim/etcd/etcd.go +++ b/pkg/registry/persistentvolumeclaim/etcd/etcd.go @@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &api.PersistentVolumeClaimList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.PersistentVolumeClaims), &api.PersistentVolumeClaim{}, diff --git a/pkg/registry/petset/etcd/etcd.go b/pkg/registry/petset/etcd/etcd.go index 965cb5526f..d256414cff 100644 --- a/pkg/registry/petset/etcd/etcd.go +++ b/pkg/registry/petset/etcd/etcd.go @@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &appsapi.PetSetList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.PetSet), &appsapi.PetSet{}, diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go index ca5368d676..65203a3e19 100644 --- a/pkg/registry/pod/etcd/etcd.go +++ b/pkg/registry/pod/etcd/etcd.go @@ -63,7 +63,7 @@ func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTr prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &api.PodList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.Pods), &api.Pod{}, diff --git a/pkg/registry/pod/rest/log_test.go b/pkg/registry/pod/rest/log_test.go index 2e0d33c19f..b57f54641a 100644 --- a/pkg/registry/pod/rest/log_test.go +++ b/pkg/registry/pod/rest/log_test.go @@ -28,8 +28,10 @@ import ( func TestPodLogValidates(t *testing.T) { etcdStorage, _ := registrytest.NewEtcdStorage(t, "") + s, destroyFunc := generic.NewRawStorage(etcdStorage) + defer destroyFunc() store := ®istry.Store{ - Storage: generic.NewRawStorage(etcdStorage), + Storage: s, } logRest := &LogREST{Store: store, KubeletConn: nil} diff --git a/pkg/registry/poddisruptionbudget/etcd/etcd.go b/pkg/registry/poddisruptionbudget/etcd/etcd.go index da9f3eb251..9a5eef5a5e 100644 --- a/pkg/registry/poddisruptionbudget/etcd/etcd.go +++ b/pkg/registry/poddisruptionbudget/etcd/etcd.go @@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &policyapi.PodDisruptionBudgetList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.PodDisruptionBudget), &policyapi.PodDisruptionBudget{}, diff --git a/pkg/registry/podsecuritypolicy/etcd/etcd.go b/pkg/registry/podsecuritypolicy/etcd/etcd.go index 8479c9f505..b5991f84ba 100644 --- a/pkg/registry/podsecuritypolicy/etcd/etcd.go +++ b/pkg/registry/podsecuritypolicy/etcd/etcd.go @@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) *REST { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &extensions.PodSecurityPolicyList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.PodSecurityPolicies), &extensions.PodSecurityPolicy{}, diff --git a/pkg/registry/podtemplate/etcd/etcd.go b/pkg/registry/podtemplate/etcd/etcd.go index 21448f5596..0394c94016 100644 --- a/pkg/registry/podtemplate/etcd/etcd.go +++ b/pkg/registry/podtemplate/etcd/etcd.go @@ -35,7 +35,7 @@ func NewREST(opts generic.RESTOptions) *REST { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &api.PodTemplateList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.PodTemplates), &api.PodTemplate{}, diff --git a/pkg/registry/replicaset/etcd/etcd.go b/pkg/registry/replicaset/etcd/etcd.go index 0ffb28b3b3..9bed4db74a 100644 --- a/pkg/registry/replicaset/etcd/etcd.go +++ b/pkg/registry/replicaset/etcd/etcd.go @@ -61,7 +61,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &extensions.ReplicaSetList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.Replicasets), &extensions.ReplicaSet{}, diff --git a/pkg/registry/resourcequota/etcd/etcd.go b/pkg/registry/resourcequota/etcd/etcd.go index f5c10ba3c1..404eddc0e1 100644 --- a/pkg/registry/resourcequota/etcd/etcd.go +++ b/pkg/registry/resourcequota/etcd/etcd.go @@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &api.ResourceQuotaList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.ResourceQuotas), &api.ResourceQuota{}, diff --git a/pkg/registry/role/etcd/etcd.go b/pkg/registry/role/etcd/etcd.go index 9d67b8eb8d..4f6c5235da 100644 --- a/pkg/registry/role/etcd/etcd.go +++ b/pkg/registry/role/etcd/etcd.go @@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) *REST { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &rbac.RoleList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.Roles), &rbac.Role{}, diff --git a/pkg/registry/rolebinding/etcd/etcd.go b/pkg/registry/rolebinding/etcd/etcd.go index ddd3e48837..af43741fc3 100644 --- a/pkg/registry/rolebinding/etcd/etcd.go +++ b/pkg/registry/rolebinding/etcd/etcd.go @@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) *REST { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &rbac.RoleBindingList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.RoleBindings), &rbac.RoleBinding{}, diff --git a/pkg/registry/scheduledjob/etcd/etcd.go b/pkg/registry/scheduledjob/etcd/etcd.go index ec973214c4..5e55afcaca 100644 --- a/pkg/registry/scheduledjob/etcd/etcd.go +++ b/pkg/registry/scheduledjob/etcd/etcd.go @@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &batch.ScheduledJobList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.ScheduledJobs), &batch.ScheduledJob{}, diff --git a/pkg/registry/secret/etcd/etcd.go b/pkg/registry/secret/etcd/etcd.go index 398335ce66..30a759aa96 100644 --- a/pkg/registry/secret/etcd/etcd.go +++ b/pkg/registry/secret/etcd/etcd.go @@ -35,7 +35,7 @@ func NewREST(opts generic.RESTOptions) *REST { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &api.SecretList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.Secrets), &api.Secret{}, diff --git a/pkg/registry/service/allocator/etcd/etcd.go b/pkg/registry/service/allocator/etcd/etcd.go index 7c67df6acd..dc8c56a076 100644 --- a/pkg/registry/service/allocator/etcd/etcd.go +++ b/pkg/registry/service/allocator/etcd/etcd.go @@ -61,7 +61,7 @@ var _ rangeallocation.RangeRegistry = &Etcd{} // NewEtcd returns an allocator that is backed by Etcd and can manage // persisting the snapshot state of allocation after each allocation is made. func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource unversioned.GroupResource, config *storagebackend.Config) *Etcd { - storage := generic.NewRawStorage(config) + storage, _ := generic.NewRawStorage(config) return &Etcd{ alloc: alloc, storage: storage, diff --git a/pkg/registry/service/etcd/etcd.go b/pkg/registry/service/etcd/etcd.go index fc09958512..9c137b96ff 100644 --- a/pkg/registry/service/etcd/etcd.go +++ b/pkg/registry/service/etcd/etcd.go @@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &api.ServiceList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.Services), &api.Service{}, diff --git a/pkg/registry/service/ipallocator/etcd/etcd_test.go b/pkg/registry/service/ipallocator/etcd/etcd_test.go index 881536ffba..d309c4cc05 100644 --- a/pkg/registry/service/ipallocator/etcd/etcd_test.go +++ b/pkg/registry/service/ipallocator/etcd/etcd_test.go @@ -34,7 +34,7 @@ import ( "golang.org/x/net/context" ) -func newStorage(t *testing.T) (*etcdtesting.EtcdTestServer, ipallocator.Interface, allocator.Interface, storage.Interface) { +func newStorage(t *testing.T) (*etcdtesting.EtcdTestServer, ipallocator.Interface, allocator.Interface, storage.Interface, func()) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") _, cidr, err := net.ParseCIDR("192.168.1.0/24") if err != nil { @@ -48,8 +48,8 @@ func newStorage(t *testing.T) (*etcdtesting.EtcdTestServer, ipallocator.Interfac etcd := allocatoretcd.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), etcdStorage) return etcd }) - - return server, storage, backing, generic.NewRawStorage(etcdStorage) + s, destroyFunc := generic.NewRawStorage(etcdStorage) + return server, storage, backing, s, destroyFunc } func validNewRangeAllocation() *api.RangeAllocation { @@ -65,24 +65,33 @@ func key() string { } func TestEmpty(t *testing.T) { - server, storage, _, _ := newStorage(t) - defer server.Terminate(t) + server, storage, _, _, destroyFunc := newStorage(t) + defer func() { + destroyFunc() + server.Terminate(t) + }() if err := storage.Allocate(net.ParseIP("192.168.1.2")); !strings.Contains(err.Error(), "cannot allocate resources of type serviceipallocations at this time") { t.Fatal(err) } } func TestErrors(t *testing.T) { - server, storage, _, _ := newStorage(t) - defer server.Terminate(t) + server, storage, _, _, destroyFunc := newStorage(t) + defer func() { + destroyFunc() + server.Terminate(t) + }() if err := storage.Allocate(net.ParseIP("192.168.0.0")); err != ipallocator.ErrNotInRange { t.Fatal(err) } } func TestStore(t *testing.T) { - server, storage, backing, si := newStorage(t) - defer server.Terminate(t) + server, storage, backing, si, destroyFunc := newStorage(t) + defer func() { + destroyFunc() + server.Terminate(t) + }() if err := si.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/pkg/registry/serviceaccount/etcd/etcd.go b/pkg/registry/serviceaccount/etcd/etcd.go index 0ed3b7c636..22dc642441 100644 --- a/pkg/registry/serviceaccount/etcd/etcd.go +++ b/pkg/registry/serviceaccount/etcd/etcd.go @@ -35,7 +35,7 @@ func NewREST(opts generic.RESTOptions) *REST { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &api.ServiceAccountList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.ServiceAccounts), &api.ServiceAccount{}, diff --git a/pkg/registry/storageclass/etcd/etcd.go b/pkg/registry/storageclass/etcd/etcd.go index 098375e8ae..b8467d566c 100644 --- a/pkg/registry/storageclass/etcd/etcd.go +++ b/pkg/registry/storageclass/etcd/etcd.go @@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) *REST { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &extensions.StorageClassList{} } - storageInterface := opts.Decorator( + storageInterface, _ := opts.Decorator( opts.StorageConfig, cachesize.GetWatchCacheSizeByResource(cachesize.StorageClasses), &extensions.StorageClass{}, diff --git a/pkg/registry/thirdpartyresource/etcd/etcd.go b/pkg/registry/thirdpartyresource/etcd/etcd.go index 6fbc45a2e4..8da3c6b32f 100644 --- a/pkg/registry/thirdpartyresource/etcd/etcd.go +++ b/pkg/registry/thirdpartyresource/etcd/etcd.go @@ -35,7 +35,7 @@ func NewREST(opts generic.RESTOptions) *REST { prefix := "/" + opts.ResourcePrefix // We explicitly do NOT do any decoration here yet. - storageInterface := generic.NewRawStorage(opts.StorageConfig) + storageInterface, _ := generic.NewRawStorage(opts.StorageConfig) store := ®istry.Store{ NewFunc: func() runtime.Object { return &extensions.ThirdPartyResource{} }, diff --git a/pkg/registry/thirdpartyresourcedata/etcd/etcd.go b/pkg/registry/thirdpartyresourcedata/etcd/etcd.go index 9d88664c89..768411dea8 100644 --- a/pkg/registry/thirdpartyresourcedata/etcd/etcd.go +++ b/pkg/registry/thirdpartyresourcedata/etcd/etcd.go @@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions, group, kind string) *REST { prefix := "/ThirdPartyResourceData/" + group + "/" + strings.ToLower(kind) + "s" // We explicitly do NOT do any decoration here yet. - storageInterface := generic.NewRawStorage(opts.StorageConfig) + storageInterface, _ := generic.NewRawStorage(opts.StorageConfig) store := ®istry.Store{ NewFunc: func() runtime.Object { return &extensions.ThirdPartyResourceData{} }, diff --git a/pkg/storage/storagebackend/factory/etcd2.go b/pkg/storage/storagebackend/factory/etcd2.go index 9452761a17..01a7ec4f48 100644 --- a/pkg/storage/storagebackend/factory/etcd2.go +++ b/pkg/storage/storagebackend/factory/etcd2.go @@ -30,16 +30,17 @@ import ( utilnet "k8s.io/kubernetes/pkg/util/net" ) -func newETCD2Storage(c storagebackend.Config) (storage.Interface, error) { +func newETCD2Storage(c storagebackend.Config) (storage.Interface, func(), error) { tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile) if err != nil { - return nil, err + return nil, nil, err } client, err := newETCD2Client(tr, c.ServerList) if err != nil { - return nil, err + return nil, nil, err } - return etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize), nil + s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize) + return s, tr.CloseIdleConnections, nil } func newETCD2Client(tr *http.Transport, serverList []string) (etcd2client.Client, error) { diff --git a/pkg/storage/storagebackend/factory/etcd3.go b/pkg/storage/storagebackend/factory/etcd3.go index 9fea1785f5..eca7311212 100644 --- a/pkg/storage/storagebackend/factory/etcd3.go +++ b/pkg/storage/storagebackend/factory/etcd3.go @@ -26,7 +26,7 @@ import ( "golang.org/x/net/context" ) -func newETCD3Storage(c storagebackend.Config) (storage.Interface, error) { +func newETCD3Storage(c storagebackend.Config) (storage.Interface, func(), error) { tlsInfo := transport.TLSInfo{ CertFile: c.CertFile, KeyFile: c.KeyFile, @@ -34,7 +34,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, error) { } tlsConfig, err := tlsInfo.ClientConfig() if err != nil { - return nil, err + return nil, nil, err } cfg := clientv3.Config{ @@ -43,8 +43,13 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, error) { } client, err := clientv3.New(cfg) if err != nil { - return nil, err + return nil, nil, err } - etcd3.StartCompactor(context.Background(), client) - return etcd3.New(client, c.Codec, c.Prefix), nil + ctx, cancel := context.WithCancel(context.Background()) + etcd3.StartCompactor(ctx, client) + destroyFunc := func() { + cancel() + client.Close() + } + return etcd3.New(client, c.Codec, c.Prefix), destroyFunc, nil } diff --git a/pkg/storage/storagebackend/factory/factory.go b/pkg/storage/storagebackend/factory/factory.go index de9d7734b2..d7ee038128 100644 --- a/pkg/storage/storagebackend/factory/factory.go +++ b/pkg/storage/storagebackend/factory/factory.go @@ -24,7 +24,7 @@ import ( ) // Create creates a storage backend based on given config. -func Create(c storagebackend.Config) (storage.Interface, error) { +func Create(c storagebackend.Config) (storage.Interface, func(), error) { switch c.Type { case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD2: return newETCD2Storage(c) @@ -35,6 +35,6 @@ func Create(c storagebackend.Config) (storage.Interface, error) { // - Support non-quorum read. return newETCD3Storage(c) default: - return nil, fmt.Errorf("unknown storage type: %s", c.Type) + return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type) } } diff --git a/pkg/storage/storagebackend/factory/tls_test.go b/pkg/storage/storagebackend/factory/tls_test.go index a2ad0ae038..448cf16f37 100644 --- a/pkg/storage/storagebackend/factory/tls_test.go +++ b/pkg/storage/storagebackend/factory/tls_test.go @@ -56,7 +56,8 @@ func TestTLSConnection(t *testing.T) { CAFile: caFile, Codec: testapi.Default.Codec(), } - storage, err := newETCD3Storage(cfg) + storage, destroyFunc, err := newETCD3Storage(cfg) + defer destroyFunc() if err != nil { t.Fatal(err) }