move core storage out of master.go

pull/6/head
deads2k 2016-09-26 07:51:04 -04:00
parent 24031f50d6
commit 1c667e4fc5
4 changed files with 274 additions and 292 deletions

View File

@ -74,37 +74,14 @@ import (
autoscalingrest "k8s.io/kubernetes/pkg/registry/autoscaling/rest"
batchrest "k8s.io/kubernetes/pkg/registry/batch/rest"
certificatesrest "k8s.io/kubernetes/pkg/registry/certificates/rest"
corerest "k8s.io/kubernetes/pkg/registry/core/rest"
extensionsrest "k8s.io/kubernetes/pkg/registry/extensions/rest"
policyrest "k8s.io/kubernetes/pkg/registry/policy/rest"
rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
storagerest "k8s.io/kubernetes/pkg/registry/storage/rest"
// direct etcd registry dependencies
"k8s.io/kubernetes/pkg/registry/core/componentstatus"
configmapetcd "k8s.io/kubernetes/pkg/registry/core/configmap/etcd"
controlleretcd "k8s.io/kubernetes/pkg/registry/core/controller/etcd"
"k8s.io/kubernetes/pkg/registry/core/endpoint"
endpointsetcd "k8s.io/kubernetes/pkg/registry/core/endpoint/etcd"
eventetcd "k8s.io/kubernetes/pkg/registry/core/event/etcd"
limitrangeetcd "k8s.io/kubernetes/pkg/registry/core/limitrange/etcd"
"k8s.io/kubernetes/pkg/registry/core/namespace"
namespaceetcd "k8s.io/kubernetes/pkg/registry/core/namespace/etcd"
"k8s.io/kubernetes/pkg/registry/core/node"
nodeetcd "k8s.io/kubernetes/pkg/registry/core/node/etcd"
pvetcd "k8s.io/kubernetes/pkg/registry/core/persistentvolume/etcd"
pvcetcd "k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/etcd"
podetcd "k8s.io/kubernetes/pkg/registry/core/pod/etcd"
podtemplateetcd "k8s.io/kubernetes/pkg/registry/core/podtemplate/etcd"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
resourcequotaetcd "k8s.io/kubernetes/pkg/registry/core/resourcequota/etcd"
secretetcd "k8s.io/kubernetes/pkg/registry/core/secret/etcd"
"k8s.io/kubernetes/pkg/registry/core/service"
"k8s.io/kubernetes/pkg/registry/core/service/allocator"
etcdallocator "k8s.io/kubernetes/pkg/registry/core/service/allocator/etcd"
serviceetcd "k8s.io/kubernetes/pkg/registry/core/service/etcd"
ipallocator "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
serviceaccountetcd "k8s.io/kubernetes/pkg/registry/core/serviceaccount/etcd"
"k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata"
thirdpartyresourcedataetcd "k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata/etcd"
)
@ -146,20 +123,11 @@ type EndpointReconcilerConfig struct {
type Master struct {
*genericapiserver.GenericAPIServer
// Map of v1 resources to their REST storages.
v1ResourcesStorage map[string]rest.Storage
legacyRESTStorageProvider corerest.LegacyRESTStorageProvider
legacyRESTStorage corerest.LegacyRESTStorage
enableCoreControllers bool
deleteCollectionWorkers int
// registries are internal client APIs for accessing the storage layer
// TODO: define the internal typed interface in a way that clients can
// also be replaced
nodeRegistry node.Registry
namespaceRegistry namespace.Registry
serviceRegistry service.Registry
endpointRegistry endpoint.Registry
serviceClusterIPAllocator rangeallocation.RangeRegistry
serviceNodePortAllocator rangeallocation.RangeRegistry
// storage for third party objects
thirdPartyStorageConfig *storagebackend.Config
@ -243,6 +211,16 @@ func (c completedConfig) New() (*Master, error) {
enableGarbageCollection: c.GenericConfig.EnableGarbageCollection,
storageFactory: c.StorageFactory,
},
legacyRESTStorageProvider: corerest.LegacyRESTStorageProvider{
StorageFactory: c.StorageFactory,
ProxyTransport: s.ProxyTransport,
KubeletClient: c.KubeletClient,
EventTTL: c.EventTTL,
ServiceClusterIPRange: c.GenericConfig.ServiceClusterIPRange,
ServiceNodePortRange: c.GenericConfig.ServiceNodePortRange,
ComponentStatusServerFunc: func() map[string]apiserver.Server { return getServersToValidate(c.StorageFactory) },
},
}
if c.EnableWatchCache {
@ -279,29 +257,20 @@ func (c completedConfig) New() (*Master, error) {
}
func (m *Master) InstallAPIs(c *Config) {
restOptionsGetter := func(resource unversioned.GroupResource) generic.RESTOptions {
return m.restOptionsFactory.NewFor(resource)
}
apiGroupsInfo := []genericapiserver.APIGroupInfo{}
// Install v1 unless disabled.
if c.GenericConfig.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) {
// Install v1 API.
m.initV1ResourcesStorage(c)
apiGroupInfo := genericapiserver.APIGroupInfo{
GroupMeta: *registered.GroupOrDie(api.GroupName),
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{
"v1": m.v1ResourcesStorage,
},
IsLegacyGroup: true,
Scheme: api.Scheme,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
SubresourceGroupVersionKind: map[string]unversioned.GroupVersionKind{},
}
if autoscalingGroupVersion := (unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}); registered.IsEnabledVersion(autoscalingGroupVersion) {
apiGroupInfo.SubresourceGroupVersionKind["replicationcontrollers/scale"] = autoscalingGroupVersion.WithKind("Scale")
}
if policyGroupVersion := (unversioned.GroupVersion{Group: "policy", Version: "v1alpha1"}); registered.IsEnabledVersion(policyGroupVersion) {
apiGroupInfo.SubresourceGroupVersionKind["pods/eviction"] = policyGroupVersion.WithKind("Eviction")
legacyRESTStorage, apiGroupInfo, err := m.legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
if err != nil {
glog.Fatalf("Error building core storage: %v", err)
}
m.legacyRESTStorage = legacyRESTStorage
apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
}
@ -334,10 +303,6 @@ func (m *Master) InstallAPIs(c *Config) {
m.thirdPartyResources = map[string]*thirdPartyEntry{}
}
restOptionsGetter := func(resource unversioned.GroupResource) generic.RESTOptions {
return m.restOptionsFactory.NewFor(resource)
}
// stabilize order.
// TODO find a better way to configure priority of groups
for _, group := range sets.StringKeySet(c.RESTStorageProviders).List() {
@ -386,133 +351,11 @@ func (m *Master) InstallAPIs(c *Config) {
}
}
func (m *Master) initV1ResourcesStorage(c *Config) {
restOptions := func(resource string) generic.RESTOptions {
return m.restOptionsFactory.NewFor(api.Resource(resource))
}
podTemplateStorage := podtemplateetcd.NewREST(restOptions("podTemplates"))
eventStorage := eventetcd.NewREST(restOptions("events"), uint64(c.EventTTL.Seconds()))
limitRangeStorage := limitrangeetcd.NewREST(restOptions("limitRanges"))
resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(restOptions("resourceQuotas"))
secretStorage := secretetcd.NewREST(restOptions("secrets"))
serviceAccountStorage := serviceaccountetcd.NewREST(restOptions("serviceAccounts"))
persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(restOptions("persistentVolumes"))
persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(restOptions("persistentVolumeClaims"))
configMapStorage := configmapetcd.NewREST(restOptions("configMaps"))
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(restOptions("namespaces"))
m.namespaceRegistry = namespace.NewRegistry(namespaceStorage)
endpointsStorage := endpointsetcd.NewREST(restOptions("endpoints"))
m.endpointRegistry = endpoint.NewRegistry(endpointsStorage)
nodeStorage := nodeetcd.NewStorage(restOptions("nodes"), c.KubeletClient, m.ProxyTransport)
m.nodeRegistry = node.NewRegistry(nodeStorage.Node)
podStorage := podetcd.NewStorage(
restOptions("pods"),
kubeletclient.ConnectionInfoGetter(nodeStorage.Node),
m.ProxyTransport,
)
serviceRESTStorage, serviceStatusStorage := serviceetcd.NewREST(restOptions("services"))
m.serviceRegistry = service.NewRegistry(serviceRESTStorage)
var serviceClusterIPRegistry rangeallocation.RangeRegistry
serviceClusterIPRange := m.ServiceClusterIPRange
if serviceClusterIPRange == nil {
glog.Fatalf("service clusterIPRange is nil")
return
}
serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
if err != nil {
glog.Fatal(err.Error())
}
serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
serviceClusterIPRegistry = etcd
return etcd
})
m.serviceClusterIPAllocator = serviceClusterIPRegistry
var serviceNodePortRegistry rangeallocation.RangeRegistry
serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(m.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorageConfig)
serviceNodePortRegistry = etcd
return etcd
})
m.serviceNodePortAllocator = serviceNodePortRegistry
controllerStorage := controlleretcd.NewStorage(restOptions("replicationControllers"))
serviceRest := service.NewStorage(m.serviceRegistry, m.endpointRegistry, serviceClusterIPAllocator, serviceNodePortAllocator, m.ProxyTransport)
// TODO: Factor out the core API registration
m.v1ResourcesStorage = map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
"pods/proxy": podStorage.Proxy,
"pods/binding": podStorage.Binding,
"bindings": podStorage.Binding,
"podTemplates": podTemplateStorage,
"replicationControllers": controllerStorage.Controller,
"replicationControllers/status": controllerStorage.Status,
"services": serviceRest.Service,
"services/proxy": serviceRest.Proxy,
"services/status": serviceStatusStorage,
"endpoints": endpointsStorage,
"nodes": nodeStorage.Node,
"nodes/status": nodeStorage.Status,
"nodes/proxy": nodeStorage.Proxy,
"events": eventStorage,
"limitRanges": limitRangeStorage,
"resourceQuotas": resourceQuotaStorage,
"resourceQuotas/status": resourceQuotaStatusStorage,
"namespaces": namespaceStorage,
"namespaces/status": namespaceStatusStorage,
"namespaces/finalize": namespaceFinalizeStorage,
"secrets": secretStorage,
"serviceAccounts": serviceAccountStorage,
"persistentVolumes": persistentVolumeStorage,
"persistentVolumes/status": persistentVolumeStatusStorage,
"persistentVolumeClaims": persistentVolumeClaimStorage,
"persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
"configMaps": configMapStorage,
"componentStatuses": componentstatus.NewStorage(func() map[string]apiserver.Server { return m.getServersToValidate(c) }),
}
if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}) {
m.v1ResourcesStorage["replicationControllers/scale"] = controllerStorage.Scale
}
if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "policy", Version: "v1alpha1"}) {
m.v1ResourcesStorage["pods/eviction"] = podStorage.Eviction
}
}
// NewBootstrapController returns a controller for watching the core capabilities of the master. If
// endpointReconcilerConfig.Interval is 0, the default value of DefaultEndpointReconcilerInterval
// will be used instead. If endpointReconcilerConfig.Reconciler is nil, the default
// MasterCountEndpointReconciler will be used.
// TODO this should be kicked off as a server PostHook
func (m *Master) NewBootstrapController(endpointReconcilerConfig EndpointReconcilerConfig) *Controller {
if endpointReconcilerConfig.Interval == 0 {
endpointReconcilerConfig.Interval = DefaultEndpointReconcilerInterval
@ -521,12 +364,12 @@ func (m *Master) NewBootstrapController(endpointReconcilerConfig EndpointReconci
if endpointReconcilerConfig.Reconciler == nil {
// use a default endpoint reconciler if nothing is set
// m.endpointRegistry is set via m.InstallAPIs -> m.initV1ResourcesStorage
endpointReconcilerConfig.Reconciler = NewMasterCountEndpointReconciler(m.MasterCount, m.endpointRegistry)
endpointReconcilerConfig.Reconciler = NewMasterCountEndpointReconciler(m.MasterCount, m.legacyRESTStorage.EndpointRegistry)
}
return &Controller{
NamespaceRegistry: m.namespaceRegistry,
ServiceRegistry: m.serviceRegistry,
NamespaceRegistry: m.legacyRESTStorage.NamespaceRegistry,
ServiceRegistry: m.legacyRESTStorage.ServiceRegistry,
EndpointReconciler: endpointReconcilerConfig.Reconciler,
EndpointInterval: endpointReconcilerConfig.Interval,
@ -534,12 +377,12 @@ func (m *Master) NewBootstrapController(endpointReconcilerConfig EndpointReconci
SystemNamespaces: []string{api.NamespaceSystem},
SystemNamespacesInterval: 1 * time.Minute,
ServiceClusterIPRegistry: m.serviceClusterIPAllocator,
ServiceClusterIPRange: m.ServiceClusterIPRange,
ServiceClusterIPRegistry: m.legacyRESTStorage.ServiceClusterIPAllocator,
ServiceClusterIPRange: m.legacyRESTStorageProvider.ServiceClusterIPRange,
ServiceClusterIPInterval: 3 * time.Minute,
ServiceNodePortRegistry: m.serviceNodePortAllocator,
ServiceNodePortRange: m.ServiceNodePortRange,
ServiceNodePortRegistry: m.legacyRESTStorage.ServiceNodePortAllocator,
ServiceNodePortRange: m.legacyRESTStorageProvider.ServiceNodePortRange,
ServiceNodePortInterval: 3 * time.Minute,
PublicIP: m.ClusterIP,
@ -553,13 +396,13 @@ func (m *Master) NewBootstrapController(endpointReconcilerConfig EndpointReconci
}
}
func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {
func getServersToValidate(storageFactory genericapiserver.StorageFactory) map[string]apiserver.Server {
serversToValidate := map[string]apiserver.Server{
"controller-manager": {Addr: "127.0.0.1", Port: ports.ControllerManagerPort, Path: "/healthz"},
"scheduler": {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"},
}
for ix, machine := range c.StorageFactory.Backends() {
for ix, machine := range storageFactory.Backends() {
etcdUrl, err := url.Parse(machine)
if err != nil {
glog.Errorf("Failed to parse etcd url for validation: %v", err)
@ -867,7 +710,7 @@ func findExternalAddress(node *api.Node) (string, error) {
}
func (m *Master) getNodeAddresses() ([]string, error) {
nodes, err := m.nodeRegistry.ListNodes(api.NewDefaultContext(), nil)
nodes, err := m.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil)
if err != nil {
return nil, err
}

View File

@ -28,7 +28,6 @@ import (
"reflect"
"strings"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
@ -47,12 +46,9 @@ import (
"k8s.io/kubernetes/pkg/apis/extensions"
extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/apis/rbac"
"k8s.io/kubernetes/pkg/apiserver/request"
"k8s.io/kubernetes/pkg/generated/openapi"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/registry/core/endpoint"
"k8s.io/kubernetes/pkg/registry/core/namespace"
ipallocator "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
extensionsrest "k8s.io/kubernetes/pkg/registry/extensions/rest"
"k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata"
@ -62,7 +58,6 @@ import (
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
"k8s.io/kubernetes/pkg/util/intstr"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/version"
@ -82,7 +77,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
master := &Master{
GenericAPIServer: &genericapiserver.GenericAPIServer{},
}
config := Config{
config := &Config{
GenericConfig: &genericapiserver.Config{},
}
@ -108,6 +103,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
config.GenericConfig.ProxyTLSClientConfig = &tls.Config{}
config.GenericConfig.RequestContextMapper = api.NewRequestContextMapper()
config.GenericConfig.EnableVersion = true
config.EnableCoreControllers = false
// TODO: this is kind of hacky. The trouble is that the sync loop
// runs in a go-routine and there is no way to validate in the test
@ -117,9 +113,14 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
// run the sync routine and register types manually.
config.disableThirdPartyControllerForTesting = true
master.nodeRegistry = registrytest.NewNodeRegistry([]string{"node1", "node2"}, api.NodeResources{})
master, err := config.Complete().New()
if err != nil {
t.Fatal(err)
}
return master, server, config, assert.New(t)
master.legacyRESTStorage.NodeRegistry = registrytest.NewNodeRegistry([]string{"node1", "node2"}, api.NodeResources{})
return master, server, *config, assert.New(t)
}
func newMaster(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
@ -187,26 +188,6 @@ func TestNew(t *testing.T) {
assert.Equal(master.ProxyTransport.(*http.Transport).TLSClientConfig, config.GenericConfig.ProxyTLSClientConfig)
}
// TestNamespaceSubresources ensures the namespace subresource parsing in apiserver/handlers.go doesn't drift
func TestNamespaceSubresources(t *testing.T) {
master, etcdserver, _, _ := newMaster(t)
defer etcdserver.Terminate(t)
expectedSubresources := request.NamespaceSubResourcesForTest
foundSubresources := sets.NewString()
for k := range master.v1ResourcesStorage {
parts := strings.Split(k, "/")
if len(parts) == 2 && parts[0] == "namespaces" {
foundSubresources.Insert(parts[1])
}
}
if !reflect.DeepEqual(expectedSubresources.List(), foundSubresources.List()) {
t.Errorf("Expected namespace subresources %#v, got %#v. Update apiserver/handlers.go#namespaceSubresources", expectedSubresources.List(), foundSubresources.List())
}
}
// TestVersion tests /version
func TestVersion(t *testing.T) {
s, etcdserver, _, _ := newMaster(t)
@ -232,10 +213,10 @@ func TestVersion(t *testing.T) {
// TestGetServersToValidate verifies the unexported getServersToValidate function
func TestGetServersToValidate(t *testing.T) {
master, etcdserver, config, assert := setUp(t)
_, etcdserver, config, assert := setUp(t)
defer etcdserver.Terminate(t)
servers := master.getServersToValidate(&config)
servers := getServersToValidate(config.StorageFactory)
// Expected servers to validate: scheduler, controller-manager and etcd.
assert.Equal(3, len(servers), "unexpected server list: %#v", servers)
@ -276,74 +257,6 @@ func (*fakeEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP,
return nil
}
// TestNewBootstrapController verifies master fields are properly copied into controller
func TestNewBootstrapController(t *testing.T) {
// Tests a subset of inputs to ensure they are set properly in the controller
master, etcdserver, _, assert := setUp(t)
defer etcdserver.Terminate(t)
portRange := utilnet.PortRange{Base: 10, Size: 10}
master.namespaceRegistry = namespace.NewRegistry(nil)
master.serviceRegistry = registrytest.NewServiceRegistry()
master.endpointRegistry = endpoint.NewRegistry(nil)
master.ServiceNodePortRange = portRange
master.MasterCount = 1
master.ServiceReadWritePort = 1000
master.PublicReadWritePort = 1010
// test with an empty EndpointReconcilerConfig to ensure the defaults are applied
controller := master.NewBootstrapController(EndpointReconcilerConfig{})
assert.Equal(controller.NamespaceRegistry, master.namespaceRegistry)
assert.Equal(controller.EndpointReconciler, NewMasterCountEndpointReconciler(master.MasterCount, master.endpointRegistry))
assert.Equal(controller.EndpointInterval, DefaultEndpointReconcilerInterval)
assert.Equal(controller.ServiceRegistry, master.serviceRegistry)
assert.Equal(controller.ServiceNodePortRange, portRange)
assert.Equal(controller.ServicePort, master.ServiceReadWritePort)
assert.Equal(controller.PublicServicePort, master.PublicReadWritePort)
// test with a filled-in EndpointReconcilerConfig to make sure its values are used
controller = master.NewBootstrapController(EndpointReconcilerConfig{
Reconciler: &fakeEndpointReconciler{},
Interval: 5 * time.Second,
})
assert.Equal(controller.EndpointReconciler, &fakeEndpointReconciler{})
assert.Equal(controller.EndpointInterval, 5*time.Second)
}
// TestControllerServicePorts verifies master extraServicePorts are
// correctly copied into controller
func TestControllerServicePorts(t *testing.T) {
master, etcdserver, _, assert := setUp(t)
defer etcdserver.Terminate(t)
master.namespaceRegistry = namespace.NewRegistry(nil)
master.serviceRegistry = registrytest.NewServiceRegistry()
master.endpointRegistry = endpoint.NewRegistry(nil)
master.ExtraServicePorts = []api.ServicePort{
{
Name: "additional-port-1",
Port: 1000,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(1000),
},
{
Name: "additional-port-2",
Port: 1010,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(1010),
},
}
controller := master.NewBootstrapController(EndpointReconcilerConfig{})
assert.Equal(int32(1000), controller.ExtraServicePorts[0].Port)
assert.Equal(int32(1010), controller.ExtraServicePorts[1].Port)
}
// TestGetNodeAddresses verifies that proper results are returned
// when requesting node addresses.
func TestGetNodeAddresses(t *testing.T) {
@ -351,14 +264,14 @@ func TestGetNodeAddresses(t *testing.T) {
defer etcdserver.Terminate(t)
// Fail case (no addresses associated with nodes)
nodes, _ := master.nodeRegistry.ListNodes(api.NewDefaultContext(), nil)
nodes, _ := master.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil)
addrs, err := master.getNodeAddresses()
assert.Error(err, "getNodeAddresses should have caused an error as there are no addresses.")
assert.Equal([]string(nil), addrs)
// Pass case with External type IP
nodes, _ = master.nodeRegistry.ListNodes(api.NewDefaultContext(), nil)
nodes, _ = master.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil)
for index := range nodes.Items {
nodes.Items[index].Status.Addresses = []api.NodeAddress{{Type: api.NodeExternalIP, Address: "127.0.0.1"}}
}
@ -367,7 +280,7 @@ func TestGetNodeAddresses(t *testing.T) {
assert.Equal([]string{"127.0.0.1", "127.0.0.1"}, addrs)
// Pass case with LegacyHost type IP
nodes, _ = master.nodeRegistry.ListNodes(api.NewDefaultContext(), nil)
nodes, _ = master.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil)
for index := range nodes.Items {
nodes.Items[index].Status.Addresses = []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "127.0.0.2"}}
}

View File

@ -0,0 +1,226 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rest
import (
"fmt"
"net"
"net/http"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apiserver"
"k8s.io/kubernetes/pkg/genericapiserver"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/registry/core/componentstatus"
configmapetcd "k8s.io/kubernetes/pkg/registry/core/configmap/etcd"
controlleretcd "k8s.io/kubernetes/pkg/registry/core/controller/etcd"
"k8s.io/kubernetes/pkg/registry/core/endpoint"
endpointsetcd "k8s.io/kubernetes/pkg/registry/core/endpoint/etcd"
eventetcd "k8s.io/kubernetes/pkg/registry/core/event/etcd"
limitrangeetcd "k8s.io/kubernetes/pkg/registry/core/limitrange/etcd"
"k8s.io/kubernetes/pkg/registry/core/namespace"
namespaceetcd "k8s.io/kubernetes/pkg/registry/core/namespace/etcd"
"k8s.io/kubernetes/pkg/registry/core/node"
nodeetcd "k8s.io/kubernetes/pkg/registry/core/node/etcd"
pvetcd "k8s.io/kubernetes/pkg/registry/core/persistentvolume/etcd"
pvcetcd "k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/etcd"
podetcd "k8s.io/kubernetes/pkg/registry/core/pod/etcd"
podtemplateetcd "k8s.io/kubernetes/pkg/registry/core/podtemplate/etcd"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
resourcequotaetcd "k8s.io/kubernetes/pkg/registry/core/resourcequota/etcd"
secretetcd "k8s.io/kubernetes/pkg/registry/core/secret/etcd"
"k8s.io/kubernetes/pkg/registry/core/service"
"k8s.io/kubernetes/pkg/registry/core/service/allocator"
etcdallocator "k8s.io/kubernetes/pkg/registry/core/service/allocator/etcd"
serviceetcd "k8s.io/kubernetes/pkg/registry/core/service/etcd"
ipallocator "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
serviceaccountetcd "k8s.io/kubernetes/pkg/registry/core/serviceaccount/etcd"
utilnet "k8s.io/kubernetes/pkg/util/net"
)
// LegacyRESTStorageProvider provides information needed to build RESTStorage for core, but
// does NOT implement the "normal" RESTStorageProvider (yet!)
type LegacyRESTStorageProvider struct {
StorageFactory genericapiserver.StorageFactory
// Used for custom proxy dialing, and proxy TLS options
ProxyTransport http.RoundTripper
KubeletClient kubeletclient.KubeletClient
EventTTL time.Duration
// ServiceClusterIPRange is used to build cluster IPs for discovery.
ServiceClusterIPRange *net.IPNet
ServiceNodePortRange utilnet.PortRange
// ComponentStatusServerFunc is a func used to locate servers to back component status
ComponentStatusServerFunc ComponentStatusServerFunc
}
type ComponentStatusServerFunc func() map[string]apiserver.Server
// LegacyRESTStorage returns stateful information about particular instances of REST storage to
// master.go for wiring controllers.
// TODO remove this by running the controller as a poststarthook
type LegacyRESTStorage struct {
NodeRegistry node.Registry
NamespaceRegistry namespace.Registry
ServiceRegistry service.Registry
EndpointRegistry endpoint.Registry
ServiceClusterIPAllocator rangeallocation.RangeRegistry
ServiceNodePortAllocator rangeallocation.RangeRegistry
}
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter genericapiserver.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.APIGroupInfo{
GroupMeta: *registered.GroupOrDie(api.GroupName),
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
IsLegacyGroup: true,
Scheme: api.Scheme,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
SubresourceGroupVersionKind: map[string]unversioned.GroupVersionKind{},
}
if autoscalingGroupVersion := (unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}); registered.IsEnabledVersion(autoscalingGroupVersion) {
apiGroupInfo.SubresourceGroupVersionKind["replicationcontrollers/scale"] = autoscalingGroupVersion.WithKind("Scale")
}
if policyGroupVersion := (unversioned.GroupVersion{Group: "policy", Version: "v1alpha1"}); registered.IsEnabledVersion(policyGroupVersion) {
apiGroupInfo.SubresourceGroupVersionKind["pods/eviction"] = policyGroupVersion.WithKind("Eviction")
}
restStorage := LegacyRESTStorage{}
podTemplateStorage := podtemplateetcd.NewREST(restOptionsGetter(api.Resource("podTemplates")))
eventStorage := eventetcd.NewREST(restOptionsGetter(api.Resource("events")), uint64(c.EventTTL.Seconds()))
limitRangeStorage := limitrangeetcd.NewREST(restOptionsGetter(api.Resource("limitRanges")))
resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(restOptionsGetter(api.Resource("resourceQuotas")))
secretStorage := secretetcd.NewREST(restOptionsGetter(api.Resource("secrets")))
serviceAccountStorage := serviceaccountetcd.NewREST(restOptionsGetter(api.Resource("serviceAccounts")))
persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(restOptionsGetter(api.Resource("persistentVolumes")))
persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(restOptionsGetter(api.Resource("persistentVolumeClaims")))
configMapStorage := configmapetcd.NewREST(restOptionsGetter(api.Resource("configMaps")))
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(restOptionsGetter(api.Resource("namespaces")))
restStorage.NamespaceRegistry = namespace.NewRegistry(namespaceStorage)
endpointsStorage := endpointsetcd.NewREST(restOptionsGetter(api.Resource("endpoints")))
restStorage.EndpointRegistry = endpoint.NewRegistry(endpointsStorage)
nodeStorage := nodeetcd.NewStorage(restOptionsGetter(api.Resource("nodes")), c.KubeletClient, c.ProxyTransport)
restStorage.NodeRegistry = node.NewRegistry(nodeStorage.Node)
podStorage := podetcd.NewStorage(
restOptionsGetter(api.Resource("pods")),
kubeletclient.ConnectionInfoGetter(nodeStorage.Node),
c.ProxyTransport,
)
serviceRESTStorage, serviceStatusStorage := serviceetcd.NewREST(restOptionsGetter(api.Resource("services")))
restStorage.ServiceRegistry = service.NewRegistry(serviceRESTStorage)
var serviceClusterIPRegistry rangeallocation.RangeRegistry
serviceClusterIPRange := c.ServiceClusterIPRange
if serviceClusterIPRange == nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("service clusterIPRange is nil")
}
serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
ServiceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
serviceClusterIPRegistry = etcd
return etcd
})
restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry
var serviceNodePortRegistry rangeallocation.RangeRegistry
ServiceNodePortAllocator := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorageConfig)
serviceNodePortRegistry = etcd
return etcd
})
restStorage.ServiceNodePortAllocator = serviceNodePortRegistry
controllerStorage := controlleretcd.NewStorage(restOptionsGetter(api.Resource("replicationControllers")))
serviceRest := service.NewStorage(restStorage.ServiceRegistry, restStorage.EndpointRegistry, ServiceClusterIPAllocator, ServiceNodePortAllocator, c.ProxyTransport)
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
"pods/proxy": podStorage.Proxy,
"pods/binding": podStorage.Binding,
"bindings": podStorage.Binding,
"podTemplates": podTemplateStorage,
"replicationControllers": controllerStorage.Controller,
"replicationControllers/status": controllerStorage.Status,
"services": serviceRest.Service,
"services/proxy": serviceRest.Proxy,
"services/status": serviceStatusStorage,
"endpoints": endpointsStorage,
"nodes": nodeStorage.Node,
"nodes/status": nodeStorage.Status,
"nodes/proxy": nodeStorage.Proxy,
"events": eventStorage,
"limitRanges": limitRangeStorage,
"resourceQuotas": resourceQuotaStorage,
"resourceQuotas/status": resourceQuotaStatusStorage,
"namespaces": namespaceStorage,
"namespaces/status": namespaceStatusStorage,
"namespaces/finalize": namespaceFinalizeStorage,
"secrets": secretStorage,
"serviceAccounts": serviceAccountStorage,
"persistentVolumes": persistentVolumeStorage,
"persistentVolumes/status": persistentVolumeStatusStorage,
"persistentVolumeClaims": persistentVolumeClaimStorage,
"persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
"configMaps": configMapStorage,
"componentStatuses": componentstatus.NewStorage(c.ComponentStatusServerFunc),
}
if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}) {
restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
}
if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "policy", Version: "v1alpha1"}) {
restStorageMap["pods/eviction"] = podStorage.Eviction
}
apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
return restStorage, apiGroupInfo, nil
}

View File

@ -51,7 +51,7 @@ type ThirdPartyController struct {
thirdPartyResourceRegistry *thirdpartyresourceetcd.REST
}
// Synchronize a single resource with RESTful resources on the master
// SyncOneResource synchronizes a single resource with RESTful resources on the master
func (t *ThirdPartyController) SyncOneResource(rsrc *extensions.ThirdPartyResource) error {
// TODO: we also need to test if the existing installed resource matches the resource we are sync-ing.
// Currently, if there is an older, incompatible resource installed, we won't remove it. We should detect