Merge pull request #30251 from hongchaodeng/r2

Automatic merge from submit-queue

Move new etcd storage (low level storage) into cacher

In an effort for #29888, we are pushing forward this:

What?
- It changes creating etcd storage.Interface impl into creating config
- In creating cacher storage (StorageWithCacher), it passes config created above and new etcd storage inside.

Why?
- We want to expose the information of (etcd) kv client to cacher. Cacher storage uses this information to talk to remote storage.
pull/6/head
Kubernetes Submit Queue 2016-08-13 10:09:49 -07:00 committed by GitHub
commit e39d7f71e6
88 changed files with 188 additions and 173 deletions

View File

@ -186,11 +186,11 @@ func Run(s *options.APIServer) error {
if s.ServiceAccountLookup {
// If we need to look up service accounts and tokens,
// go directly to etcd to avoid recursive auth insanity
storage, err := storageFactory.New(api.Resource("serviceaccounts"))
storageConfig, err := storageFactory.NewConfig(api.Resource("serviceaccounts"))
if err != nil {
glog.Fatalf("Unable to get serviceaccounts storage: %v", err)
}
serviceAccountGetter = serviceaccountcontroller.NewGetterFromStorageInterface(storage, storageFactory.ResourcePrefix(api.Resource("serviceaccounts")), storageFactory.ResourcePrefix(api.Resource("secrets")))
serviceAccountGetter = serviceaccountcontroller.NewGetterFromStorageInterface(storageConfig, storageFactory.ResourcePrefix(api.Resource("serviceaccounts")), storageFactory.ResourcePrefix(api.Resource("secrets")))
}
authenticator, err := authenticator.New(authenticator.AuthenticatorConfig{
@ -227,11 +227,11 @@ func Run(s *options.APIServer) error {
if modeEnabled(apiserver.ModeRBAC) {
mustGetRESTOptions := func(resource string) generic.RESTOptions {
s, err := storageFactory.New(rbac.Resource(resource))
config, err := storageFactory.NewConfig(rbac.Resource(resource))
if err != nil {
glog.Fatalf("Unable to get %s storage: %v", resource, err)
}
return generic.RESTOptions{Storage: s, Decorator: generic.UndecoratedStorage, ResourcePrefix: storageFactory.ResourcePrefix(rbac.Resource(resource))}
return generic.RESTOptions{StorageConfig: config, Decorator: generic.UndecoratedStorage, ResourcePrefix: storageFactory.ResourcePrefix(rbac.Resource(resource))}
}
// For initial bootstrapping go directly to etcd to avoid privillege escalation check.

View File

@ -78,13 +78,13 @@ func Run(serverOptions *genericoptions.ServerRunOptions) error {
return fmt.Errorf("%v", err)
}
storageFactory := newStorageFactory()
storage, err := storageFactory.New(unversioned.GroupResource{Group: groupName, Resource: "testtype"})
storageConfig, err := storageFactory.NewConfig(unversioned.GroupResource{Group: groupName, Resource: "testtype"})
if err != nil {
return fmt.Errorf("Unable to get storage: %v", err)
return fmt.Errorf("Unable to get storage config: %v", err)
}
restStorageMap := map[string]rest.Storage{
"testtypes": testgroupetcd.NewREST(storage, s.StorageDecorator()),
"testtypes": testgroupetcd.NewREST(storageConfig, s.StorageDecorator()),
}
apiGroupInfo := genericapiserver.APIGroupInfo{
GroupMeta: *groupMeta,

View File

@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
)
type REST struct {
@ -32,13 +33,13 @@ type REST struct {
}
// NewREST returns a RESTStorage object that will work with testtype.
func NewREST(s storage.Interface, storageDecorator generic.StorageDecorator) *REST {
func NewREST(config *storagebackend.Config, storageDecorator generic.StorageDecorator) *REST {
prefix := "/testtype"
newListFunc := func() runtime.Object { return &testgroup.TestTypeList{} }
// Usually you should reuse your RESTCreateStrategy.
strategy := &NotNamespaceScoped{}
storageInterface := storageDecorator(
s, 100, &testgroup.TestType{}, prefix, strategy, newListFunc, storage.NoTriggerPublisher)
config, 100, &testgroup.TestType{}, prefix, strategy, newListFunc, storage.NoTriggerPublisher)
store := &registry.Store{
NewFunc: func() runtime.Object { return &testgroup.TestType{} },
// NewListFunc returns an object capable of storing results of an etcd list.

View File

@ -162,12 +162,12 @@ func Run(s *options.ServerRunOptions) error {
}
func createRESTOptionsOrDie(s *options.ServerRunOptions, g *genericapiserver.GenericAPIServer, f genericapiserver.StorageFactory, resource unversioned.GroupResource) generic.RESTOptions {
storage, err := f.New(resource)
config, err := f.NewConfig(resource)
if err != nil {
glog.Fatalf("Unable to find storage destination for %v, due to %v", resource, err.Error())
glog.Fatalf("Unable to find storage config for %v, due to %v", resource, err.Error())
}
return generic.RESTOptions{
Storage: storage,
StorageConfig: config,
Decorator: g.StorageDecorator(),
DeleteCollectionWorkers: s.DeleteCollectionWorkers,
ResourcePrefix: f.ResourcePrefix(resource),

View File

@ -50,7 +50,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &federation.ClusterList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
100,
&federation.Cluster{},
prefix,

View File

@ -30,9 +30,9 @@ import (
)
func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, federation.GroupName)
storageConfig, server := registrytest.NewEtcdStorage(t, federation.GroupName)
restOptions := generic.RESTOptions{
Storage: etcdStorage,
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: 1}
storage, _ := NewREST(restOptions)

View File

@ -25,7 +25,7 @@ import (
serviceaccountregistry "k8s.io/kubernetes/pkg/registry/serviceaccount"
serviceaccountetcd "k8s.io/kubernetes/pkg/registry/serviceaccount/etcd"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
)
// clientGetter implements ServiceAccountTokenGetter using a clientset.Interface
@ -69,9 +69,9 @@ func (r *registryGetter) GetSecret(namespace, name string) (*api.Secret, error)
// NewGetterFromStorageInterface returns a ServiceAccountTokenGetter that
// uses the specified storage to retrieve service accounts and secrets.
func NewGetterFromStorageInterface(s storage.Interface, saPrefix, secretPrefix string) serviceaccount.ServiceAccountTokenGetter {
func NewGetterFromStorageInterface(config *storagebackend.Config, saPrefix, secretPrefix string) serviceaccount.ServiceAccountTokenGetter {
return NewGetterFromRegistries(
serviceaccountregistry.NewRegistry(serviceaccountetcd.NewREST(generic.RESTOptions{Storage: s, Decorator: generic.UndecoratedStorage, ResourcePrefix: saPrefix})),
secret.NewRegistry(secretetcd.NewREST(generic.RESTOptions{Storage: s, Decorator: generic.UndecoratedStorage, ResourcePrefix: secretPrefix})),
serviceaccountregistry.NewRegistry(serviceaccountetcd.NewREST(generic.RESTOptions{StorageConfig: config, Decorator: generic.UndecoratedStorage, ResourcePrefix: saPrefix})),
secret.NewRegistry(secretetcd.NewREST(generic.RESTOptions{StorageConfig: config, Decorator: generic.UndecoratedStorage, ResourcePrefix: secretPrefix})),
)
}

View File

@ -25,9 +25,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/recognizer"
"k8s.io/kubernetes/pkg/runtime/serializer/versioning"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
storagebackendfactory "k8s.io/kubernetes/pkg/storage/storagebackend/factory"
"k8s.io/kubernetes/pkg/util/sets"
"github.com/golang/glog"
@ -37,7 +35,7 @@ import (
type StorageFactory interface {
// New finds the storage destination for the given group and resource. It will
// return an error if the group has no storage destination configured.
New(groupResource unversioned.GroupResource) (storage.Interface, error)
NewConfig(groupResource unversioned.GroupResource) (*storagebackend.Config, error)
// ResourcePrefix returns the overridden resource prefix for the GroupResource
// This allows for cohabitation of resources with different native types and provides
@ -76,9 +74,6 @@ type DefaultStorageFactory struct {
// newStorageCodecFn exists to be overwritten for unit testing.
newStorageCodecFn func(storageMediaType string, ns runtime.StorageSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (codec runtime.Codec, err error)
// newStorageFn exists to be overwritten for unit testing.
newStorageFn func(config storagebackend.Config, codec runtime.Codec) (etcdStorage storage.Interface, err error)
}
type groupResourceOverrides struct {
@ -118,7 +113,6 @@ func NewDefaultStorageFactory(config storagebackend.Config, defaultMediaType str
APIResourceConfigSource: resourceConfig,
newStorageCodecFn: NewStorageCodec,
newStorageFn: newStorage,
}
}
@ -173,7 +167,7 @@ func (s *DefaultStorageFactory) getStorageGroupResource(groupResource unversione
// New finds the storage destination for the given group and resource. It will
// return an error if the group has no storage destination configured.
func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (storage.Interface, error) {
func (s *DefaultStorageFactory) NewConfig(groupResource unversioned.GroupResource) (*storagebackend.Config, error) {
chosenStorageResource := s.getStorageGroupResource(groupResource)
groupOverride := s.Overrides[getAllResourcesAlias(chosenStorageResource)]
@ -232,12 +226,8 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st
}
glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, config)
return s.newStorageFn(config, codec)
}
// newStorage is the default implementation for creating a storage backend.
func newStorage(config storagebackend.Config, codec runtime.Codec) (etcdStorage storage.Interface, err error) {
return storagebackendfactory.Create(config, codec)
config.Codec = codec
return &config, nil
}
// Get all backends for all registered storage destinations.

View File

@ -24,8 +24,6 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/genericapiserver/options"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
)
@ -50,38 +48,31 @@ func TestUpdateEtcdOverrides(t *testing.T) {
defaultEtcdLocation := []string{"http://127.0.0.1"}
for i, test := range testCases {
actualConfig := storagebackend.Config{}
newStorageFn := func(config storagebackend.Config, codec runtime.Codec) (_ storage.Interface, err error) {
actualConfig = config
return nil, nil
}
defaultConfig := storagebackend.Config{
Prefix: options.DefaultEtcdPathPrefix,
ServerList: defaultEtcdLocation,
}
storageFactory := NewDefaultStorageFactory(defaultConfig, "", api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig())
storageFactory.newStorageFn = newStorageFn
storageFactory.SetEtcdLocation(test.resource, test.servers)
var err error
_, err = storageFactory.New(test.resource)
config, err := storageFactory.NewConfig(test.resource)
if err != nil {
t.Errorf("%d: unexpected error %v", i, err)
continue
}
if !reflect.DeepEqual(actualConfig.ServerList, test.servers) {
t.Errorf("%d: expected %v, got %v", i, test.servers, actualConfig.ServerList)
if !reflect.DeepEqual(config.ServerList, test.servers) {
t.Errorf("%d: expected %v, got %v", i, test.servers, config.ServerList)
continue
}
_, err = storageFactory.New(unversioned.GroupResource{Group: api.GroupName, Resource: "unlikely"})
config, err = storageFactory.NewConfig(unversioned.GroupResource{Group: api.GroupName, Resource: "unlikely"})
if err != nil {
t.Errorf("%d: unexpected error %v", i, err)
continue
}
if !reflect.DeepEqual(actualConfig.ServerList, defaultEtcdLocation) {
t.Errorf("%d: expected %v, got %v", i, defaultEtcdLocation, actualConfig.ServerList)
if !reflect.DeepEqual(config.ServerList, defaultEtcdLocation) {
t.Errorf("%d: expected %v, got %v", i, defaultEtcdLocation, config.ServerList)
continue
}

View File

@ -82,9 +82,9 @@ import (
"k8s.io/kubernetes/pkg/registry/thirdpartyresourcedata"
thirdpartyresourcedataetcd "k8s.io/kubernetes/pkg/registry/thirdpartyresourcedata/etcd"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdmetrics "k8s.io/kubernetes/pkg/storage/etcd/metrics"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/storage/storagebackend"
"k8s.io/kubernetes/pkg/util/sets"
"github.com/golang/glog"
@ -142,7 +142,7 @@ type Master struct {
serviceNodePortAllocator rangeallocation.RangeRegistry
// storage for third party objects
thirdPartyStorage storage.Interface
thirdPartyStorageConfig *storagebackend.Config
// map from api path to a tuple of (storage for the objects, APIGroup)
thirdPartyResources map[string]thirdPartyEntry
// protects the map
@ -276,7 +276,7 @@ func (m *Master) InstallAPIs(c *Config) {
// TODO seems like this bit ought to be unconditional and the REST API is controlled by the config
if c.APIResourceConfigSource.ResourceEnabled(extensionsapiv1beta1.SchemeGroupVersion.WithResource("thirdpartyresources")) {
var err error
m.thirdPartyStorage, err = c.StorageFactory.New(extensions.Resource("thirdpartyresources"))
m.thirdPartyStorageConfig, err = c.StorageFactory.NewConfig(extensions.Resource("thirdpartyresources"))
if err != nil {
glog.Fatalf("Error getting third party storage: %v", err)
}
@ -349,7 +349,7 @@ func (m *Master) initV1ResourcesStorage(c *Config) {
return
}
serviceStorage, err := c.StorageFactory.New(api.Resource("services"))
serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
if err != nil {
glog.Fatal(err.Error())
}
@ -357,7 +357,7 @@ func (m *Master) initV1ResourcesStorage(c *Config) {
serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorage)
etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
serviceClusterIPRegistry = etcd
return etcd
})
@ -367,7 +367,7 @@ func (m *Master) initV1ResourcesStorage(c *Config) {
serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(m.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorage)
etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorageConfig)
serviceNodePortRegistry = etcd
return etcd
})
@ -648,7 +648,7 @@ func (m *Master) InstallThirdPartyResource(rsrc *extensions.ThirdPartyResource)
func (m *Master) thirdpartyapi(group, kind, version, pluralResource string) *apiserver.APIGroupVersion {
resourceStorage := thirdpartyresourcedataetcd.NewREST(
generic.RESTOptions{
Storage: m.thirdPartyStorage,
StorageConfig: m.thirdPartyStorageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: m.deleteCollectionWorkers,
},
@ -691,13 +691,13 @@ func (m *Master) thirdpartyapi(group, kind, version, pluralResource string) *api
}
func (m *Master) GetRESTOptionsOrDie(c *Config, resource unversioned.GroupResource) generic.RESTOptions {
storage, err := c.StorageFactory.New(resource)
storageConfig, err := c.StorageFactory.NewConfig(resource)
if err != nil {
glog.Fatalf("Unable to find storage destination for %v, due to %v", resource, err.Error())
}
return generic.RESTOptions{
Storage: storage,
StorageConfig: storageConfig,
Decorator: m.StorageDecorator(),
DeleteCollectionWorkers: m.deleteCollectionWorkers,
ResourcePrefix: c.StorageFactory.ResourcePrefix(resource),

View File

@ -51,6 +51,7 @@ import (
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/registry/endpoint"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/namespace"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/registry/thirdpartyresourcedata"
@ -700,7 +701,8 @@ func testInstallThirdPartyAPIListVersion(t *testing.T, version string) {
})
if test.items != nil {
err := createThirdPartyList(master.thirdPartyStorage,
err := createThirdPartyList(
generic.NewRawStorage(master.thirdPartyStorageConfig),
fmt.Sprintf("/ThirdPartyResourceData/%s/%s/default", group, plural.Resource),
test.items)
if !assert.NoError(err, test.test) {
@ -823,7 +825,8 @@ func testInstallThirdPartyAPIGetVersion(t *testing.T, version string) {
SomeField: "test field",
OtherField: 10,
}
if !assert.NoError(createThirdPartyObject(master.thirdPartyStorage, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) {
s := generic.NewRawStorage(master.thirdPartyStorageConfig)
if !assert.NoError(createThirdPartyObject(s, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) {
t.FailNow()
return
}
@ -899,9 +902,8 @@ func testInstallThirdPartyAPIPostForVersion(t *testing.T, version string) {
}
thirdPartyObj := extensions.ThirdPartyResourceData{}
err = master.thirdPartyStorage.Get(
context.TODO(), etcdtest.AddPrefix("/ThirdPartyResourceData/company.com/foos/default/test"),
&thirdPartyObj, false)
s := generic.NewRawStorage(master.thirdPartyStorageConfig)
err = s.Get(context.TODO(), etcdtest.AddPrefix("/ThirdPartyResourceData/company.com/foos/default/test"), &thirdPartyObj, false)
if !assert.NoError(err) {
t.FailNow()
}
@ -936,7 +938,8 @@ func testInstallThirdPartyAPIDeleteVersion(t *testing.T, version string) {
SomeField: "test field",
OtherField: 10,
}
if !assert.NoError(createThirdPartyObject(master.thirdPartyStorage, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) {
s := generic.NewRawStorage(master.thirdPartyStorageConfig)
if !assert.NoError(createThirdPartyObject(s, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) {
t.FailNow()
return
}
@ -975,8 +978,7 @@ func testInstallThirdPartyAPIDeleteVersion(t *testing.T, version string) {
expectedDeletedKey := etcdtest.AddPrefix("ThirdPartyResourceData/company.com/foos/default/test")
thirdPartyObj := extensions.ThirdPartyResourceData{}
err = master.thirdPartyStorage.Get(
context.TODO(), expectedDeletedKey, &thirdPartyObj, false)
err = s.Get(context.TODO(), expectedDeletedKey, &thirdPartyObj, false)
if !storage.IsNotFound(err) {
t.Errorf("expected deletion didn't happen: %v", err)
}
@ -1044,13 +1046,14 @@ func testInstallThirdPartyResourceRemove(t *testing.T, version string) {
SomeField: "test field",
OtherField: 10,
}
if !assert.NoError(createThirdPartyObject(master.thirdPartyStorage, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) {
s := generic.NewRawStorage(master.thirdPartyStorageConfig)
if !assert.NoError(createThirdPartyObject(s, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) {
t.FailNow()
return
}
secondObj := expectedObj
secondObj.Name = "bar"
if !assert.NoError(createThirdPartyObject(master.thirdPartyStorage, "/ThirdPartyResourceData/company.com/foos/default/bar", "bar", secondObj)) {
if !assert.NoError(createThirdPartyObject(s, "/ThirdPartyResourceData/company.com/foos/default/bar", "bar", secondObj)) {
t.FailNow()
return
}
@ -1095,7 +1098,8 @@ func testInstallThirdPartyResourceRemove(t *testing.T, version string) {
}
for _, key := range expectedDeletedKeys {
thirdPartyObj := extensions.ThirdPartyResourceData{}
err := master.thirdPartyStorage.Get(context.TODO(), key, &thirdPartyObj, false)
s := generic.NewRawStorage(master.thirdPartyStorageConfig)
err := s.Get(context.TODO(), key, &thirdPartyObj, false)
if !storage.IsNotFound(err) {
t.Errorf("expected deletion didn't happen: %v", err)
}

View File

@ -41,7 +41,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *ApprovalREST) {
newListFunc := func() runtime.Object { return &certificates.CertificateSigningRequestList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.CertificateSigningRequests),
&certificates.CertificateSigningRequest{},
prefix,

View File

@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &rbac.ClusterRoleList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.ClusterRoles),
&rbac.ClusterRole{},
prefix,

View File

@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &rbac.ClusterRoleBindingList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.ClusterRoleBindings),
&rbac.ClusterRoleBinding{},
prefix,

View File

@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &api.ConfigMapList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.ConfigMaps),
&api.ConfigMap{},
prefix,

View File

@ -30,7 +30,7 @@ import (
func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
return NewREST(restOptions), server
}

View File

@ -63,7 +63,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &api.ReplicationControllerList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Controllers),
&api.ReplicationController{},
prefix,

View File

@ -39,7 +39,7 @@ const (
func newStorage(t *testing.T) (ControllerStorage, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
storage := NewStorage(restOptions)
return storage, server
}

View File

@ -39,7 +39,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &extensions.DaemonSetList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Daemonsets),
&extensions.DaemonSet{},
prefix,

View File

@ -32,7 +32,7 @@ import (
func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, extensions.GroupName)
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
daemonSetStorage, statusStorage := NewREST(restOptions)
return daemonSetStorage, statusStorage, server
}

View File

@ -63,7 +63,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *RollbackREST) {
newListFunc := func() runtime.Object { return &extensions.DeploymentList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Deployments),
&extensions.Deployment{},
prefix,

View File

@ -41,7 +41,7 @@ const defaultReplicas = 100
func newStorage(t *testing.T) (*DeploymentStorage, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, extensions.GroupName)
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "deployments"}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "deployments"}
deploymentStorage := NewStorage(restOptions)
return &deploymentStorage, server
}

View File

@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &api.EndpointsList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Endpoints),
&api.Endpoints{},
prefix,

View File

@ -30,7 +30,7 @@ import (
func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
return NewREST(restOptions), server
}

View File

@ -34,7 +34,7 @@ func NewREST(opts generic.RESTOptions, ttl uint64) *REST {
// We explicitly do NOT do any decoration here - switching on Cacher
// for events will lead to too high memory consumption.
storageInterface := opts.Storage
storageInterface := generic.NewRawStorage(opts.StorageConfig)
store := &registry.Store{
NewFunc: func() runtime.Object { return &api.Event{} },

View File

@ -30,7 +30,7 @@ var testTTL uint64 = 60
func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
return NewREST(restOptions, testTTL), server
}

View File

@ -32,8 +32,8 @@ import (
func newStorage(t *testing.T) (*ScaleREST, *etcdtesting.EtcdTestServer, storage.Interface) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "controllers"}
return NewStorage(restOptions).Scale, server, etcdStorage
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "controllers"}
return NewStorage(restOptions).Scale, server, generic.NewRawStorage(etcdStorage)
}
var validPodTemplate = api.PodTemplate{

View File

@ -16,13 +16,11 @@ limitations under the License.
package generic
import (
pkgstorage "k8s.io/kubernetes/pkg/storage"
)
import "k8s.io/kubernetes/pkg/storage/storagebackend"
// RESTOptions is set of configuration options to generic registries.
type RESTOptions struct {
Storage pkgstorage.Interface
StorageConfig *storagebackend.Config
Decorator StorageDecorator
DeleteCollectionWorkers int

View File

@ -18,14 +18,16 @@ package registry
import (
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/storagebackend"
)
// Creates a cacher on top of the given 'storageInterface'.
// Creates a cacher based given storageConfig.
func StorageWithCacher(
storageInterface storage.Interface,
storageConfig *storagebackend.Config,
capacity int,
objectType runtime.Object,
resourcePrefix string,
@ -33,9 +35,11 @@ func StorageWithCacher(
newListFunc func() runtime.Object,
triggerFunc storage.TriggerPublisherFunc) storage.Interface {
config := storage.CacherConfig{
// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
// Currently it has two layers of same storage interface -- cacher and low level kv.
cacherConfig := storage.CacherConfig{
CacheCapacity: capacity,
Storage: storageInterface,
Storage: generic.NewRawStorage(storageConfig),
Versioner: etcdstorage.APIObjectVersioner{},
Type: objectType,
ResourcePrefix: resourcePrefix,
@ -43,14 +47,14 @@ func StorageWithCacher(
TriggerPublisherFunc: triggerFunc,
}
if scopeStrategy.NamespaceScoped() {
config.KeyFunc = func(obj runtime.Object) (string, error) {
cacherConfig.KeyFunc = func(obj runtime.Object) (string, error) {
return storage.NamespaceKeyFunc(resourcePrefix, obj)
}
} else {
config.KeyFunc = func(obj runtime.Object) (string, error) {
cacherConfig.KeyFunc = func(obj runtime.Object) (string, error) {
return storage.NoNamespaceKeyFunc(resourcePrefix, obj)
}
}
return storage.NewCacherFromConfig(config)
return storage.NewCacherFromConfig(cacherConfig)
}

View File

@ -17,15 +17,18 @@ limitations under the License.
package generic
import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
"k8s.io/kubernetes/pkg/storage/storagebackend/factory"
)
// StorageDecorator is a function signature for producing
// a storage.Interface from given parameters.
type StorageDecorator func(
storageInterface storage.Interface,
config *storagebackend.Config,
capacity int,
objectType runtime.Object,
resourcePrefix string,
@ -35,12 +38,23 @@ type StorageDecorator func(
// Returns given 'storageInterface' without any decoration.
func UndecoratedStorage(
storageInterface storage.Interface,
config *storagebackend.Config,
capacity int,
objectType runtime.Object,
resourcePrefix string,
scopeStrategy rest.NamespaceScopedStrategy,
newListFunc func() runtime.Object,
trigger storage.TriggerPublisherFunc) storage.Interface {
return storageInterface
return NewRawStorage(config)
}
// NewRawStorage creates the low level kv storage. This is a work-around for current
// two layer of same storage interface.
// TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method.
func NewRawStorage(config *storagebackend.Config) storage.Interface {
s, err := factory.Create(*config)
if err != nil {
glog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err)
}
return s
}

View File

@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &autoscaling.HorizontalPodAutoscalerList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.HorizontalPodAutoscalers),
&autoscaling.HorizontalPodAutoscaler{},
prefix,

View File

@ -33,7 +33,7 @@ import (
func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, autoscaling.GroupName)
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
horizontalPodAutoscalerStorage, statusStorage := NewREST(restOptions)
return horizontalPodAutoscalerStorage, statusStorage, server
}

View File

@ -39,7 +39,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &extensions.IngressList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Ingress),
&extensions.Ingress{},
prefix,

View File

@ -32,7 +32,7 @@ import (
func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, extensions.GroupName)
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
ingressStorage, statusStorage := NewREST(restOptions)
return ingressStorage, statusStorage, server
}

View File

@ -39,7 +39,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &batch.JobList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Jobs),
&batch.Job{},
prefix,

View File

@ -33,7 +33,7 @@ import (
func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, extensions.GroupName)
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
jobStorage, statusStorage := NewREST(restOptions)
return jobStorage, statusStorage, server
}

View File

@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &api.LimitRangeList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.LimitRanges),
&api.LimitRange{},
prefix,

View File

@ -31,7 +31,7 @@ import (
func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
return NewREST(restOptions), server
}

View File

@ -54,7 +54,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *FinalizeREST) {
newListFunc := func() runtime.Object { return &api.NamespaceList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Namespaces),
&api.Namespace{},
prefix,

View File

@ -31,7 +31,7 @@ import (
func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "namespaces"}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "namespaces"}
namespaceStorage, _, _ := NewREST(restOptions)
return namespaceStorage, server
}

View File

@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &extensionsapi.NetworkPolicyList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.NetworkPolicys),
&extensionsapi.NetworkPolicy{},
prefix,

View File

@ -33,7 +33,7 @@ import (
func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "extensions")
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
return NewREST(restOptions), server
}

View File

@ -70,7 +70,7 @@ func NewStorage(opts generic.RESTOptions, connection client.ConnectionInfoGetter
newListFunc := func() runtime.Object { return &api.NodeList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Nodes),
&api.Node{},
prefix,

View File

@ -39,7 +39,7 @@ func (fakeConnectionInfoGetter) GetConnectionInfo(ctx api.Context, nodeName stri
func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
storage := NewStorage(restOptions, fakeConnectionInfoGetter{}, nil)
return storage.Node, server
}

View File

@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &api.PersistentVolumeList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.PersistentVolumes),
&api.PersistentVolume{},
prefix,

View File

@ -34,7 +34,7 @@ import (
func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
persistentVolumeStorage, statusStorage := NewREST(restOptions)
return persistentVolumeStorage, statusStorage, server
}

View File

@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &api.PersistentVolumeClaimList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.PersistentVolumeClaims),
&api.PersistentVolumeClaim{},
prefix,

View File

@ -34,7 +34,7 @@ import (
func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
persistentVolumeClaimStorage, statusStorage := NewREST(restOptions)
return persistentVolumeClaimStorage, statusStorage, server
}

View File

@ -39,7 +39,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &appsapi.PetSetList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.PetSet),
&appsapi.PetSet{},
prefix,

View File

@ -33,7 +33,7 @@ import (
func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, apps.GroupName)
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "petsets"}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "petsets"}
petSetStorage, statusStorage := NewREST(restOptions)
return petSetStorage, statusStorage, server
}

View File

@ -61,7 +61,7 @@ func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTr
newListFunc := func() runtime.Object { return &api.PodList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Pods),
&api.Pod{},
prefix,

View File

@ -39,7 +39,7 @@ import (
func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 3}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 3}
storage := NewStorage(restOptions, nil, nil)
return storage.Pod, storage.Binding, storage.Status, server
}
@ -144,9 +144,9 @@ func (f FailDeletionStorage) Delete(ctx context.Context, key string, out runtime
func newFailDeleteStorage(t *testing.T, called *bool) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
failDeleteStorage := FailDeletionStorage{etcdStorage, called}
restOptions := generic.RESTOptions{Storage: failDeleteStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 3}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 3}
storage := NewStorage(restOptions, nil, nil)
storage.Pod.Store.Storage = FailDeletionStorage{storage.Pod.Store.Storage, called}
return storage.Pod, server
}

View File

@ -21,6 +21,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/registrytest"
)
@ -28,7 +29,7 @@ import (
func TestPodLogValidates(t *testing.T) {
etcdStorage, _ := registrytest.NewEtcdStorage(t, "")
store := &registry.Store{
Storage: etcdStorage,
Storage: generic.NewRawStorage(etcdStorage),
}
logRest := &LogREST{Store: store, KubeletConn: nil}

View File

@ -39,7 +39,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &policyapi.PodDisruptionBudgetList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.PodDisruptionBudget),
&policyapi.PodDisruptionBudget{},
prefix,

View File

@ -34,7 +34,7 @@ import (
func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, policy.GroupName)
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "poddisruptionbudgets"}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "poddisruptionbudgets"}
podDisruptionBudgetStorage, statusStorage := NewREST(restOptions)
return podDisruptionBudgetStorage, statusStorage, server
}

View File

@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &extensions.PodSecurityPolicyList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.PodSecurityPolicies),
&extensions.PodSecurityPolicy{},
prefix,

View File

@ -33,7 +33,7 @@ import (
func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "extensions")
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
return NewREST(restOptions), server
}

View File

@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &api.PodTemplateList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.PodTemplates),
&api.PodTemplate{},
prefix,

View File

@ -30,7 +30,7 @@ import (
func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
return NewREST(restOptions), server
}

View File

@ -29,17 +29,23 @@ import (
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
"k8s.io/kubernetes/pkg/storage/storagebackend"
storagetesting "k8s.io/kubernetes/pkg/storage/testing"
)
func NewEtcdStorage(t *testing.T, group string) (storage.Interface, *etcdtesting.EtcdTestServer) {
func NewEtcdStorage(t *testing.T, group string) (*storagebackend.Config, *etcdtesting.EtcdTestServer) {
server := etcdtesting.NewUnsecuredEtcdTestClientServer(t)
storage := etcdstorage.NewEtcdStorage(server.Client, testapi.Groups[group].StorageCodec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)
return storage, server
config := &storagebackend.Config{
Type: "etcd2",
Prefix: etcdtest.PathPrefix(),
ServerList: server.Client.Endpoints(),
DeserializationCacheSize: etcdtest.DeserializationCacheSize,
Codec: testapi.Groups[group].StorageCodec(),
}
return config, server
}
type Tester struct {

View File

@ -62,7 +62,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &extensions.ReplicaSetList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Replicasets),
&extensions.ReplicaSet{},
prefix,

View File

@ -38,7 +38,7 @@ const defaultReplicas = 100
func newStorage(t *testing.T) (*ReplicaSetStorage, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "extensions")
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "replicasets"}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "replicasets"}
replicaSetStorage := NewStorage(restOptions)
return &replicaSetStorage, server
}

View File

@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &api.ResourceQuotaList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.ResourceQuotas),
&api.ResourceQuota{},
prefix,

View File

@ -33,7 +33,7 @@ import (
func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
resourceQuotaStorage, statusStorage := NewREST(restOptions)
return resourceQuotaStorage, statusStorage, server
}

View File

@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &rbac.RoleList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Roles),
&rbac.Role{},
prefix,

View File

@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &rbac.RoleBindingList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.RoleBindings),
&rbac.RoleBinding{},
prefix,

View File

@ -39,7 +39,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &batch.ScheduledJobList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.ScheduledJobs),
&batch.ScheduledJob{},
prefix,

View File

@ -33,7 +33,7 @@ import (
func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, batch.GroupName)
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
storage, statusStorage := NewREST(restOptions)
return storage, statusStorage, server
}

View File

@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &api.SecretList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Secrets),
&api.Secret{},
prefix,

View File

@ -30,7 +30,7 @@ import (
func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
return NewREST(restOptions), server
}

View File

@ -25,10 +25,12 @@ import (
k8serr "k8s.io/kubernetes/pkg/api/errors"
storeerr "k8s.io/kubernetes/pkg/api/errors/storage"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/rangeallocation"
"k8s.io/kubernetes/pkg/registry/service/allocator"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
"golang.org/x/net/context"
)
@ -58,7 +60,8 @@ var _ rangeallocation.RangeRegistry = &Etcd{}
// NewEtcd returns an allocator that is backed by Etcd and can manage
// persisting the snapshot state of allocation after each allocation is made.
func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource unversioned.GroupResource, storage storage.Interface) *Etcd {
func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource unversioned.GroupResource, config *storagebackend.Config) *Etcd {
storage := generic.NewRawStorage(config)
return &Etcd{
alloc: alloc,
storage: storage,

View File

@ -25,15 +25,16 @@ import (
"k8s.io/kubernetes/pkg/registry/service/allocator"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
"k8s.io/kubernetes/pkg/storage/storagebackend"
"golang.org/x/net/context"
)
func newStorage(t *testing.T) (*Etcd, *etcdtesting.EtcdTestServer, allocator.Interface) {
func newStorage(t *testing.T) (*Etcd, *etcdtesting.EtcdTestServer, allocator.Interface, *storagebackend.Config) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
mem := allocator.NewAllocationMap(100, "rangeSpecValue")
etcd := NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), etcdStorage)
return etcd, server, mem
return etcd, server, mem, etcdStorage
}
func validNewRangeAllocation() *api.RangeAllocation {
@ -48,7 +49,7 @@ func key() string {
}
func TestEmpty(t *testing.T) {
storage, server, _ := newStorage(t)
storage, server, _, _ := newStorage(t)
defer server.Terminate(t)
if _, err := storage.Allocate(1); !strings.Contains(err.Error(), "cannot allocate resources of type serviceipallocations at this time") {
t.Fatal(err)
@ -56,7 +57,7 @@ func TestEmpty(t *testing.T) {
}
func TestStore(t *testing.T) {
storage, server, backing := newStorage(t)
storage, server, backing, config := newStorage(t)
defer server.Terminate(t)
if err := storage.storage.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
t.Fatalf("unexpected error: %v", err)
@ -93,7 +94,7 @@ func TestStore(t *testing.T) {
}
other = allocator.NewAllocationMap(100, "rangeSpecValue")
otherStorage := NewEtcd(other, "/ranges/serviceips", api.Resource("serviceipallocations"), storage.storage)
otherStorage := NewEtcd(other, "/ranges/serviceips", api.Resource("serviceipallocations"), config)
if ok, err := otherStorage.Allocate(2); ok || err != nil {
t.Fatal(err)
}

View File

@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &api.ServiceList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.Services),
&api.Service{},
prefix,

View File

@ -31,7 +31,7 @@ import (
func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
serviceStorage, statusStorage := NewREST(restOptions)
return serviceStorage, statusStorage, server
}

View File

@ -22,6 +22,7 @@ import (
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/registry/service/allocator"
allocatoretcd "k8s.io/kubernetes/pkg/registry/service/allocator/etcd"
@ -48,7 +49,7 @@ func newStorage(t *testing.T) (*etcdtesting.EtcdTestServer, ipallocator.Interfac
return etcd
})
return server, storage, backing, etcdStorage
return server, storage, backing, generic.NewRawStorage(etcdStorage)
}
func validNewRangeAllocation() *api.RangeAllocation {

View File

@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &api.ServiceAccountList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.ServiceAccounts),
&api.ServiceAccount{},
prefix,

View File

@ -30,7 +30,7 @@ import (
func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
return NewREST(restOptions), server
}

View File

@ -37,7 +37,7 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &extensions.StorageClassList{} }
storageInterface := opts.Decorator(
opts.Storage,
opts.StorageConfig,
cachesize.GetWatchCacheSizeByResource(cachesize.StorageClasses),
&extensions.StorageClass{},
prefix,

View File

@ -31,7 +31,7 @@ import (
func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, extensions.GroupName)
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
storageClassStorage := NewREST(restOptions)
return storageClassStorage, server
}

View File

@ -35,7 +35,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix := "/" + opts.ResourcePrefix
// We explicitly do NOT do any decoration here yet.
storageInterface := opts.Storage
storageInterface := generic.NewRawStorage(opts.StorageConfig)
store := &registry.Store{
NewFunc: func() runtime.Object { return &extensions.ThirdPartyResource{} },

View File

@ -34,7 +34,7 @@ import (
func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, extensions.GroupName)
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
return NewREST(restOptions), server
}

View File

@ -38,7 +38,7 @@ func NewREST(opts generic.RESTOptions, group, kind string) *REST {
prefix := "/ThirdPartyResourceData/" + group + "/" + strings.ToLower(kind) + "s"
// We explicitly do NOT do any decoration here yet.
storageInterface := opts.Storage
storageInterface := generic.NewRawStorage(opts.StorageConfig)
store := &registry.Store{
NewFunc: func() runtime.Object { return &extensions.ThirdPartyResourceData{} },

View File

@ -33,7 +33,7 @@ import (
func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) {
etcdStorage, server := registrytest.NewEtcdStorage(t, extensions.GroupName)
restOptions := generic.RESTOptions{Storage: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1}
return NewREST(restOptions, "foo", "bar"), server
}

View File

@ -16,6 +16,8 @@ limitations under the License.
package storagebackend
import "k8s.io/kubernetes/pkg/runtime"
const (
StorageTypeUnset = ""
StorageTypeETCD2 = "etcd2"
@ -40,4 +42,6 @@ type Config struct {
// Currently this is only supported in etcd2.
// We will drop the cache once using protobuf.
DeserializationCacheSize int
Codec runtime.Codec
}

View File

@ -24,14 +24,13 @@ import (
etcd2client "github.com/coreos/etcd/client"
"github.com/coreos/etcd/pkg/transport"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/storagebackend"
utilnet "k8s.io/kubernetes/pkg/util/net"
)
func newETCD2Storage(c storagebackend.Config, codec runtime.Codec) (storage.Interface, error) {
func newETCD2Storage(c storagebackend.Config) (storage.Interface, error) {
tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
if err != nil {
return nil, err
@ -40,7 +39,7 @@ func newETCD2Storage(c storagebackend.Config, codec runtime.Codec) (storage.Inte
if err != nil {
return nil, err
}
return etcd.NewEtcdStorage(client, codec, c.Prefix, c.Quorum, c.DeserializationCacheSize), nil
return etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize), nil
}
func newETCD2Client(tr *http.Transport, serverList []string) (etcd2client.Client, error) {

View File

@ -22,13 +22,12 @@ import (
"github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd3"
"k8s.io/kubernetes/pkg/storage/storagebackend"
)
func newETCD3Storage(c storagebackend.Config, codec runtime.Codec) (storage.Interface, error) {
func newETCD3Storage(c storagebackend.Config) (storage.Interface, error) {
endpoints := c.ServerList
for i, s := range endpoints {
endpoints[i] = strings.TrimLeft(s, "http://")
@ -41,5 +40,5 @@ func newETCD3Storage(c storagebackend.Config, codec runtime.Codec) (storage.Inte
return nil, err
}
etcd3.StartCompactor(context.Background(), client)
return etcd3.New(client, codec, c.Prefix), nil
return etcd3.New(client, c.Codec, c.Prefix), nil
}

View File

@ -19,22 +19,21 @@ package factory
import (
"fmt"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
)
// Create creates a storage backend based on given config.
func Create(c storagebackend.Config, codec runtime.Codec) (storage.Interface, error) {
func Create(c storagebackend.Config) (storage.Interface, error) {
switch c.Type {
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD2:
return newETCD2Storage(c, codec)
return newETCD2Storage(c)
case storagebackend.StorageTypeETCD3:
// TODO: We have the following features to implement:
// - Support secure connection by using key, cert, and CA files.
// - Honor "https" scheme to support secure connection in gRPC.
// - Support non-quorum read.
return newETCD3Storage(c, codec)
return newETCD3Storage(c)
default:
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
}

View File

@ -77,11 +77,11 @@ func clientForUser(user string) *http.Client {
func newRBACAuthorizer(t *testing.T, superUser string, config *master.Config) authorizer.Authorizer {
newRESTOptions := func(resource string) generic.RESTOptions {
storageInterface, err := config.StorageFactory.New(rbacapi.Resource(resource))
storageConfig, err := config.StorageFactory.NewConfig(rbacapi.Resource(resource))
if err != nil {
t.Fatalf("failed to get storage: %v", err)
}
return generic.RESTOptions{Storage: storageInterface, Decorator: generic.UndecoratedStorage, ResourcePrefix: resource}
return generic.RESTOptions{StorageConfig: storageConfig, Decorator: generic.UndecoratedStorage, ResourcePrefix: resource}
}
roleRegistry := role.NewRegistry(roleetcd.NewREST(newRESTOptions("roles")))