mirror of https://github.com/k3s-io/k3s
Refactor code for creating Cacher.
parent
fcbf1c1012
commit
8f385c563f
|
@ -535,8 +535,14 @@ func (m *Master) init(c *Config) {
|
|||
|
||||
healthzChecks := []healthz.HealthzChecker{}
|
||||
|
||||
var storageFactory storage.StorageFactory
|
||||
if c.EnableWatchCache {
|
||||
storageFactory = storage.NewCacher
|
||||
} else {
|
||||
storageFactory = storage.NoDecoration
|
||||
}
|
||||
dbClient := func(resource string) storage.Interface { return c.StorageDestinations.get("", resource) }
|
||||
podStorage := podetcd.NewStorage(dbClient("pods"), c.EnableWatchCache, c.KubeletClient, m.proxyTransport)
|
||||
podStorage := podetcd.NewStorage(dbClient("pods"), storageFactory, c.KubeletClient, m.proxyTransport)
|
||||
|
||||
podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates"))
|
||||
|
||||
|
@ -552,10 +558,10 @@ func (m *Master) init(c *Config) {
|
|||
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(dbClient("namespaces"))
|
||||
m.namespaceRegistry = namespace.NewRegistry(namespaceStorage)
|
||||
|
||||
endpointsStorage := endpointsetcd.NewREST(dbClient("endpoints"), c.EnableWatchCache)
|
||||
endpointsStorage := endpointsetcd.NewREST(dbClient("endpoints"), storageFactory)
|
||||
m.endpointRegistry = endpoint.NewRegistry(endpointsStorage)
|
||||
|
||||
nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), c.EnableWatchCache, c.KubeletClient, m.proxyTransport)
|
||||
nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), storageFactory, c.KubeletClient, m.proxyTransport)
|
||||
m.nodeRegistry = node.NewRegistry(nodeStorage)
|
||||
|
||||
serviceStorage := serviceetcd.NewREST(dbClient("services"))
|
||||
|
|
|
@ -32,27 +32,16 @@ type REST struct {
|
|||
}
|
||||
|
||||
// NewREST returns a RESTStorage object that will work against endpoints.
|
||||
func NewREST(s storage.Interface, useCacher bool) *REST {
|
||||
func NewREST(s storage.Interface, storageFactory storage.StorageFactory) *REST {
|
||||
prefix := "/services/endpoints"
|
||||
|
||||
storageInterface := s
|
||||
if useCacher {
|
||||
config := storage.CacherConfig{
|
||||
CacheCapacity: 1000,
|
||||
Storage: s,
|
||||
Type: &api.Endpoints{},
|
||||
ResourcePrefix: prefix,
|
||||
KeyFunc: func(obj runtime.Object) (string, error) {
|
||||
return storage.NamespaceKeyFunc(prefix, obj)
|
||||
},
|
||||
NewListFunc: func() runtime.Object { return &api.EndpointsList{} },
|
||||
}
|
||||
storageInterface = storage.NewCacher(config)
|
||||
}
|
||||
newListFunc := func() runtime.Object { return &api.EndpointsList{} }
|
||||
storageInterface := storageFactory(
|
||||
s, 1000, nil, &api.Endpoints{}, prefix, true, newListFunc)
|
||||
|
||||
store := &etcdgeneric.Etcd{
|
||||
NewFunc: func() runtime.Object { return &api.Endpoints{} },
|
||||
NewListFunc: func() runtime.Object { return &api.EndpointsList{} },
|
||||
NewListFunc: newListFunc,
|
||||
KeyRootFunc: func(ctx api.Context) string {
|
||||
return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix)
|
||||
},
|
||||
|
|
|
@ -24,12 +24,13 @@ import (
|
|||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/registry/registrytest"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
)
|
||||
|
||||
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
|
||||
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "")
|
||||
return NewREST(etcdStorage, false), fakeClient
|
||||
return NewREST(etcdStorage, storage.NoDecoration), fakeClient
|
||||
}
|
||||
|
||||
func validNewEndpoints() *api.Endpoints {
|
||||
|
|
|
@ -50,27 +50,16 @@ func (r *StatusREST) Update(ctx api.Context, obj runtime.Object) (runtime.Object
|
|||
}
|
||||
|
||||
// NewREST returns a RESTStorage object that will work against nodes.
|
||||
func NewREST(s storage.Interface, useCacher bool, connection client.ConnectionInfoGetter, proxyTransport http.RoundTripper) (*REST, *StatusREST) {
|
||||
func NewREST(s storage.Interface, storageFactory storage.StorageFactory, connection client.ConnectionInfoGetter, proxyTransport http.RoundTripper) (*REST, *StatusREST) {
|
||||
prefix := "/minions"
|
||||
|
||||
storageInterface := s
|
||||
if useCacher {
|
||||
config := storage.CacherConfig{
|
||||
CacheCapacity: 1000,
|
||||
Storage: s,
|
||||
Type: &api.Node{},
|
||||
ResourcePrefix: prefix,
|
||||
KeyFunc: func(obj runtime.Object) (string, error) {
|
||||
return storage.NoNamespaceKeyFunc(prefix, obj)
|
||||
},
|
||||
NewListFunc: func() runtime.Object { return &api.NodeList{} },
|
||||
}
|
||||
storageInterface = storage.NewCacher(config)
|
||||
}
|
||||
newListFunc := func() runtime.Object { return &api.NodeList{} }
|
||||
storageInterface := storageFactory(
|
||||
s, 1000, nil, &api.Node{}, prefix, false, newListFunc)
|
||||
|
||||
store := &etcdgeneric.Etcd{
|
||||
NewFunc: func() runtime.Object { return &api.Node{} },
|
||||
NewListFunc: func() runtime.Object { return &api.NodeList{} },
|
||||
NewListFunc: newListFunc,
|
||||
KeyRootFunc: func(ctx api.Context) string {
|
||||
return prefix
|
||||
},
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/registry/registrytest"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
)
|
||||
|
||||
|
@ -38,7 +39,7 @@ func (fakeConnectionInfoGetter) GetConnectionInfo(host string) (string, uint, ht
|
|||
|
||||
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
|
||||
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "")
|
||||
storage, _ := NewREST(etcdStorage, false, fakeConnectionInfoGetter{}, nil)
|
||||
storage, _ := NewREST(etcdStorage, storage.NoDecoration, fakeConnectionInfoGetter{}, nil)
|
||||
return storage, fakeClient
|
||||
}
|
||||
|
||||
|
|
|
@ -60,27 +60,16 @@ type REST struct {
|
|||
}
|
||||
|
||||
// NewStorage returns a RESTStorage object that will work against pods.
|
||||
func NewStorage(s storage.Interface, useCacher bool, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage {
|
||||
func NewStorage(s storage.Interface, storageFactory storage.StorageFactory, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage {
|
||||
prefix := "/pods"
|
||||
|
||||
storageInterface := s
|
||||
if useCacher {
|
||||
config := storage.CacherConfig{
|
||||
CacheCapacity: 1000,
|
||||
Storage: s,
|
||||
Type: &api.Pod{},
|
||||
ResourcePrefix: prefix,
|
||||
KeyFunc: func(obj runtime.Object) (string, error) {
|
||||
return storage.NamespaceKeyFunc(prefix, obj)
|
||||
},
|
||||
NewListFunc: func() runtime.Object { return &api.PodList{} },
|
||||
}
|
||||
storageInterface = storage.NewCacher(config)
|
||||
}
|
||||
newListFunc := func() runtime.Object { return &api.PodList{} }
|
||||
storageInterface := storageFactory(
|
||||
s, 1000, nil, &api.Pod{}, prefix, true, newListFunc)
|
||||
|
||||
store := &etcdgeneric.Etcd{
|
||||
NewFunc: func() runtime.Object { return &api.Pod{} },
|
||||
NewListFunc: func() runtime.Object { return &api.PodList{} },
|
||||
NewListFunc: newListFunc,
|
||||
KeyRootFunc: func(ctx api.Context) string {
|
||||
return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix)
|
||||
},
|
||||
|
|
|
@ -31,6 +31,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/registry/registrytest"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/securitycontext"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
"k8s.io/kubernetes/pkg/tools/etcdtest"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
|
@ -38,7 +39,7 @@ import (
|
|||
|
||||
func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *tools.FakeEtcdClient) {
|
||||
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "")
|
||||
storage := NewStorage(etcdStorage, false, nil, nil)
|
||||
storage := NewStorage(etcdStorage, storage.NoDecoration, nil, nil)
|
||||
return storage.Pod, storage.Binding, storage.Status, fakeClient
|
||||
}
|
||||
|
||||
|
|
|
@ -111,10 +111,63 @@ type Cacher struct {
|
|||
ListFromCache bool
|
||||
}
|
||||
|
||||
// StorageFactory is a function signature for producing
|
||||
// a storage.Interface from given parameters.
|
||||
type StorageFactory func(
|
||||
storage Interface,
|
||||
capacity int,
|
||||
versioner Versioner,
|
||||
objectType runtime.Object,
|
||||
resourcePrefix string,
|
||||
namespaceScoped bool,
|
||||
newListFunc func() runtime.Object) Interface
|
||||
|
||||
func NoDecoration(
|
||||
storage Interface,
|
||||
capacity int,
|
||||
versioner Versioner,
|
||||
objectType runtime.Object,
|
||||
resourcePrefix string,
|
||||
namespaceScoped bool,
|
||||
newListFunc func() runtime.Object) Interface {
|
||||
return storage
|
||||
}
|
||||
|
||||
// Create a new Cacher responsible from service WATCH and LIST requests from its
|
||||
// internal cache and updating its cache in the background based on the given
|
||||
// configuration.
|
||||
func NewCacher(config CacherConfig) *Cacher {
|
||||
func NewCacher(
|
||||
storage Interface,
|
||||
capacity int,
|
||||
versioner Versioner,
|
||||
objectType runtime.Object,
|
||||
resourcePrefix string,
|
||||
namespaceScoped bool,
|
||||
newListFunc func() runtime.Object) Interface {
|
||||
config := CacherConfig{
|
||||
CacheCapacity: capacity,
|
||||
Storage: storage,
|
||||
Versioner: versioner,
|
||||
Type: objectType,
|
||||
ResourcePrefix: resourcePrefix,
|
||||
NewListFunc: newListFunc,
|
||||
}
|
||||
if namespaceScoped {
|
||||
config.KeyFunc = func(obj runtime.Object) (string, error) {
|
||||
return NamespaceKeyFunc(resourcePrefix, obj)
|
||||
}
|
||||
} else {
|
||||
config.KeyFunc = func(obj runtime.Object) (string, error) {
|
||||
return NoNamespaceKeyFunc(resourcePrefix, obj)
|
||||
}
|
||||
}
|
||||
return NewCacherFromConfig(config)
|
||||
}
|
||||
|
||||
// Create a new Cacher responsible from service WATCH and LIST requests from its
|
||||
// internal cache and updating its cache in the background based on the given
|
||||
// configuration.
|
||||
func NewCacherFromConfig(config CacherConfig) *Cacher {
|
||||
watchCache := newWatchCache(config.CacheCapacity)
|
||||
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
|
||||
|
||||
|
|
|
@ -55,7 +55,7 @@ func newTestCacher(client tools.EtcdClient) *storage.Cacher {
|
|||
NewListFunc: func() runtime.Object { return &api.PodList{} },
|
||||
StopChannel: util.NeverStop,
|
||||
}
|
||||
return storage.NewCacher(config)
|
||||
return storage.NewCacherFromConfig(config)
|
||||
}
|
||||
|
||||
func makeTestPod(name string) *api.Pod {
|
||||
|
|
Loading…
Reference in New Issue