mirror of https://github.com/k3s-io/k3s
commit
b79379516f
|
@ -535,8 +535,14 @@ func (m *Master) init(c *Config) {
|
||||||
|
|
||||||
healthzChecks := []healthz.HealthzChecker{}
|
healthzChecks := []healthz.HealthzChecker{}
|
||||||
|
|
||||||
|
var storageFactory storage.StorageFactory
|
||||||
|
if c.EnableWatchCache {
|
||||||
|
storageFactory = storage.NewCacher
|
||||||
|
} else {
|
||||||
|
storageFactory = storage.NoDecoration
|
||||||
|
}
|
||||||
dbClient := func(resource string) storage.Interface { return c.StorageDestinations.get("", resource) }
|
dbClient := func(resource string) storage.Interface { return c.StorageDestinations.get("", resource) }
|
||||||
podStorage := podetcd.NewStorage(dbClient("pods"), c.EnableWatchCache, c.KubeletClient, m.proxyTransport)
|
podStorage := podetcd.NewStorage(dbClient("pods"), storageFactory, c.KubeletClient, m.proxyTransport)
|
||||||
|
|
||||||
podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates"))
|
podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates"))
|
||||||
|
|
||||||
|
@ -552,10 +558,10 @@ func (m *Master) init(c *Config) {
|
||||||
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(dbClient("namespaces"))
|
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(dbClient("namespaces"))
|
||||||
m.namespaceRegistry = namespace.NewRegistry(namespaceStorage)
|
m.namespaceRegistry = namespace.NewRegistry(namespaceStorage)
|
||||||
|
|
||||||
endpointsStorage := endpointsetcd.NewREST(dbClient("endpoints"), c.EnableWatchCache)
|
endpointsStorage := endpointsetcd.NewREST(dbClient("endpoints"), storageFactory)
|
||||||
m.endpointRegistry = endpoint.NewRegistry(endpointsStorage)
|
m.endpointRegistry = endpoint.NewRegistry(endpointsStorage)
|
||||||
|
|
||||||
nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), c.EnableWatchCache, c.KubeletClient, m.proxyTransport)
|
nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), storageFactory, c.KubeletClient, m.proxyTransport)
|
||||||
m.nodeRegistry = node.NewRegistry(nodeStorage)
|
m.nodeRegistry = node.NewRegistry(nodeStorage)
|
||||||
|
|
||||||
serviceStorage := serviceetcd.NewREST(dbClient("services"))
|
serviceStorage := serviceetcd.NewREST(dbClient("services"))
|
||||||
|
|
|
@ -32,27 +32,16 @@ type REST struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewREST returns a RESTStorage object that will work against endpoints.
|
// NewREST returns a RESTStorage object that will work against endpoints.
|
||||||
func NewREST(s storage.Interface, useCacher bool) *REST {
|
func NewREST(s storage.Interface, storageFactory storage.StorageFactory) *REST {
|
||||||
prefix := "/services/endpoints"
|
prefix := "/services/endpoints"
|
||||||
|
|
||||||
storageInterface := s
|
newListFunc := func() runtime.Object { return &api.EndpointsList{} }
|
||||||
if useCacher {
|
storageInterface := storageFactory(
|
||||||
config := storage.CacherConfig{
|
s, 1000, nil, &api.Endpoints{}, prefix, true, newListFunc)
|
||||||
CacheCapacity: 1000,
|
|
||||||
Storage: s,
|
|
||||||
Type: &api.Endpoints{},
|
|
||||||
ResourcePrefix: prefix,
|
|
||||||
KeyFunc: func(obj runtime.Object) (string, error) {
|
|
||||||
return storage.NamespaceKeyFunc(prefix, obj)
|
|
||||||
},
|
|
||||||
NewListFunc: func() runtime.Object { return &api.EndpointsList{} },
|
|
||||||
}
|
|
||||||
storageInterface = storage.NewCacher(config)
|
|
||||||
}
|
|
||||||
|
|
||||||
store := &etcdgeneric.Etcd{
|
store := &etcdgeneric.Etcd{
|
||||||
NewFunc: func() runtime.Object { return &api.Endpoints{} },
|
NewFunc: func() runtime.Object { return &api.Endpoints{} },
|
||||||
NewListFunc: func() runtime.Object { return &api.EndpointsList{} },
|
NewListFunc: newListFunc,
|
||||||
KeyRootFunc: func(ctx api.Context) string {
|
KeyRootFunc: func(ctx api.Context) string {
|
||||||
return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix)
|
return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix)
|
||||||
},
|
},
|
||||||
|
|
|
@ -24,12 +24,13 @@ import (
|
||||||
"k8s.io/kubernetes/pkg/labels"
|
"k8s.io/kubernetes/pkg/labels"
|
||||||
"k8s.io/kubernetes/pkg/registry/registrytest"
|
"k8s.io/kubernetes/pkg/registry/registrytest"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
|
"k8s.io/kubernetes/pkg/storage"
|
||||||
"k8s.io/kubernetes/pkg/tools"
|
"k8s.io/kubernetes/pkg/tools"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
|
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
|
||||||
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "")
|
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "")
|
||||||
return NewREST(etcdStorage, false), fakeClient
|
return NewREST(etcdStorage, storage.NoDecoration), fakeClient
|
||||||
}
|
}
|
||||||
|
|
||||||
func validNewEndpoints() *api.Endpoints {
|
func validNewEndpoints() *api.Endpoints {
|
||||||
|
|
|
@ -50,27 +50,16 @@ func (r *StatusREST) Update(ctx api.Context, obj runtime.Object) (runtime.Object
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewREST returns a RESTStorage object that will work against nodes.
|
// NewREST returns a RESTStorage object that will work against nodes.
|
||||||
func NewREST(s storage.Interface, useCacher bool, connection client.ConnectionInfoGetter, proxyTransport http.RoundTripper) (*REST, *StatusREST) {
|
func NewREST(s storage.Interface, storageFactory storage.StorageFactory, connection client.ConnectionInfoGetter, proxyTransport http.RoundTripper) (*REST, *StatusREST) {
|
||||||
prefix := "/minions"
|
prefix := "/minions"
|
||||||
|
|
||||||
storageInterface := s
|
newListFunc := func() runtime.Object { return &api.NodeList{} }
|
||||||
if useCacher {
|
storageInterface := storageFactory(
|
||||||
config := storage.CacherConfig{
|
s, 1000, nil, &api.Node{}, prefix, false, newListFunc)
|
||||||
CacheCapacity: 1000,
|
|
||||||
Storage: s,
|
|
||||||
Type: &api.Node{},
|
|
||||||
ResourcePrefix: prefix,
|
|
||||||
KeyFunc: func(obj runtime.Object) (string, error) {
|
|
||||||
return storage.NoNamespaceKeyFunc(prefix, obj)
|
|
||||||
},
|
|
||||||
NewListFunc: func() runtime.Object { return &api.NodeList{} },
|
|
||||||
}
|
|
||||||
storageInterface = storage.NewCacher(config)
|
|
||||||
}
|
|
||||||
|
|
||||||
store := &etcdgeneric.Etcd{
|
store := &etcdgeneric.Etcd{
|
||||||
NewFunc: func() runtime.Object { return &api.Node{} },
|
NewFunc: func() runtime.Object { return &api.Node{} },
|
||||||
NewListFunc: func() runtime.Object { return &api.NodeList{} },
|
NewListFunc: newListFunc,
|
||||||
KeyRootFunc: func(ctx api.Context) string {
|
KeyRootFunc: func(ctx api.Context) string {
|
||||||
return prefix
|
return prefix
|
||||||
},
|
},
|
||||||
|
|
|
@ -26,6 +26,7 @@ import (
|
||||||
"k8s.io/kubernetes/pkg/labels"
|
"k8s.io/kubernetes/pkg/labels"
|
||||||
"k8s.io/kubernetes/pkg/registry/registrytest"
|
"k8s.io/kubernetes/pkg/registry/registrytest"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
|
"k8s.io/kubernetes/pkg/storage"
|
||||||
"k8s.io/kubernetes/pkg/tools"
|
"k8s.io/kubernetes/pkg/tools"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -38,7 +39,7 @@ func (fakeConnectionInfoGetter) GetConnectionInfo(host string) (string, uint, ht
|
||||||
|
|
||||||
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
|
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
|
||||||
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "")
|
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "")
|
||||||
storage, _ := NewREST(etcdStorage, false, fakeConnectionInfoGetter{}, nil)
|
storage, _ := NewREST(etcdStorage, storage.NoDecoration, fakeConnectionInfoGetter{}, nil)
|
||||||
return storage, fakeClient
|
return storage, fakeClient
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -60,27 +60,16 @@ type REST struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStorage returns a RESTStorage object that will work against pods.
|
// NewStorage returns a RESTStorage object that will work against pods.
|
||||||
func NewStorage(s storage.Interface, useCacher bool, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage {
|
func NewStorage(s storage.Interface, storageFactory storage.StorageFactory, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage {
|
||||||
prefix := "/pods"
|
prefix := "/pods"
|
||||||
|
|
||||||
storageInterface := s
|
newListFunc := func() runtime.Object { return &api.PodList{} }
|
||||||
if useCacher {
|
storageInterface := storageFactory(
|
||||||
config := storage.CacherConfig{
|
s, 1000, nil, &api.Pod{}, prefix, true, newListFunc)
|
||||||
CacheCapacity: 1000,
|
|
||||||
Storage: s,
|
|
||||||
Type: &api.Pod{},
|
|
||||||
ResourcePrefix: prefix,
|
|
||||||
KeyFunc: func(obj runtime.Object) (string, error) {
|
|
||||||
return storage.NamespaceKeyFunc(prefix, obj)
|
|
||||||
},
|
|
||||||
NewListFunc: func() runtime.Object { return &api.PodList{} },
|
|
||||||
}
|
|
||||||
storageInterface = storage.NewCacher(config)
|
|
||||||
}
|
|
||||||
|
|
||||||
store := &etcdgeneric.Etcd{
|
store := &etcdgeneric.Etcd{
|
||||||
NewFunc: func() runtime.Object { return &api.Pod{} },
|
NewFunc: func() runtime.Object { return &api.Pod{} },
|
||||||
NewListFunc: func() runtime.Object { return &api.PodList{} },
|
NewListFunc: newListFunc,
|
||||||
KeyRootFunc: func(ctx api.Context) string {
|
KeyRootFunc: func(ctx api.Context) string {
|
||||||
return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix)
|
return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix)
|
||||||
},
|
},
|
||||||
|
|
|
@ -31,6 +31,7 @@ import (
|
||||||
"k8s.io/kubernetes/pkg/registry/registrytest"
|
"k8s.io/kubernetes/pkg/registry/registrytest"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
"k8s.io/kubernetes/pkg/securitycontext"
|
"k8s.io/kubernetes/pkg/securitycontext"
|
||||||
|
"k8s.io/kubernetes/pkg/storage"
|
||||||
"k8s.io/kubernetes/pkg/tools"
|
"k8s.io/kubernetes/pkg/tools"
|
||||||
"k8s.io/kubernetes/pkg/tools/etcdtest"
|
"k8s.io/kubernetes/pkg/tools/etcdtest"
|
||||||
"k8s.io/kubernetes/pkg/util"
|
"k8s.io/kubernetes/pkg/util"
|
||||||
|
@ -38,7 +39,7 @@ import (
|
||||||
|
|
||||||
func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *tools.FakeEtcdClient) {
|
func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *tools.FakeEtcdClient) {
|
||||||
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "")
|
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "")
|
||||||
storage := NewStorage(etcdStorage, false, nil, nil)
|
storage := NewStorage(etcdStorage, storage.NoDecoration, nil, nil)
|
||||||
return storage.Pod, storage.Binding, storage.Status, fakeClient
|
return storage.Pod, storage.Binding, storage.Status, fakeClient
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -111,10 +111,63 @@ type Cacher struct {
|
||||||
ListFromCache bool
|
ListFromCache bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StorageFactory is a function signature for producing
|
||||||
|
// a storage.Interface from given parameters.
|
||||||
|
type StorageFactory func(
|
||||||
|
storage Interface,
|
||||||
|
capacity int,
|
||||||
|
versioner Versioner,
|
||||||
|
objectType runtime.Object,
|
||||||
|
resourcePrefix string,
|
||||||
|
namespaceScoped bool,
|
||||||
|
newListFunc func() runtime.Object) Interface
|
||||||
|
|
||||||
|
func NoDecoration(
|
||||||
|
storage Interface,
|
||||||
|
capacity int,
|
||||||
|
versioner Versioner,
|
||||||
|
objectType runtime.Object,
|
||||||
|
resourcePrefix string,
|
||||||
|
namespaceScoped bool,
|
||||||
|
newListFunc func() runtime.Object) Interface {
|
||||||
|
return storage
|
||||||
|
}
|
||||||
|
|
||||||
// Create a new Cacher responsible from service WATCH and LIST requests from its
|
// Create a new Cacher responsible from service WATCH and LIST requests from its
|
||||||
// internal cache and updating its cache in the background based on the given
|
// internal cache and updating its cache in the background based on the given
|
||||||
// configuration.
|
// configuration.
|
||||||
func NewCacher(config CacherConfig) *Cacher {
|
func NewCacher(
|
||||||
|
storage Interface,
|
||||||
|
capacity int,
|
||||||
|
versioner Versioner,
|
||||||
|
objectType runtime.Object,
|
||||||
|
resourcePrefix string,
|
||||||
|
namespaceScoped bool,
|
||||||
|
newListFunc func() runtime.Object) Interface {
|
||||||
|
config := CacherConfig{
|
||||||
|
CacheCapacity: capacity,
|
||||||
|
Storage: storage,
|
||||||
|
Versioner: versioner,
|
||||||
|
Type: objectType,
|
||||||
|
ResourcePrefix: resourcePrefix,
|
||||||
|
NewListFunc: newListFunc,
|
||||||
|
}
|
||||||
|
if namespaceScoped {
|
||||||
|
config.KeyFunc = func(obj runtime.Object) (string, error) {
|
||||||
|
return NamespaceKeyFunc(resourcePrefix, obj)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
config.KeyFunc = func(obj runtime.Object) (string, error) {
|
||||||
|
return NoNamespaceKeyFunc(resourcePrefix, obj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return NewCacherFromConfig(config)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a new Cacher responsible from service WATCH and LIST requests from its
|
||||||
|
// internal cache and updating its cache in the background based on the given
|
||||||
|
// configuration.
|
||||||
|
func NewCacherFromConfig(config CacherConfig) *Cacher {
|
||||||
watchCache := newWatchCache(config.CacheCapacity)
|
watchCache := newWatchCache(config.CacheCapacity)
|
||||||
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
|
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
|
||||||
|
|
||||||
|
|
|
@ -55,7 +55,7 @@ func newTestCacher(client tools.EtcdClient) *storage.Cacher {
|
||||||
NewListFunc: func() runtime.Object { return &api.PodList{} },
|
NewListFunc: func() runtime.Object { return &api.PodList{} },
|
||||||
StopChannel: util.NeverStop,
|
StopChannel: util.NeverStop,
|
||||||
}
|
}
|
||||||
return storage.NewCacher(config)
|
return storage.NewCacherFromConfig(config)
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeTestPod(name string) *api.Pod {
|
func makeTestPod(name string) *api.Pod {
|
||||||
|
|
Loading…
Reference in New Issue