From 1c667e4fc5902b7054366c3ae499ce42d6750df0 Mon Sep 17 00:00:00 2001 From: deads2k Date: Mon, 26 Sep 2016 07:51:04 -0400 Subject: [PATCH] move core storage out of master.go --- pkg/master/master.go | 223 +++-------------- pkg/master/master_test.go | 115 ++------- pkg/registry/core/rest/storage_core.go | 226 ++++++++++++++++++ .../extensions/rest/thirdparty_controller.go | 2 +- 4 files changed, 274 insertions(+), 292 deletions(-) create mode 100644 pkg/registry/core/rest/storage_core.go diff --git a/pkg/master/master.go b/pkg/master/master.go index c9cf231dd1..f1737e2206 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -74,37 +74,14 @@ import ( autoscalingrest "k8s.io/kubernetes/pkg/registry/autoscaling/rest" batchrest "k8s.io/kubernetes/pkg/registry/batch/rest" certificatesrest "k8s.io/kubernetes/pkg/registry/certificates/rest" + corerest "k8s.io/kubernetes/pkg/registry/core/rest" extensionsrest "k8s.io/kubernetes/pkg/registry/extensions/rest" policyrest "k8s.io/kubernetes/pkg/registry/policy/rest" rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest" storagerest "k8s.io/kubernetes/pkg/registry/storage/rest" // direct etcd registry dependencies - "k8s.io/kubernetes/pkg/registry/core/componentstatus" - configmapetcd "k8s.io/kubernetes/pkg/registry/core/configmap/etcd" - controlleretcd "k8s.io/kubernetes/pkg/registry/core/controller/etcd" - "k8s.io/kubernetes/pkg/registry/core/endpoint" - endpointsetcd "k8s.io/kubernetes/pkg/registry/core/endpoint/etcd" - eventetcd "k8s.io/kubernetes/pkg/registry/core/event/etcd" - limitrangeetcd "k8s.io/kubernetes/pkg/registry/core/limitrange/etcd" - "k8s.io/kubernetes/pkg/registry/core/namespace" - namespaceetcd "k8s.io/kubernetes/pkg/registry/core/namespace/etcd" - "k8s.io/kubernetes/pkg/registry/core/node" - nodeetcd "k8s.io/kubernetes/pkg/registry/core/node/etcd" - pvetcd "k8s.io/kubernetes/pkg/registry/core/persistentvolume/etcd" - pvcetcd "k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/etcd" podetcd "k8s.io/kubernetes/pkg/registry/core/pod/etcd" - podtemplateetcd "k8s.io/kubernetes/pkg/registry/core/podtemplate/etcd" - "k8s.io/kubernetes/pkg/registry/core/rangeallocation" - resourcequotaetcd "k8s.io/kubernetes/pkg/registry/core/resourcequota/etcd" - secretetcd "k8s.io/kubernetes/pkg/registry/core/secret/etcd" - "k8s.io/kubernetes/pkg/registry/core/service" - "k8s.io/kubernetes/pkg/registry/core/service/allocator" - etcdallocator "k8s.io/kubernetes/pkg/registry/core/service/allocator/etcd" - serviceetcd "k8s.io/kubernetes/pkg/registry/core/service/etcd" - ipallocator "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" - "k8s.io/kubernetes/pkg/registry/core/service/portallocator" - serviceaccountetcd "k8s.io/kubernetes/pkg/registry/core/serviceaccount/etcd" "k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata" thirdpartyresourcedataetcd "k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata/etcd" ) @@ -146,20 +123,11 @@ type EndpointReconcilerConfig struct { type Master struct { *genericapiserver.GenericAPIServer - // Map of v1 resources to their REST storages. - v1ResourcesStorage map[string]rest.Storage + legacyRESTStorageProvider corerest.LegacyRESTStorageProvider + legacyRESTStorage corerest.LegacyRESTStorage enableCoreControllers bool deleteCollectionWorkers int - // registries are internal client APIs for accessing the storage layer - // TODO: define the internal typed interface in a way that clients can - // also be replaced - nodeRegistry node.Registry - namespaceRegistry namespace.Registry - serviceRegistry service.Registry - endpointRegistry endpoint.Registry - serviceClusterIPAllocator rangeallocation.RangeRegistry - serviceNodePortAllocator rangeallocation.RangeRegistry // storage for third party objects thirdPartyStorageConfig *storagebackend.Config @@ -243,6 +211,16 @@ func (c completedConfig) New() (*Master, error) { enableGarbageCollection: c.GenericConfig.EnableGarbageCollection, storageFactory: c.StorageFactory, }, + + legacyRESTStorageProvider: corerest.LegacyRESTStorageProvider{ + StorageFactory: c.StorageFactory, + ProxyTransport: s.ProxyTransport, + KubeletClient: c.KubeletClient, + EventTTL: c.EventTTL, + ServiceClusterIPRange: c.GenericConfig.ServiceClusterIPRange, + ServiceNodePortRange: c.GenericConfig.ServiceNodePortRange, + ComponentStatusServerFunc: func() map[string]apiserver.Server { return getServersToValidate(c.StorageFactory) }, + }, } if c.EnableWatchCache { @@ -279,29 +257,20 @@ func (c completedConfig) New() (*Master, error) { } func (m *Master) InstallAPIs(c *Config) { + restOptionsGetter := func(resource unversioned.GroupResource) generic.RESTOptions { + return m.restOptionsFactory.NewFor(resource) + } + apiGroupsInfo := []genericapiserver.APIGroupInfo{} // Install v1 unless disabled. if c.GenericConfig.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) { - // Install v1 API. - m.initV1ResourcesStorage(c) - apiGroupInfo := genericapiserver.APIGroupInfo{ - GroupMeta: *registered.GroupOrDie(api.GroupName), - VersionedResourcesStorageMap: map[string]map[string]rest.Storage{ - "v1": m.v1ResourcesStorage, - }, - IsLegacyGroup: true, - Scheme: api.Scheme, - ParameterCodec: api.ParameterCodec, - NegotiatedSerializer: api.Codecs, - SubresourceGroupVersionKind: map[string]unversioned.GroupVersionKind{}, - } - if autoscalingGroupVersion := (unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}); registered.IsEnabledVersion(autoscalingGroupVersion) { - apiGroupInfo.SubresourceGroupVersionKind["replicationcontrollers/scale"] = autoscalingGroupVersion.WithKind("Scale") - } - if policyGroupVersion := (unversioned.GroupVersion{Group: "policy", Version: "v1alpha1"}); registered.IsEnabledVersion(policyGroupVersion) { - apiGroupInfo.SubresourceGroupVersionKind["pods/eviction"] = policyGroupVersion.WithKind("Eviction") + legacyRESTStorage, apiGroupInfo, err := m.legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter) + if err != nil { + glog.Fatalf("Error building core storage: %v", err) } + m.legacyRESTStorage = legacyRESTStorage + apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo) } @@ -334,10 +303,6 @@ func (m *Master) InstallAPIs(c *Config) { m.thirdPartyResources = map[string]*thirdPartyEntry{} } - restOptionsGetter := func(resource unversioned.GroupResource) generic.RESTOptions { - return m.restOptionsFactory.NewFor(resource) - } - // stabilize order. // TODO find a better way to configure priority of groups for _, group := range sets.StringKeySet(c.RESTStorageProviders).List() { @@ -386,133 +351,11 @@ func (m *Master) InstallAPIs(c *Config) { } } -func (m *Master) initV1ResourcesStorage(c *Config) { - restOptions := func(resource string) generic.RESTOptions { - return m.restOptionsFactory.NewFor(api.Resource(resource)) - } - - podTemplateStorage := podtemplateetcd.NewREST(restOptions("podTemplates")) - - eventStorage := eventetcd.NewREST(restOptions("events"), uint64(c.EventTTL.Seconds())) - limitRangeStorage := limitrangeetcd.NewREST(restOptions("limitRanges")) - - resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(restOptions("resourceQuotas")) - secretStorage := secretetcd.NewREST(restOptions("secrets")) - serviceAccountStorage := serviceaccountetcd.NewREST(restOptions("serviceAccounts")) - persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(restOptions("persistentVolumes")) - persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(restOptions("persistentVolumeClaims")) - configMapStorage := configmapetcd.NewREST(restOptions("configMaps")) - - namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(restOptions("namespaces")) - m.namespaceRegistry = namespace.NewRegistry(namespaceStorage) - - endpointsStorage := endpointsetcd.NewREST(restOptions("endpoints")) - m.endpointRegistry = endpoint.NewRegistry(endpointsStorage) - - nodeStorage := nodeetcd.NewStorage(restOptions("nodes"), c.KubeletClient, m.ProxyTransport) - m.nodeRegistry = node.NewRegistry(nodeStorage.Node) - - podStorage := podetcd.NewStorage( - restOptions("pods"), - kubeletclient.ConnectionInfoGetter(nodeStorage.Node), - m.ProxyTransport, - ) - - serviceRESTStorage, serviceStatusStorage := serviceetcd.NewREST(restOptions("services")) - m.serviceRegistry = service.NewRegistry(serviceRESTStorage) - - var serviceClusterIPRegistry rangeallocation.RangeRegistry - serviceClusterIPRange := m.ServiceClusterIPRange - if serviceClusterIPRange == nil { - glog.Fatalf("service clusterIPRange is nil") - return - } - - serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services")) - if err != nil { - glog.Fatal(err.Error()) - } - - serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface { - mem := allocator.NewAllocationMap(max, rangeSpec) - // TODO etcdallocator package to return a storage interface via the storageFactory - etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig) - serviceClusterIPRegistry = etcd - return etcd - }) - m.serviceClusterIPAllocator = serviceClusterIPRegistry - - var serviceNodePortRegistry rangeallocation.RangeRegistry - serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(m.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface { - mem := allocator.NewAllocationMap(max, rangeSpec) - // TODO etcdallocator package to return a storage interface via the storageFactory - etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorageConfig) - serviceNodePortRegistry = etcd - return etcd - }) - m.serviceNodePortAllocator = serviceNodePortRegistry - - controllerStorage := controlleretcd.NewStorage(restOptions("replicationControllers")) - - serviceRest := service.NewStorage(m.serviceRegistry, m.endpointRegistry, serviceClusterIPAllocator, serviceNodePortAllocator, m.ProxyTransport) - - // TODO: Factor out the core API registration - m.v1ResourcesStorage = map[string]rest.Storage{ - "pods": podStorage.Pod, - "pods/attach": podStorage.Attach, - "pods/status": podStorage.Status, - "pods/log": podStorage.Log, - "pods/exec": podStorage.Exec, - "pods/portforward": podStorage.PortForward, - "pods/proxy": podStorage.Proxy, - "pods/binding": podStorage.Binding, - "bindings": podStorage.Binding, - - "podTemplates": podTemplateStorage, - - "replicationControllers": controllerStorage.Controller, - "replicationControllers/status": controllerStorage.Status, - - "services": serviceRest.Service, - "services/proxy": serviceRest.Proxy, - "services/status": serviceStatusStorage, - - "endpoints": endpointsStorage, - - "nodes": nodeStorage.Node, - "nodes/status": nodeStorage.Status, - "nodes/proxy": nodeStorage.Proxy, - - "events": eventStorage, - - "limitRanges": limitRangeStorage, - "resourceQuotas": resourceQuotaStorage, - "resourceQuotas/status": resourceQuotaStatusStorage, - "namespaces": namespaceStorage, - "namespaces/status": namespaceStatusStorage, - "namespaces/finalize": namespaceFinalizeStorage, - "secrets": secretStorage, - "serviceAccounts": serviceAccountStorage, - "persistentVolumes": persistentVolumeStorage, - "persistentVolumes/status": persistentVolumeStatusStorage, - "persistentVolumeClaims": persistentVolumeClaimStorage, - "persistentVolumeClaims/status": persistentVolumeClaimStatusStorage, - "configMaps": configMapStorage, - - "componentStatuses": componentstatus.NewStorage(func() map[string]apiserver.Server { return m.getServersToValidate(c) }), - } - if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}) { - m.v1ResourcesStorage["replicationControllers/scale"] = controllerStorage.Scale - } - if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "policy", Version: "v1alpha1"}) { - m.v1ResourcesStorage["pods/eviction"] = podStorage.Eviction - } -} - // NewBootstrapController returns a controller for watching the core capabilities of the master. If // endpointReconcilerConfig.Interval is 0, the default value of DefaultEndpointReconcilerInterval // will be used instead. If endpointReconcilerConfig.Reconciler is nil, the default // MasterCountEndpointReconciler will be used. +// TODO this should be kicked off as a server PostHook func (m *Master) NewBootstrapController(endpointReconcilerConfig EndpointReconcilerConfig) *Controller { if endpointReconcilerConfig.Interval == 0 { endpointReconcilerConfig.Interval = DefaultEndpointReconcilerInterval @@ -521,12 +364,12 @@ func (m *Master) NewBootstrapController(endpointReconcilerConfig EndpointReconci if endpointReconcilerConfig.Reconciler == nil { // use a default endpoint reconciler if nothing is set // m.endpointRegistry is set via m.InstallAPIs -> m.initV1ResourcesStorage - endpointReconcilerConfig.Reconciler = NewMasterCountEndpointReconciler(m.MasterCount, m.endpointRegistry) + endpointReconcilerConfig.Reconciler = NewMasterCountEndpointReconciler(m.MasterCount, m.legacyRESTStorage.EndpointRegistry) } return &Controller{ - NamespaceRegistry: m.namespaceRegistry, - ServiceRegistry: m.serviceRegistry, + NamespaceRegistry: m.legacyRESTStorage.NamespaceRegistry, + ServiceRegistry: m.legacyRESTStorage.ServiceRegistry, EndpointReconciler: endpointReconcilerConfig.Reconciler, EndpointInterval: endpointReconcilerConfig.Interval, @@ -534,12 +377,12 @@ func (m *Master) NewBootstrapController(endpointReconcilerConfig EndpointReconci SystemNamespaces: []string{api.NamespaceSystem}, SystemNamespacesInterval: 1 * time.Minute, - ServiceClusterIPRegistry: m.serviceClusterIPAllocator, - ServiceClusterIPRange: m.ServiceClusterIPRange, + ServiceClusterIPRegistry: m.legacyRESTStorage.ServiceClusterIPAllocator, + ServiceClusterIPRange: m.legacyRESTStorageProvider.ServiceClusterIPRange, ServiceClusterIPInterval: 3 * time.Minute, - ServiceNodePortRegistry: m.serviceNodePortAllocator, - ServiceNodePortRange: m.ServiceNodePortRange, + ServiceNodePortRegistry: m.legacyRESTStorage.ServiceNodePortAllocator, + ServiceNodePortRange: m.legacyRESTStorageProvider.ServiceNodePortRange, ServiceNodePortInterval: 3 * time.Minute, PublicIP: m.ClusterIP, @@ -553,13 +396,13 @@ func (m *Master) NewBootstrapController(endpointReconcilerConfig EndpointReconci } } -func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server { +func getServersToValidate(storageFactory genericapiserver.StorageFactory) map[string]apiserver.Server { serversToValidate := map[string]apiserver.Server{ "controller-manager": {Addr: "127.0.0.1", Port: ports.ControllerManagerPort, Path: "/healthz"}, "scheduler": {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"}, } - for ix, machine := range c.StorageFactory.Backends() { + for ix, machine := range storageFactory.Backends() { etcdUrl, err := url.Parse(machine) if err != nil { glog.Errorf("Failed to parse etcd url for validation: %v", err) @@ -867,7 +710,7 @@ func findExternalAddress(node *api.Node) (string, error) { } func (m *Master) getNodeAddresses() ([]string, error) { - nodes, err := m.nodeRegistry.ListNodes(api.NewDefaultContext(), nil) + nodes, err := m.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil) if err != nil { return nil, err } diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index 13395199cc..0d03b5e3b5 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -28,7 +28,6 @@ import ( "reflect" "strings" "testing" - "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/meta" @@ -47,12 +46,9 @@ import ( "k8s.io/kubernetes/pkg/apis/extensions" extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/apis/rbac" - "k8s.io/kubernetes/pkg/apiserver/request" "k8s.io/kubernetes/pkg/generated/openapi" "k8s.io/kubernetes/pkg/genericapiserver" "k8s.io/kubernetes/pkg/kubelet/client" - "k8s.io/kubernetes/pkg/registry/core/endpoint" - "k8s.io/kubernetes/pkg/registry/core/namespace" ipallocator "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" extensionsrest "k8s.io/kubernetes/pkg/registry/extensions/rest" "k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata" @@ -62,7 +58,6 @@ import ( "k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/storage/etcd/etcdtest" etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing" - "k8s.io/kubernetes/pkg/util/intstr" utilnet "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/version" @@ -82,7 +77,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert. master := &Master{ GenericAPIServer: &genericapiserver.GenericAPIServer{}, } - config := Config{ + config := &Config{ GenericConfig: &genericapiserver.Config{}, } @@ -108,6 +103,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert. config.GenericConfig.ProxyTLSClientConfig = &tls.Config{} config.GenericConfig.RequestContextMapper = api.NewRequestContextMapper() config.GenericConfig.EnableVersion = true + config.EnableCoreControllers = false // TODO: this is kind of hacky. The trouble is that the sync loop // runs in a go-routine and there is no way to validate in the test @@ -117,9 +113,14 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert. // run the sync routine and register types manually. config.disableThirdPartyControllerForTesting = true - master.nodeRegistry = registrytest.NewNodeRegistry([]string{"node1", "node2"}, api.NodeResources{}) + master, err := config.Complete().New() + if err != nil { + t.Fatal(err) + } - return master, server, config, assert.New(t) + master.legacyRESTStorage.NodeRegistry = registrytest.NewNodeRegistry([]string{"node1", "node2"}, api.NodeResources{}) + + return master, server, *config, assert.New(t) } func newMaster(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) { @@ -187,26 +188,6 @@ func TestNew(t *testing.T) { assert.Equal(master.ProxyTransport.(*http.Transport).TLSClientConfig, config.GenericConfig.ProxyTLSClientConfig) } -// TestNamespaceSubresources ensures the namespace subresource parsing in apiserver/handlers.go doesn't drift -func TestNamespaceSubresources(t *testing.T) { - master, etcdserver, _, _ := newMaster(t) - defer etcdserver.Terminate(t) - - expectedSubresources := request.NamespaceSubResourcesForTest - foundSubresources := sets.NewString() - - for k := range master.v1ResourcesStorage { - parts := strings.Split(k, "/") - if len(parts) == 2 && parts[0] == "namespaces" { - foundSubresources.Insert(parts[1]) - } - } - - if !reflect.DeepEqual(expectedSubresources.List(), foundSubresources.List()) { - t.Errorf("Expected namespace subresources %#v, got %#v. Update apiserver/handlers.go#namespaceSubresources", expectedSubresources.List(), foundSubresources.List()) - } -} - // TestVersion tests /version func TestVersion(t *testing.T) { s, etcdserver, _, _ := newMaster(t) @@ -232,10 +213,10 @@ func TestVersion(t *testing.T) { // TestGetServersToValidate verifies the unexported getServersToValidate function func TestGetServersToValidate(t *testing.T) { - master, etcdserver, config, assert := setUp(t) + _, etcdserver, config, assert := setUp(t) defer etcdserver.Terminate(t) - servers := master.getServersToValidate(&config) + servers := getServersToValidate(config.StorageFactory) // Expected servers to validate: scheduler, controller-manager and etcd. assert.Equal(3, len(servers), "unexpected server list: %#v", servers) @@ -276,74 +257,6 @@ func (*fakeEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, return nil } -// TestNewBootstrapController verifies master fields are properly copied into controller -func TestNewBootstrapController(t *testing.T) { - // Tests a subset of inputs to ensure they are set properly in the controller - master, etcdserver, _, assert := setUp(t) - defer etcdserver.Terminate(t) - - portRange := utilnet.PortRange{Base: 10, Size: 10} - - master.namespaceRegistry = namespace.NewRegistry(nil) - master.serviceRegistry = registrytest.NewServiceRegistry() - master.endpointRegistry = endpoint.NewRegistry(nil) - - master.ServiceNodePortRange = portRange - master.MasterCount = 1 - master.ServiceReadWritePort = 1000 - master.PublicReadWritePort = 1010 - - // test with an empty EndpointReconcilerConfig to ensure the defaults are applied - controller := master.NewBootstrapController(EndpointReconcilerConfig{}) - - assert.Equal(controller.NamespaceRegistry, master.namespaceRegistry) - assert.Equal(controller.EndpointReconciler, NewMasterCountEndpointReconciler(master.MasterCount, master.endpointRegistry)) - assert.Equal(controller.EndpointInterval, DefaultEndpointReconcilerInterval) - assert.Equal(controller.ServiceRegistry, master.serviceRegistry) - assert.Equal(controller.ServiceNodePortRange, portRange) - assert.Equal(controller.ServicePort, master.ServiceReadWritePort) - assert.Equal(controller.PublicServicePort, master.PublicReadWritePort) - - // test with a filled-in EndpointReconcilerConfig to make sure its values are used - controller = master.NewBootstrapController(EndpointReconcilerConfig{ - Reconciler: &fakeEndpointReconciler{}, - Interval: 5 * time.Second, - }) - assert.Equal(controller.EndpointReconciler, &fakeEndpointReconciler{}) - assert.Equal(controller.EndpointInterval, 5*time.Second) -} - -// TestControllerServicePorts verifies master extraServicePorts are -// correctly copied into controller -func TestControllerServicePorts(t *testing.T) { - master, etcdserver, _, assert := setUp(t) - defer etcdserver.Terminate(t) - - master.namespaceRegistry = namespace.NewRegistry(nil) - master.serviceRegistry = registrytest.NewServiceRegistry() - master.endpointRegistry = endpoint.NewRegistry(nil) - - master.ExtraServicePorts = []api.ServicePort{ - { - Name: "additional-port-1", - Port: 1000, - Protocol: api.ProtocolTCP, - TargetPort: intstr.FromInt(1000), - }, - { - Name: "additional-port-2", - Port: 1010, - Protocol: api.ProtocolTCP, - TargetPort: intstr.FromInt(1010), - }, - } - - controller := master.NewBootstrapController(EndpointReconcilerConfig{}) - - assert.Equal(int32(1000), controller.ExtraServicePorts[0].Port) - assert.Equal(int32(1010), controller.ExtraServicePorts[1].Port) -} - // TestGetNodeAddresses verifies that proper results are returned // when requesting node addresses. func TestGetNodeAddresses(t *testing.T) { @@ -351,14 +264,14 @@ func TestGetNodeAddresses(t *testing.T) { defer etcdserver.Terminate(t) // Fail case (no addresses associated with nodes) - nodes, _ := master.nodeRegistry.ListNodes(api.NewDefaultContext(), nil) + nodes, _ := master.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil) addrs, err := master.getNodeAddresses() assert.Error(err, "getNodeAddresses should have caused an error as there are no addresses.") assert.Equal([]string(nil), addrs) // Pass case with External type IP - nodes, _ = master.nodeRegistry.ListNodes(api.NewDefaultContext(), nil) + nodes, _ = master.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil) for index := range nodes.Items { nodes.Items[index].Status.Addresses = []api.NodeAddress{{Type: api.NodeExternalIP, Address: "127.0.0.1"}} } @@ -367,7 +280,7 @@ func TestGetNodeAddresses(t *testing.T) { assert.Equal([]string{"127.0.0.1", "127.0.0.1"}, addrs) // Pass case with LegacyHost type IP - nodes, _ = master.nodeRegistry.ListNodes(api.NewDefaultContext(), nil) + nodes, _ = master.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil) for index := range nodes.Items { nodes.Items[index].Status.Addresses = []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "127.0.0.2"}} } diff --git a/pkg/registry/core/rest/storage_core.go b/pkg/registry/core/rest/storage_core.go new file mode 100644 index 0000000000..7cfaee0398 --- /dev/null +++ b/pkg/registry/core/rest/storage_core.go @@ -0,0 +1,226 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rest + +import ( + "fmt" + "net" + "net/http" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/rest" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apimachinery/registered" + "k8s.io/kubernetes/pkg/apiserver" + "k8s.io/kubernetes/pkg/genericapiserver" + kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" + "k8s.io/kubernetes/pkg/registry/core/componentstatus" + configmapetcd "k8s.io/kubernetes/pkg/registry/core/configmap/etcd" + controlleretcd "k8s.io/kubernetes/pkg/registry/core/controller/etcd" + "k8s.io/kubernetes/pkg/registry/core/endpoint" + endpointsetcd "k8s.io/kubernetes/pkg/registry/core/endpoint/etcd" + eventetcd "k8s.io/kubernetes/pkg/registry/core/event/etcd" + limitrangeetcd "k8s.io/kubernetes/pkg/registry/core/limitrange/etcd" + "k8s.io/kubernetes/pkg/registry/core/namespace" + namespaceetcd "k8s.io/kubernetes/pkg/registry/core/namespace/etcd" + "k8s.io/kubernetes/pkg/registry/core/node" + nodeetcd "k8s.io/kubernetes/pkg/registry/core/node/etcd" + pvetcd "k8s.io/kubernetes/pkg/registry/core/persistentvolume/etcd" + pvcetcd "k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/etcd" + podetcd "k8s.io/kubernetes/pkg/registry/core/pod/etcd" + podtemplateetcd "k8s.io/kubernetes/pkg/registry/core/podtemplate/etcd" + "k8s.io/kubernetes/pkg/registry/core/rangeallocation" + resourcequotaetcd "k8s.io/kubernetes/pkg/registry/core/resourcequota/etcd" + secretetcd "k8s.io/kubernetes/pkg/registry/core/secret/etcd" + "k8s.io/kubernetes/pkg/registry/core/service" + "k8s.io/kubernetes/pkg/registry/core/service/allocator" + etcdallocator "k8s.io/kubernetes/pkg/registry/core/service/allocator/etcd" + serviceetcd "k8s.io/kubernetes/pkg/registry/core/service/etcd" + ipallocator "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" + "k8s.io/kubernetes/pkg/registry/core/service/portallocator" + serviceaccountetcd "k8s.io/kubernetes/pkg/registry/core/serviceaccount/etcd" + utilnet "k8s.io/kubernetes/pkg/util/net" +) + +// LegacyRESTStorageProvider provides information needed to build RESTStorage for core, but +// does NOT implement the "normal" RESTStorageProvider (yet!) +type LegacyRESTStorageProvider struct { + StorageFactory genericapiserver.StorageFactory + // Used for custom proxy dialing, and proxy TLS options + ProxyTransport http.RoundTripper + KubeletClient kubeletclient.KubeletClient + EventTTL time.Duration + + // ServiceClusterIPRange is used to build cluster IPs for discovery. + ServiceClusterIPRange *net.IPNet + ServiceNodePortRange utilnet.PortRange + + // ComponentStatusServerFunc is a func used to locate servers to back component status + ComponentStatusServerFunc ComponentStatusServerFunc +} + +type ComponentStatusServerFunc func() map[string]apiserver.Server + +// LegacyRESTStorage returns stateful information about particular instances of REST storage to +// master.go for wiring controllers. +// TODO remove this by running the controller as a poststarthook +type LegacyRESTStorage struct { + NodeRegistry node.Registry + NamespaceRegistry namespace.Registry + ServiceRegistry service.Registry + EndpointRegistry endpoint.Registry + ServiceClusterIPAllocator rangeallocation.RangeRegistry + ServiceNodePortAllocator rangeallocation.RangeRegistry +} + +func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter genericapiserver.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) { + apiGroupInfo := genericapiserver.APIGroupInfo{ + GroupMeta: *registered.GroupOrDie(api.GroupName), + VersionedResourcesStorageMap: map[string]map[string]rest.Storage{}, + IsLegacyGroup: true, + Scheme: api.Scheme, + ParameterCodec: api.ParameterCodec, + NegotiatedSerializer: api.Codecs, + SubresourceGroupVersionKind: map[string]unversioned.GroupVersionKind{}, + } + if autoscalingGroupVersion := (unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}); registered.IsEnabledVersion(autoscalingGroupVersion) { + apiGroupInfo.SubresourceGroupVersionKind["replicationcontrollers/scale"] = autoscalingGroupVersion.WithKind("Scale") + } + if policyGroupVersion := (unversioned.GroupVersion{Group: "policy", Version: "v1alpha1"}); registered.IsEnabledVersion(policyGroupVersion) { + apiGroupInfo.SubresourceGroupVersionKind["pods/eviction"] = policyGroupVersion.WithKind("Eviction") + } + restStorage := LegacyRESTStorage{} + + podTemplateStorage := podtemplateetcd.NewREST(restOptionsGetter(api.Resource("podTemplates"))) + + eventStorage := eventetcd.NewREST(restOptionsGetter(api.Resource("events")), uint64(c.EventTTL.Seconds())) + limitRangeStorage := limitrangeetcd.NewREST(restOptionsGetter(api.Resource("limitRanges"))) + + resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(restOptionsGetter(api.Resource("resourceQuotas"))) + secretStorage := secretetcd.NewREST(restOptionsGetter(api.Resource("secrets"))) + serviceAccountStorage := serviceaccountetcd.NewREST(restOptionsGetter(api.Resource("serviceAccounts"))) + persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(restOptionsGetter(api.Resource("persistentVolumes"))) + persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(restOptionsGetter(api.Resource("persistentVolumeClaims"))) + configMapStorage := configmapetcd.NewREST(restOptionsGetter(api.Resource("configMaps"))) + + namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(restOptionsGetter(api.Resource("namespaces"))) + restStorage.NamespaceRegistry = namespace.NewRegistry(namespaceStorage) + + endpointsStorage := endpointsetcd.NewREST(restOptionsGetter(api.Resource("endpoints"))) + restStorage.EndpointRegistry = endpoint.NewRegistry(endpointsStorage) + + nodeStorage := nodeetcd.NewStorage(restOptionsGetter(api.Resource("nodes")), c.KubeletClient, c.ProxyTransport) + restStorage.NodeRegistry = node.NewRegistry(nodeStorage.Node) + + podStorage := podetcd.NewStorage( + restOptionsGetter(api.Resource("pods")), + kubeletclient.ConnectionInfoGetter(nodeStorage.Node), + c.ProxyTransport, + ) + + serviceRESTStorage, serviceStatusStorage := serviceetcd.NewREST(restOptionsGetter(api.Resource("services"))) + restStorage.ServiceRegistry = service.NewRegistry(serviceRESTStorage) + + var serviceClusterIPRegistry rangeallocation.RangeRegistry + serviceClusterIPRange := c.ServiceClusterIPRange + if serviceClusterIPRange == nil { + return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("service clusterIPRange is nil") + } + + serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services")) + if err != nil { + return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err + } + + ServiceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface { + mem := allocator.NewAllocationMap(max, rangeSpec) + // TODO etcdallocator package to return a storage interface via the storageFactory + etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig) + serviceClusterIPRegistry = etcd + return etcd + }) + restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry + + var serviceNodePortRegistry rangeallocation.RangeRegistry + ServiceNodePortAllocator := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface { + mem := allocator.NewAllocationMap(max, rangeSpec) + // TODO etcdallocator package to return a storage interface via the storageFactory + etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorageConfig) + serviceNodePortRegistry = etcd + return etcd + }) + restStorage.ServiceNodePortAllocator = serviceNodePortRegistry + + controllerStorage := controlleretcd.NewStorage(restOptionsGetter(api.Resource("replicationControllers"))) + + serviceRest := service.NewStorage(restStorage.ServiceRegistry, restStorage.EndpointRegistry, ServiceClusterIPAllocator, ServiceNodePortAllocator, c.ProxyTransport) + + restStorageMap := map[string]rest.Storage{ + "pods": podStorage.Pod, + "pods/attach": podStorage.Attach, + "pods/status": podStorage.Status, + "pods/log": podStorage.Log, + "pods/exec": podStorage.Exec, + "pods/portforward": podStorage.PortForward, + "pods/proxy": podStorage.Proxy, + "pods/binding": podStorage.Binding, + "bindings": podStorage.Binding, + + "podTemplates": podTemplateStorage, + + "replicationControllers": controllerStorage.Controller, + "replicationControllers/status": controllerStorage.Status, + + "services": serviceRest.Service, + "services/proxy": serviceRest.Proxy, + "services/status": serviceStatusStorage, + + "endpoints": endpointsStorage, + + "nodes": nodeStorage.Node, + "nodes/status": nodeStorage.Status, + "nodes/proxy": nodeStorage.Proxy, + + "events": eventStorage, + + "limitRanges": limitRangeStorage, + "resourceQuotas": resourceQuotaStorage, + "resourceQuotas/status": resourceQuotaStatusStorage, + "namespaces": namespaceStorage, + "namespaces/status": namespaceStatusStorage, + "namespaces/finalize": namespaceFinalizeStorage, + "secrets": secretStorage, + "serviceAccounts": serviceAccountStorage, + "persistentVolumes": persistentVolumeStorage, + "persistentVolumes/status": persistentVolumeStatusStorage, + "persistentVolumeClaims": persistentVolumeClaimStorage, + "persistentVolumeClaims/status": persistentVolumeClaimStatusStorage, + "configMaps": configMapStorage, + + "componentStatuses": componentstatus.NewStorage(c.ComponentStatusServerFunc), + } + if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}) { + restStorageMap["replicationControllers/scale"] = controllerStorage.Scale + } + if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "policy", Version: "v1alpha1"}) { + restStorageMap["pods/eviction"] = podStorage.Eviction + } + apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap + + return restStorage, apiGroupInfo, nil +} diff --git a/pkg/registry/extensions/rest/thirdparty_controller.go b/pkg/registry/extensions/rest/thirdparty_controller.go index f4aed6c36d..490fcc21d0 100644 --- a/pkg/registry/extensions/rest/thirdparty_controller.go +++ b/pkg/registry/extensions/rest/thirdparty_controller.go @@ -51,7 +51,7 @@ type ThirdPartyController struct { thirdPartyResourceRegistry *thirdpartyresourceetcd.REST } -// Synchronize a single resource with RESTful resources on the master +// SyncOneResource synchronizes a single resource with RESTful resources on the master func (t *ThirdPartyController) SyncOneResource(rsrc *extensions.ThirdPartyResource) error { // TODO: we also need to test if the existing installed resource matches the resource we are sync-ing. // Currently, if there is an older, incompatible resource installed, we won't remove it. We should detect