Merge pull request #31390 from hongchaodeng/fix

Automatic merge from submit-queue

return destroy func to clean up internal resources of storage

What?
Provide a destroy func to clean up internal resources of storage.
It changes **unit tests** to clean up resources. (Maybe fix integration test in another PR.)

Why?
Although apiserver is designed to be long running, there are some cases that it's not.
See https://github.com/kubernetes/kubernetes/issues/31262#issuecomment-242208771
We need to gracefully shutdown and clean up resources.
pull/6/head
Kubernetes Submit Queue 2016-08-25 16:44:10 -07:00 committed by GitHub
commit 40efde7e0a
48 changed files with 194 additions and 113 deletions

View File

@ -40,7 +40,7 @@ func NewREST(config *storagebackend.Config, storageDecorator generic.StorageDeco
newListFunc := func() runtime.Object { return &testgroup.TestTypeList{} } newListFunc := func() runtime.Object { return &testgroup.TestTypeList{} }
// Usually you should reuse your RESTCreateStrategy. // Usually you should reuse your RESTCreateStrategy.
strategy := &NotNamespaceScoped{} strategy := &NotNamespaceScoped{}
storageInterface := storageDecorator( storageInterface, _ := storageDecorator(
config, 100, &testgroup.TestType{}, prefix, strategy, newListFunc, storage.NoTriggerPublisher) config, 100, &testgroup.TestType{}, prefix, strategy, newListFunc, storage.NoTriggerPublisher)
store := &registry.Store{ store := &registry.Store{
NewFunc: func() runtime.Object { return &testgroup.TestType{} }, NewFunc: func() runtime.Object { return &testgroup.TestType{} },

View File

@ -49,7 +49,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &federation.ClusterList{} } newListFunc := func() runtime.Object { return &federation.ClusterList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
100, 100,
&federation.Cluster{}, &federation.Cluster{},

View File

@ -713,8 +713,10 @@ func testInstallThirdPartyAPIListVersion(t *testing.T, version string) {
}) })
if test.items != nil { if test.items != nil {
s, destroyFunc := generic.NewRawStorage(master.thirdPartyStorageConfig)
defer destroyFunc()
err := createThirdPartyList( err := createThirdPartyList(
generic.NewRawStorage(master.thirdPartyStorageConfig), s,
fmt.Sprintf("/ThirdPartyResourceData/%s/%s/default", group, plural.Resource), fmt.Sprintf("/ThirdPartyResourceData/%s/%s/default", group, plural.Resource),
test.items) test.items)
if !assert.NoError(err, test.test) { if !assert.NoError(err, test.test) {
@ -837,7 +839,8 @@ func testInstallThirdPartyAPIGetVersion(t *testing.T, version string) {
SomeField: "test field", SomeField: "test field",
OtherField: 10, OtherField: 10,
} }
s := generic.NewRawStorage(master.thirdPartyStorageConfig) s, destroyFunc := generic.NewRawStorage(master.thirdPartyStorageConfig)
defer destroyFunc()
if !assert.NoError(createThirdPartyObject(s, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) { if !assert.NoError(createThirdPartyObject(s, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) {
t.FailNow() t.FailNow()
return return
@ -914,7 +917,8 @@ func testInstallThirdPartyAPIPostForVersion(t *testing.T, version string) {
} }
thirdPartyObj := extensions.ThirdPartyResourceData{} thirdPartyObj := extensions.ThirdPartyResourceData{}
s := generic.NewRawStorage(master.thirdPartyStorageConfig) s, destroyFunc := generic.NewRawStorage(master.thirdPartyStorageConfig)
defer destroyFunc()
err = s.Get(context.TODO(), etcdtest.AddPrefix("/ThirdPartyResourceData/company.com/foos/default/test"), &thirdPartyObj, false) err = s.Get(context.TODO(), etcdtest.AddPrefix("/ThirdPartyResourceData/company.com/foos/default/test"), &thirdPartyObj, false)
if !assert.NoError(err) { if !assert.NoError(err) {
t.FailNow() t.FailNow()
@ -950,7 +954,8 @@ func testInstallThirdPartyAPIDeleteVersion(t *testing.T, version string) {
SomeField: "test field", SomeField: "test field",
OtherField: 10, OtherField: 10,
} }
s := generic.NewRawStorage(master.thirdPartyStorageConfig) s, destroyFunc := generic.NewRawStorage(master.thirdPartyStorageConfig)
defer destroyFunc()
if !assert.NoError(createThirdPartyObject(s, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) { if !assert.NoError(createThirdPartyObject(s, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) {
t.FailNow() t.FailNow()
return return
@ -1058,7 +1063,8 @@ func testInstallThirdPartyResourceRemove(t *testing.T, version string) {
SomeField: "test field", SomeField: "test field",
OtherField: 10, OtherField: 10,
} }
s := generic.NewRawStorage(master.thirdPartyStorageConfig) s, destroyFunc := generic.NewRawStorage(master.thirdPartyStorageConfig)
defer destroyFunc()
if !assert.NoError(createThirdPartyObject(s, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) { if !assert.NoError(createThirdPartyObject(s, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) {
t.FailNow() t.FailNow()
return return
@ -1110,11 +1116,12 @@ func testInstallThirdPartyResourceRemove(t *testing.T, version string) {
} }
for _, key := range expectedDeletedKeys { for _, key := range expectedDeletedKeys {
thirdPartyObj := extensions.ThirdPartyResourceData{} thirdPartyObj := extensions.ThirdPartyResourceData{}
s := generic.NewRawStorage(master.thirdPartyStorageConfig) s, destroyFunc := generic.NewRawStorage(master.thirdPartyStorageConfig)
err := s.Get(context.TODO(), key, &thirdPartyObj, false) err := s.Get(context.TODO(), key, &thirdPartyObj, false)
if !storage.IsNotFound(err) { if !storage.IsNotFound(err) {
t.Errorf("expected deletion didn't happen: %v", err) t.Errorf("expected deletion didn't happen: %v", err)
} }
destroyFunc()
} }
installed := master.ListThirdPartyResources() installed := master.ListThirdPartyResources()
if len(installed) != 0 { if len(installed) != 0 {

View File

@ -40,7 +40,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *ApprovalREST) {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &certificates.CertificateSigningRequestList{} } newListFunc := func() runtime.Object { return &certificates.CertificateSigningRequestList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.CertificateSigningRequests), cachesize.GetWatchCacheSizeByResource(cachesize.CertificateSigningRequests),
&certificates.CertificateSigningRequest{}, &certificates.CertificateSigningRequest{},

View File

@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &rbac.ClusterRoleList{} } newListFunc := func() runtime.Object { return &rbac.ClusterRoleList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.ClusterRoles), cachesize.GetWatchCacheSizeByResource(cachesize.ClusterRoles),
&rbac.ClusterRole{}, &rbac.ClusterRole{},

View File

@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &rbac.ClusterRoleBindingList{} } newListFunc := func() runtime.Object { return &rbac.ClusterRoleBindingList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.ClusterRoleBindings), cachesize.GetWatchCacheSizeByResource(cachesize.ClusterRoleBindings),
&rbac.ClusterRoleBinding{}, &rbac.ClusterRoleBinding{},

View File

@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &api.ConfigMapList{} } newListFunc := func() runtime.Object { return &api.ConfigMapList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.ConfigMaps), cachesize.GetWatchCacheSizeByResource(cachesize.ConfigMaps),
&api.ConfigMap{}, &api.ConfigMap{},

View File

@ -62,7 +62,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &api.ReplicationControllerList{} } newListFunc := func() runtime.Object { return &api.ReplicationControllerList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Controllers), cachesize.GetWatchCacheSizeByResource(cachesize.Controllers),
&api.ReplicationController{}, &api.ReplicationController{},

View File

@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &extensions.DaemonSetList{} } newListFunc := func() runtime.Object { return &extensions.DaemonSetList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Daemonsets), cachesize.GetWatchCacheSizeByResource(cachesize.Daemonsets),
&extensions.DaemonSet{}, &extensions.DaemonSet{},

View File

@ -62,7 +62,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *RollbackREST) {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &extensions.DeploymentList{} } newListFunc := func() runtime.Object { return &extensions.DeploymentList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Deployments), cachesize.GetWatchCacheSizeByResource(cachesize.Deployments),
&extensions.Deployment{}, &extensions.Deployment{},

View File

@ -35,7 +35,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &api.EndpointsList{} } newListFunc := func() runtime.Object { return &api.EndpointsList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Endpoints), cachesize.GetWatchCacheSizeByResource(cachesize.Endpoints),
&api.Endpoints{}, &api.Endpoints{},

View File

@ -34,7 +34,7 @@ func NewREST(opts generic.RESTOptions, ttl uint64) *REST {
// We explicitly do NOT do any decoration here - switching on Cacher // We explicitly do NOT do any decoration here - switching on Cacher
// for events will lead to too high memory consumption. // for events will lead to too high memory consumption.
storageInterface := generic.NewRawStorage(opts.StorageConfig) storageInterface, _ := generic.NewRawStorage(opts.StorageConfig)
store := &registry.Store{ store := &registry.Store{
NewFunc: func() runtime.Object { return &api.Event{} }, NewFunc: func() runtime.Object { return &api.Event{} },

View File

@ -30,10 +30,11 @@ import (
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing" etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
) )
func newStorage(t *testing.T) (*ScaleREST, *etcdtesting.EtcdTestServer, storage.Interface) { func newStorage(t *testing.T) (*ScaleREST, *etcdtesting.EtcdTestServer, storage.Interface, func()) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "") etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "controllers"} restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "controllers"}
return NewStorage(restOptions).Scale, server, generic.NewRawStorage(etcdStorage) s, destroyFunc := generic.NewRawStorage(etcdStorage)
return NewStorage(restOptions).Scale, server, s, destroyFunc
} }
var validPodTemplate = api.PodTemplate{ var validPodTemplate = api.PodTemplate{
@ -82,8 +83,11 @@ var validScale = extensions.Scale{
} }
func TestGet(t *testing.T) { func TestGet(t *testing.T) {
storage, server, si := newStorage(t) storage, server, si, destroyFunc := newStorage(t)
defer server.Terminate(t) defer func() {
destroyFunc()
server.Terminate(t)
}()
ctx := api.WithNamespace(api.NewContext(), "test") ctx := api.WithNamespace(api.NewContext(), "test")
key := etcdtest.AddPrefix("/controllers/test/foo") key := etcdtest.AddPrefix("/controllers/test/foo")
@ -101,8 +105,11 @@ func TestGet(t *testing.T) {
} }
func TestUpdate(t *testing.T) { func TestUpdate(t *testing.T) {
storage, server, si := newStorage(t) storage, server, si, destroyFunc := newStorage(t)
defer server.Terminate(t) defer func() {
destroyFunc()
server.Terminate(t)
}()
ctx := api.WithNamespace(api.NewContext(), "test") ctx := api.WithNamespace(api.NewContext(), "test")
key := etcdtest.AddPrefix("/controllers/test/foo") key := etcdtest.AddPrefix("/controllers/test/foo")

View File

@ -33,13 +33,14 @@ func StorageWithCacher(
resourcePrefix string, resourcePrefix string,
scopeStrategy rest.NamespaceScopedStrategy, scopeStrategy rest.NamespaceScopedStrategy,
newListFunc func() runtime.Object, newListFunc func() runtime.Object,
triggerFunc storage.TriggerPublisherFunc) storage.Interface { triggerFunc storage.TriggerPublisherFunc) (storage.Interface, func()) {
s, d := generic.NewRawStorage(storageConfig)
// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside. // TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
// Currently it has two layers of same storage interface -- cacher and low level kv. // Currently it has two layers of same storage interface -- cacher and low level kv.
cacherConfig := storage.CacherConfig{ cacherConfig := storage.CacherConfig{
CacheCapacity: capacity, CacheCapacity: capacity,
Storage: generic.NewRawStorage(storageConfig), Storage: s,
Versioner: etcdstorage.APIObjectVersioner{}, Versioner: etcdstorage.APIObjectVersioner{},
Type: objectType, Type: objectType,
ResourcePrefix: resourcePrefix, ResourcePrefix: resourcePrefix,
@ -56,6 +57,11 @@ func StorageWithCacher(
return storage.NoNamespaceKeyFunc(resourcePrefix, obj) return storage.NoNamespaceKeyFunc(resourcePrefix, obj)
} }
} }
cacher := storage.NewCacherFromConfig(cacherConfig)
destroyFunc := func() {
cacher.Stop()
d()
}
return storage.NewCacherFromConfig(cacherConfig) return cacher, destroyFunc
} }

View File

@ -88,7 +88,7 @@ func (t *testRESTStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Obje
} }
func (t *testRESTStrategy) Canonicalize(obj runtime.Object) {} func (t *testRESTStrategy) Canonicalize(obj runtime.Object) {}
func NewTestGenericStoreRegistry(t *testing.T) (*etcdtesting.EtcdTestServer, *Store) { func NewTestGenericStoreRegistry(t *testing.T) (*etcdtesting.EtcdTestServer, func(), *Store) {
return newTestGenericStoreRegistry(t, false) return newTestGenericStoreRegistry(t, false)
} }
@ -173,7 +173,7 @@ func TestStoreList(t *testing.T) {
if item.context != nil { if item.context != nil {
ctx = item.context ctx = item.context
} }
server, registry := NewTestGenericStoreRegistry(t) server, destroyFunc, registry := NewTestGenericStoreRegistry(t)
if item.in != nil { if item.in != nil {
if err := storagetesting.CreateList("/pods", registry.Storage, item.in); err != nil { if err := storagetesting.CreateList("/pods", registry.Storage, item.in); err != nil {
@ -191,6 +191,7 @@ func TestStoreList(t *testing.T) {
if e, a := item.out, list; !api.Semantic.DeepDerivative(e, a) { if e, a := item.out, list; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%v: Expected %#v, got %#v", name, e, a) t.Errorf("%v: Expected %#v, got %#v", name, e, a)
} }
destroyFunc()
server.Terminate(t) server.Terminate(t)
} }
} }
@ -208,8 +209,11 @@ func TestStoreListResourceVersion(t *testing.T) {
} }
ctx := api.WithNamespace(api.NewContext(), "test") ctx := api.WithNamespace(api.NewContext(), "test")
server, registry := newTestGenericStoreRegistry(t, true) server, destroyFunc, registry := newTestGenericStoreRegistry(t, true)
defer server.Terminate(t) defer func() {
destroyFunc()
server.Terminate(t)
}()
obj, err := registry.Create(ctx, fooPod) obj, err := registry.Create(ctx, fooPod)
if err != nil { if err != nil {
@ -270,8 +274,11 @@ func TestStoreCreate(t *testing.T) {
} }
testContext := api.WithNamespace(api.NewContext(), "test") testContext := api.WithNamespace(api.NewContext(), "test")
server, registry := NewTestGenericStoreRegistry(t) server, destroyFunc, registry := NewTestGenericStoreRegistry(t)
defer server.Terminate(t) defer func() {
destroyFunc()
server.Terminate(t)
}()
// create the object // create the object
objA, err := registry.Create(testContext, podA) objA, err := registry.Create(testContext, podA)
@ -330,8 +337,11 @@ func TestStoreUpdate(t *testing.T) {
} }
testContext := api.WithNamespace(api.NewContext(), "test") testContext := api.WithNamespace(api.NewContext(), "test")
server, registry := NewTestGenericStoreRegistry(t) server, destroyFunc, registry := NewTestGenericStoreRegistry(t)
defer server.Terminate(t) defer func() {
destroyFunc()
server.Terminate(t)
}()
// Test1 try to update a non-existing node // Test1 try to update a non-existing node
_, _, err := registry.Update(testContext, podA.Name, rest.DefaultUpdatedObjectInfo(podA, api.Scheme)) _, _, err := registry.Update(testContext, podA.Name, rest.DefaultUpdatedObjectInfo(podA, api.Scheme))
@ -368,8 +378,11 @@ func TestStoreUpdate(t *testing.T) {
} }
func TestNoOpUpdates(t *testing.T) { func TestNoOpUpdates(t *testing.T) {
server, registry := NewTestGenericStoreRegistry(t) server, destroyFunc, registry := NewTestGenericStoreRegistry(t)
defer server.Terminate(t) defer func() {
destroyFunc()
server.Terminate(t)
}()
newPod := func() *api.Pod { newPod := func() *api.Pod {
return &api.Pod{ return &api.Pod{
@ -450,8 +463,11 @@ func TestStoreCustomExport(t *testing.T) {
Spec: api.PodSpec{NodeName: "machine"}, Spec: api.PodSpec{NodeName: "machine"},
} }
server, registry := NewTestGenericStoreRegistry(t) server, destroyFunc, registry := NewTestGenericStoreRegistry(t)
defer server.Terminate(t) defer func() {
destroyFunc()
server.Terminate(t)
}()
registry.ExportStrategy = testPodExport{} registry.ExportStrategy = testPodExport{}
@ -496,8 +512,11 @@ func TestStoreBasicExport(t *testing.T) {
Status: api.PodStatus{HostIP: "1.2.3.4"}, Status: api.PodStatus{HostIP: "1.2.3.4"},
} }
server, registry := NewTestGenericStoreRegistry(t) server, destroyFunc, registry := NewTestGenericStoreRegistry(t)
defer server.Terminate(t) defer func() {
destroyFunc()
server.Terminate(t)
}()
testContext := api.WithNamespace(api.NewContext(), "test") testContext := api.WithNamespace(api.NewContext(), "test")
registry.UpdateStrategy.(*testRESTStrategy).allowCreateOnUpdate = true registry.UpdateStrategy.(*testRESTStrategy).allowCreateOnUpdate = true
@ -528,8 +547,11 @@ func TestStoreGet(t *testing.T) {
} }
testContext := api.WithNamespace(api.NewContext(), "test") testContext := api.WithNamespace(api.NewContext(), "test")
server, registry := NewTestGenericStoreRegistry(t) server, destroyFunc, registry := NewTestGenericStoreRegistry(t)
defer server.Terminate(t) defer func() {
destroyFunc()
server.Terminate(t)
}()
_, err := registry.Get(testContext, podA.Name) _, err := registry.Get(testContext, podA.Name)
if !errors.IsNotFound(err) { if !errors.IsNotFound(err) {
@ -549,8 +571,11 @@ func TestStoreDelete(t *testing.T) {
} }
testContext := api.WithNamespace(api.NewContext(), "test") testContext := api.WithNamespace(api.NewContext(), "test")
server, registry := NewTestGenericStoreRegistry(t) server, destroyFunc, registry := NewTestGenericStoreRegistry(t)
defer server.Terminate(t) defer func() {
destroyFunc()
server.Terminate(t)
}()
// test failure condition // test failure condition
_, err := registry.Delete(testContext, podA.Name, nil) _, err := registry.Delete(testContext, podA.Name, nil)
@ -587,9 +612,11 @@ func TestStoreHandleFinalizers(t *testing.T) {
} }
testContext := api.WithNamespace(api.NewContext(), "test") testContext := api.WithNamespace(api.NewContext(), "test")
server, registry := NewTestGenericStoreRegistry(t) server, destroyFunc, registry := NewTestGenericStoreRegistry(t)
defer server.Terminate(t) defer func() {
destroyFunc()
server.Terminate(t)
}()
// create pod // create pod
_, err := registry.Create(testContext, podWithFinalizer) _, err := registry.Create(testContext, podWithFinalizer)
if err != nil { if err != nil {
@ -884,8 +911,11 @@ func TestStoreDeleteWithOrphanDependents(t *testing.T) {
} }
testContext := api.WithNamespace(api.NewContext(), "test") testContext := api.WithNamespace(api.NewContext(), "test")
server, registry := NewTestGenericStoreRegistry(t) server, destroyFunc, registry := NewTestGenericStoreRegistry(t)
defer server.Terminate(t) defer func() {
destroyFunc()
server.Terminate(t)
}()
for _, tc := range testcases { for _, tc := range testcases {
registry.DeleteStrategy = tc.strategy registry.DeleteStrategy = tc.strategy
@ -931,8 +961,11 @@ func TestStoreDeleteCollection(t *testing.T) {
podB := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}} podB := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}}
testContext := api.WithNamespace(api.NewContext(), "test") testContext := api.WithNamespace(api.NewContext(), "test")
server, registry := NewTestGenericStoreRegistry(t) server, destroyFunc, registry := NewTestGenericStoreRegistry(t)
defer server.Terminate(t) defer func() {
destroyFunc()
server.Terminate(t)
}()
if _, err := registry.Create(testContext, podA); err != nil { if _, err := registry.Create(testContext, podA); err != nil {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
@ -960,8 +993,11 @@ func TestStoreDeleteCollection(t *testing.T) {
} }
func TestStoreDeleteCollectionNotFound(t *testing.T) { func TestStoreDeleteCollectionNotFound(t *testing.T) {
server, registry := NewTestGenericStoreRegistry(t) server, destroyFunc, registry := NewTestGenericStoreRegistry(t)
defer server.Terminate(t) defer func() {
destroyFunc()
server.Terminate(t)
}()
testContext := api.WithNamespace(api.NewContext(), "test") testContext := api.WithNamespace(api.NewContext(), "test")
@ -1006,8 +1042,11 @@ func TestStoreDeleteCollectionWithWatch(t *testing.T) {
podA := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} podA := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
testContext := api.WithNamespace(api.NewContext(), "test") testContext := api.WithNamespace(api.NewContext(), "test")
server, registry := NewTestGenericStoreRegistry(t) server, destroyFunc, registry := NewTestGenericStoreRegistry(t)
defer server.Terminate(t) defer func() {
destroyFunc()
server.Terminate(t)
}()
objCreated, err := registry.Create(testContext, podA) objCreated, err := registry.Create(testContext, podA)
if err != nil { if err != nil {
@ -1073,7 +1112,7 @@ func TestStoreWatch(t *testing.T) {
Spec: api.PodSpec{NodeName: "machine"}, Spec: api.PodSpec{NodeName: "machine"},
} }
server, registry := NewTestGenericStoreRegistry(t) server, destroyFunc, registry := NewTestGenericStoreRegistry(t)
wi, err := registry.WatchPredicate(ctx, m.selectPred, "0") wi, err := registry.WatchPredicate(ctx, m.selectPred, "0")
if err != nil { if err != nil {
t.Errorf("%v: unexpected error: %v", name, err) t.Errorf("%v: unexpected error: %v", name, err)
@ -1091,17 +1130,19 @@ func TestStoreWatch(t *testing.T) {
} }
wi.Stop() wi.Stop()
} }
destroyFunc()
server.Terminate(t) server.Terminate(t)
} }
} }
func newTestGenericStoreRegistry(t *testing.T, hasCacheEnabled bool) (*etcdtesting.EtcdTestServer, *Store) { func newTestGenericStoreRegistry(t *testing.T, hasCacheEnabled bool) (*etcdtesting.EtcdTestServer, func(), *Store) {
podPrefix := "/pods" podPrefix := "/pods"
server := etcdtesting.NewEtcdTestClientServer(t) server := etcdtesting.NewEtcdTestClientServer(t)
strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true} strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true}
codec := testapi.Default.StorageCodec() codec := testapi.Default.StorageCodec()
s := etcdstorage.NewEtcdStorage(server.Client, codec, etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize) s := etcdstorage.NewEtcdStorage(server.Client, codec, etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)
destroyFunc := func() {}
if hasCacheEnabled { if hasCacheEnabled {
config := storage.CacherConfig{ config := storage.CacherConfig{
CacheCapacity: 10, CacheCapacity: 10,
@ -1113,10 +1154,12 @@ func newTestGenericStoreRegistry(t *testing.T, hasCacheEnabled bool) (*etcdtesti
NewListFunc: func() runtime.Object { return &api.PodList{} }, NewListFunc: func() runtime.Object { return &api.PodList{} },
Codec: codec, Codec: codec,
} }
s = storage.NewCacherFromConfig(config) cacher := storage.NewCacherFromConfig(config)
s = cacher
destroyFunc = cacher.Stop
} }
return server, &Store{ return server, destroyFunc, &Store{
NewFunc: func() runtime.Object { return &api.Pod{} }, NewFunc: func() runtime.Object { return &api.Pod{} },
NewListFunc: func() runtime.Object { return &api.PodList{} }, NewListFunc: func() runtime.Object { return &api.PodList{} },
QualifiedResource: api.Resource("pods"), QualifiedResource: api.Resource("pods"),

View File

@ -34,7 +34,7 @@ type StorageDecorator func(
resourcePrefix string, resourcePrefix string,
scopeStrategy rest.NamespaceScopedStrategy, scopeStrategy rest.NamespaceScopedStrategy,
newListFunc func() runtime.Object, newListFunc func() runtime.Object,
trigger storage.TriggerPublisherFunc) storage.Interface trigger storage.TriggerPublisherFunc) (storage.Interface, func())
// Returns given 'storageInterface' without any decoration. // Returns given 'storageInterface' without any decoration.
func UndecoratedStorage( func UndecoratedStorage(
@ -44,17 +44,17 @@ func UndecoratedStorage(
resourcePrefix string, resourcePrefix string,
scopeStrategy rest.NamespaceScopedStrategy, scopeStrategy rest.NamespaceScopedStrategy,
newListFunc func() runtime.Object, newListFunc func() runtime.Object,
trigger storage.TriggerPublisherFunc) storage.Interface { trigger storage.TriggerPublisherFunc) (storage.Interface, func()) {
return NewRawStorage(config) return NewRawStorage(config)
} }
// NewRawStorage creates the low level kv storage. This is a work-around for current // NewRawStorage creates the low level kv storage. This is a work-around for current
// two layer of same storage interface. // two layer of same storage interface.
// TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method. // TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method.
func NewRawStorage(config *storagebackend.Config) storage.Interface { func NewRawStorage(config *storagebackend.Config) (storage.Interface, func()) {
s, err := factory.Create(*config) s, d, err := factory.Create(*config)
if err != nil { if err != nil {
glog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err) glog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err)
} }
return s return s, d
} }

View File

@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &autoscaling.HorizontalPodAutoscalerList{} } newListFunc := func() runtime.Object { return &autoscaling.HorizontalPodAutoscalerList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.HorizontalPodAutoscalers), cachesize.GetWatchCacheSizeByResource(cachesize.HorizontalPodAutoscalers),
&autoscaling.HorizontalPodAutoscaler{}, &autoscaling.HorizontalPodAutoscaler{},

View File

@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &extensions.IngressList{} } newListFunc := func() runtime.Object { return &extensions.IngressList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Ingress), cachesize.GetWatchCacheSizeByResource(cachesize.Ingress),
&extensions.Ingress{}, &extensions.Ingress{},

View File

@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &batch.JobList{} } newListFunc := func() runtime.Object { return &batch.JobList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Jobs), cachesize.GetWatchCacheSizeByResource(cachesize.Jobs),
&batch.Job{}, &batch.Job{},

View File

@ -35,7 +35,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &api.LimitRangeList{} } newListFunc := func() runtime.Object { return &api.LimitRangeList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.LimitRanges), cachesize.GetWatchCacheSizeByResource(cachesize.LimitRanges),
&api.LimitRange{}, &api.LimitRange{},

View File

@ -53,7 +53,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *FinalizeREST) {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &api.NamespaceList{} } newListFunc := func() runtime.Object { return &api.NamespaceList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Namespaces), cachesize.GetWatchCacheSizeByResource(cachesize.Namespaces),
&api.Namespace{}, &api.Namespace{},

View File

@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &extensionsapi.NetworkPolicyList{} } newListFunc := func() runtime.Object { return &extensionsapi.NetworkPolicyList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.NetworkPolicys), cachesize.GetWatchCacheSizeByResource(cachesize.NetworkPolicys),
&extensionsapi.NetworkPolicy{}, &extensionsapi.NetworkPolicy{},

View File

@ -69,7 +69,7 @@ func NewStorage(opts generic.RESTOptions, connection client.ConnectionInfoGetter
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &api.NodeList{} } newListFunc := func() runtime.Object { return &api.NodeList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Nodes), cachesize.GetWatchCacheSizeByResource(cachesize.Nodes),
&api.Node{}, &api.Node{},

View File

@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &api.PersistentVolumeList{} } newListFunc := func() runtime.Object { return &api.PersistentVolumeList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.PersistentVolumes), cachesize.GetWatchCacheSizeByResource(cachesize.PersistentVolumes),
&api.PersistentVolume{}, &api.PersistentVolume{},

View File

@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &api.PersistentVolumeClaimList{} } newListFunc := func() runtime.Object { return &api.PersistentVolumeClaimList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.PersistentVolumeClaims), cachesize.GetWatchCacheSizeByResource(cachesize.PersistentVolumeClaims),
&api.PersistentVolumeClaim{}, &api.PersistentVolumeClaim{},

View File

@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &appsapi.PetSetList{} } newListFunc := func() runtime.Object { return &appsapi.PetSetList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.PetSet), cachesize.GetWatchCacheSizeByResource(cachesize.PetSet),
&appsapi.PetSet{}, &appsapi.PetSet{},

View File

@ -63,7 +63,7 @@ func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTr
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &api.PodList{} } newListFunc := func() runtime.Object { return &api.PodList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Pods), cachesize.GetWatchCacheSizeByResource(cachesize.Pods),
&api.Pod{}, &api.Pod{},

View File

@ -28,8 +28,10 @@ import (
func TestPodLogValidates(t *testing.T) { func TestPodLogValidates(t *testing.T) {
etcdStorage, _ := registrytest.NewEtcdStorage(t, "") etcdStorage, _ := registrytest.NewEtcdStorage(t, "")
s, destroyFunc := generic.NewRawStorage(etcdStorage)
defer destroyFunc()
store := &registry.Store{ store := &registry.Store{
Storage: generic.NewRawStorage(etcdStorage), Storage: s,
} }
logRest := &LogREST{Store: store, KubeletConn: nil} logRest := &LogREST{Store: store, KubeletConn: nil}

View File

@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &policyapi.PodDisruptionBudgetList{} } newListFunc := func() runtime.Object { return &policyapi.PodDisruptionBudgetList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.PodDisruptionBudget), cachesize.GetWatchCacheSizeByResource(cachesize.PodDisruptionBudget),
&policyapi.PodDisruptionBudget{}, &policyapi.PodDisruptionBudget{},

View File

@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &extensions.PodSecurityPolicyList{} } newListFunc := func() runtime.Object { return &extensions.PodSecurityPolicyList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.PodSecurityPolicies), cachesize.GetWatchCacheSizeByResource(cachesize.PodSecurityPolicies),
&extensions.PodSecurityPolicy{}, &extensions.PodSecurityPolicy{},

View File

@ -35,7 +35,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &api.PodTemplateList{} } newListFunc := func() runtime.Object { return &api.PodTemplateList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.PodTemplates), cachesize.GetWatchCacheSizeByResource(cachesize.PodTemplates),
&api.PodTemplate{}, &api.PodTemplate{},

View File

@ -61,7 +61,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &extensions.ReplicaSetList{} } newListFunc := func() runtime.Object { return &extensions.ReplicaSetList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Replicasets), cachesize.GetWatchCacheSizeByResource(cachesize.Replicasets),
&extensions.ReplicaSet{}, &extensions.ReplicaSet{},

View File

@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &api.ResourceQuotaList{} } newListFunc := func() runtime.Object { return &api.ResourceQuotaList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.ResourceQuotas), cachesize.GetWatchCacheSizeByResource(cachesize.ResourceQuotas),
&api.ResourceQuota{}, &api.ResourceQuota{},

View File

@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &rbac.RoleList{} } newListFunc := func() runtime.Object { return &rbac.RoleList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Roles), cachesize.GetWatchCacheSizeByResource(cachesize.Roles),
&rbac.Role{}, &rbac.Role{},

View File

@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &rbac.RoleBindingList{} } newListFunc := func() runtime.Object { return &rbac.RoleBindingList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.RoleBindings), cachesize.GetWatchCacheSizeByResource(cachesize.RoleBindings),
&rbac.RoleBinding{}, &rbac.RoleBinding{},

View File

@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &batch.ScheduledJobList{} } newListFunc := func() runtime.Object { return &batch.ScheduledJobList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.ScheduledJobs), cachesize.GetWatchCacheSizeByResource(cachesize.ScheduledJobs),
&batch.ScheduledJob{}, &batch.ScheduledJob{},

View File

@ -35,7 +35,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &api.SecretList{} } newListFunc := func() runtime.Object { return &api.SecretList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Secrets), cachesize.GetWatchCacheSizeByResource(cachesize.Secrets),
&api.Secret{}, &api.Secret{},

View File

@ -61,7 +61,7 @@ var _ rangeallocation.RangeRegistry = &Etcd{}
// NewEtcd returns an allocator that is backed by Etcd and can manage // NewEtcd returns an allocator that is backed by Etcd and can manage
// persisting the snapshot state of allocation after each allocation is made. // persisting the snapshot state of allocation after each allocation is made.
func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource unversioned.GroupResource, config *storagebackend.Config) *Etcd { func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource unversioned.GroupResource, config *storagebackend.Config) *Etcd {
storage := generic.NewRawStorage(config) storage, _ := generic.NewRawStorage(config)
return &Etcd{ return &Etcd{
alloc: alloc, alloc: alloc,
storage: storage, storage: storage,

View File

@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &api.ServiceList{} } newListFunc := func() runtime.Object { return &api.ServiceList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Services), cachesize.GetWatchCacheSizeByResource(cachesize.Services),
&api.Service{}, &api.Service{},

View File

@ -34,7 +34,7 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
) )
func newStorage(t *testing.T) (*etcdtesting.EtcdTestServer, ipallocator.Interface, allocator.Interface, storage.Interface) { func newStorage(t *testing.T) (*etcdtesting.EtcdTestServer, ipallocator.Interface, allocator.Interface, storage.Interface, func()) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "") etcdStorage, server := registrytest.NewEtcdStorage(t, "")
_, cidr, err := net.ParseCIDR("192.168.1.0/24") _, cidr, err := net.ParseCIDR("192.168.1.0/24")
if err != nil { if err != nil {
@ -48,8 +48,8 @@ func newStorage(t *testing.T) (*etcdtesting.EtcdTestServer, ipallocator.Interfac
etcd := allocatoretcd.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), etcdStorage) etcd := allocatoretcd.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), etcdStorage)
return etcd return etcd
}) })
s, destroyFunc := generic.NewRawStorage(etcdStorage)
return server, storage, backing, generic.NewRawStorage(etcdStorage) return server, storage, backing, s, destroyFunc
} }
func validNewRangeAllocation() *api.RangeAllocation { func validNewRangeAllocation() *api.RangeAllocation {
@ -65,24 +65,33 @@ func key() string {
} }
func TestEmpty(t *testing.T) { func TestEmpty(t *testing.T) {
server, storage, _, _ := newStorage(t) server, storage, _, _, destroyFunc := newStorage(t)
defer server.Terminate(t) defer func() {
destroyFunc()
server.Terminate(t)
}()
if err := storage.Allocate(net.ParseIP("192.168.1.2")); !strings.Contains(err.Error(), "cannot allocate resources of type serviceipallocations at this time") { if err := storage.Allocate(net.ParseIP("192.168.1.2")); !strings.Contains(err.Error(), "cannot allocate resources of type serviceipallocations at this time") {
t.Fatal(err) t.Fatal(err)
} }
} }
func TestErrors(t *testing.T) { func TestErrors(t *testing.T) {
server, storage, _, _ := newStorage(t) server, storage, _, _, destroyFunc := newStorage(t)
defer server.Terminate(t) defer func() {
destroyFunc()
server.Terminate(t)
}()
if err := storage.Allocate(net.ParseIP("192.168.0.0")); err != ipallocator.ErrNotInRange { if err := storage.Allocate(net.ParseIP("192.168.0.0")); err != ipallocator.ErrNotInRange {
t.Fatal(err) t.Fatal(err)
} }
} }
func TestStore(t *testing.T) { func TestStore(t *testing.T) {
server, storage, backing, si := newStorage(t) server, storage, backing, si, destroyFunc := newStorage(t)
defer server.Terminate(t) defer func() {
destroyFunc()
server.Terminate(t)
}()
if err := si.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil { if err := si.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }

View File

@ -35,7 +35,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &api.ServiceAccountList{} } newListFunc := func() runtime.Object { return &api.ServiceAccountList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.ServiceAccounts), cachesize.GetWatchCacheSizeByResource(cachesize.ServiceAccounts),
&api.ServiceAccount{}, &api.ServiceAccount{},

View File

@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
newListFunc := func() runtime.Object { return &extensions.StorageClassList{} } newListFunc := func() runtime.Object { return &extensions.StorageClassList{} }
storageInterface := opts.Decorator( storageInterface, _ := opts.Decorator(
opts.StorageConfig, opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.StorageClasses), cachesize.GetWatchCacheSizeByResource(cachesize.StorageClasses),
&extensions.StorageClass{}, &extensions.StorageClass{},

View File

@ -35,7 +35,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix := "/" + opts.ResourcePrefix prefix := "/" + opts.ResourcePrefix
// We explicitly do NOT do any decoration here yet. // We explicitly do NOT do any decoration here yet.
storageInterface := generic.NewRawStorage(opts.StorageConfig) storageInterface, _ := generic.NewRawStorage(opts.StorageConfig)
store := &registry.Store{ store := &registry.Store{
NewFunc: func() runtime.Object { return &extensions.ThirdPartyResource{} }, NewFunc: func() runtime.Object { return &extensions.ThirdPartyResource{} },

View File

@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions, group, kind string) *REST {
prefix := "/ThirdPartyResourceData/" + group + "/" + strings.ToLower(kind) + "s" prefix := "/ThirdPartyResourceData/" + group + "/" + strings.ToLower(kind) + "s"
// We explicitly do NOT do any decoration here yet. // We explicitly do NOT do any decoration here yet.
storageInterface := generic.NewRawStorage(opts.StorageConfig) storageInterface, _ := generic.NewRawStorage(opts.StorageConfig)
store := &registry.Store{ store := &registry.Store{
NewFunc: func() runtime.Object { return &extensions.ThirdPartyResourceData{} }, NewFunc: func() runtime.Object { return &extensions.ThirdPartyResourceData{} },

View File

@ -30,16 +30,17 @@ import (
utilnet "k8s.io/kubernetes/pkg/util/net" utilnet "k8s.io/kubernetes/pkg/util/net"
) )
func newETCD2Storage(c storagebackend.Config) (storage.Interface, error) { func newETCD2Storage(c storagebackend.Config) (storage.Interface, func(), error) {
tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile) tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
client, err := newETCD2Client(tr, c.ServerList) client, err := newETCD2Client(tr, c.ServerList)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
return etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize), nil s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize)
return s, tr.CloseIdleConnections, nil
} }
func newETCD2Client(tr *http.Transport, serverList []string) (etcd2client.Client, error) { func newETCD2Client(tr *http.Transport, serverList []string) (etcd2client.Client, error) {

View File

@ -26,7 +26,7 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
) )
func newETCD3Storage(c storagebackend.Config) (storage.Interface, error) { func newETCD3Storage(c storagebackend.Config) (storage.Interface, func(), error) {
tlsInfo := transport.TLSInfo{ tlsInfo := transport.TLSInfo{
CertFile: c.CertFile, CertFile: c.CertFile,
KeyFile: c.KeyFile, KeyFile: c.KeyFile,
@ -34,7 +34,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, error) {
} }
tlsConfig, err := tlsInfo.ClientConfig() tlsConfig, err := tlsInfo.ClientConfig()
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
cfg := clientv3.Config{ cfg := clientv3.Config{
@ -43,8 +43,13 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, error) {
} }
client, err := clientv3.New(cfg) client, err := clientv3.New(cfg)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
etcd3.StartCompactor(context.Background(), client) ctx, cancel := context.WithCancel(context.Background())
return etcd3.New(client, c.Codec, c.Prefix), nil etcd3.StartCompactor(ctx, client)
destroyFunc := func() {
cancel()
client.Close()
}
return etcd3.New(client, c.Codec, c.Prefix), destroyFunc, nil
} }

View File

@ -24,7 +24,7 @@ import (
) )
// Create creates a storage backend based on given config. // Create creates a storage backend based on given config.
func Create(c storagebackend.Config) (storage.Interface, error) { func Create(c storagebackend.Config) (storage.Interface, func(), error) {
switch c.Type { switch c.Type {
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD2: case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD2:
return newETCD2Storage(c) return newETCD2Storage(c)
@ -35,6 +35,6 @@ func Create(c storagebackend.Config) (storage.Interface, error) {
// - Support non-quorum read. // - Support non-quorum read.
return newETCD3Storage(c) return newETCD3Storage(c)
default: default:
return nil, fmt.Errorf("unknown storage type: %s", c.Type) return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
} }
} }

View File

@ -56,7 +56,8 @@ func TestTLSConnection(t *testing.T) {
CAFile: caFile, CAFile: caFile,
Codec: testapi.Default.Codec(), Codec: testapi.Default.Codec(),
} }
storage, err := newETCD3Storage(cfg) storage, destroyFunc, err := newETCD3Storage(cfg)
defer destroyFunc()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }