remove non-reuseable bits of MasterServer

pull/6/head
deads2k 2016-10-27 14:24:11 -04:00
parent cda55a7847
commit 7e65d5693b
19 changed files with 247 additions and 311 deletions

View File

@ -24,5 +24,6 @@ import (
type RESTOptionsGetter func(resource unversioned.GroupResource) generic.RESTOptions
type RESTStorageProvider interface {
GroupName() string
NewRESTStorage(apiResourceConfigSource APIResourceConfigSource, restOptionsGetter RESTOptionsGetter) (APIGroupInfo, bool)
}

View File

@ -32,38 +32,28 @@ go_library(
"//pkg/apis/apps/v1alpha1:go_default_library",
"//pkg/apis/authentication/install:go_default_library",
"//pkg/apis/authentication/v1beta1:go_default_library",
"//pkg/apis/authorization:go_default_library",
"//pkg/apis/authorization/install:go_default_library",
"//pkg/apis/authorization/v1beta1:go_default_library",
"//pkg/apis/autoscaling:go_default_library",
"//pkg/apis/autoscaling/install:go_default_library",
"//pkg/apis/autoscaling/v1:go_default_library",
"//pkg/apis/batch:go_default_library",
"//pkg/apis/batch/install:go_default_library",
"//pkg/apis/batch/v1:go_default_library",
"//pkg/apis/certificates:go_default_library",
"//pkg/apis/certificates/install:go_default_library",
"//pkg/apis/certificates/v1alpha1:go_default_library",
"//pkg/apis/componentconfig/install:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/apis/extensions/install:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/apis/imagepolicy/install:go_default_library",
"//pkg/apis/policy:go_default_library",
"//pkg/apis/policy/install:go_default_library",
"//pkg/apis/policy/v1alpha1:go_default_library",
"//pkg/apis/rbac:go_default_library",
"//pkg/apis/rbac/install:go_default_library",
"//pkg/apis/rbac/v1alpha1:go_default_library",
"//pkg/apis/storage:go_default_library",
"//pkg/apis/storage/install:go_default_library",
"//pkg/apis/storage/v1beta1:go_default_library",
"//pkg/apiserver:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/genericapiserver:go_default_library",
"//pkg/healthz:go_default_library",
"//pkg/kubelet/client:go_default_library",
"//pkg/master/ports:go_default_library",
"//pkg/master/thirdparty:go_default_library",
"//pkg/registry/apps/rest:go_default_library",
"//pkg/registry/authentication/rest:go_default_library",
@ -84,12 +74,10 @@ go_library(
"//pkg/registry/rbac/rest:go_default_library",
"//pkg/registry/storage/rest:go_default_library",
"//pkg/routes:go_default_library",
"//pkg/storage/etcd/util:go_default_library",
"//pkg/util/async:go_default_library",
"//pkg/util/intstr:go_default_library",
"//pkg/util/net:go_default_library",
"//pkg/util/runtime:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/wait:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:github.com/prometheus/client_golang/prometheus",

View File

@ -18,12 +18,8 @@ package master
import (
"fmt"
"net"
"net/http"
"net/url"
"reflect"
"strconv"
"strings"
"time"
"k8s.io/kubernetes/pkg/api"
@ -31,35 +27,23 @@ import (
apiv1 "k8s.io/kubernetes/pkg/api/v1"
appsapi "k8s.io/kubernetes/pkg/apis/apps/v1alpha1"
authenticationv1beta1 "k8s.io/kubernetes/pkg/apis/authentication/v1beta1"
"k8s.io/kubernetes/pkg/apis/authorization"
authorizationapiv1beta1 "k8s.io/kubernetes/pkg/apis/authorization/v1beta1"
"k8s.io/kubernetes/pkg/apis/autoscaling"
autoscalingapiv1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
"k8s.io/kubernetes/pkg/apis/batch"
batchapiv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
"k8s.io/kubernetes/pkg/apis/certificates"
certificatesapiv1alpha1 "k8s.io/kubernetes/pkg/apis/certificates/v1alpha1"
"k8s.io/kubernetes/pkg/apis/extensions"
extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/apis/policy"
policyapiv1alpha1 "k8s.io/kubernetes/pkg/apis/policy/v1alpha1"
"k8s.io/kubernetes/pkg/apis/rbac"
rbacapi "k8s.io/kubernetes/pkg/apis/rbac/v1alpha1"
"k8s.io/kubernetes/pkg/apis/storage"
storageapiv1beta1 "k8s.io/kubernetes/pkg/apis/storage/v1beta1"
"k8s.io/kubernetes/pkg/apiserver"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/healthz"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/master/thirdparty"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/routes"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/util/sets"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
@ -94,8 +78,7 @@ type Config struct {
DeleteCollectionWorkers int
EventTTL time.Duration
KubeletClientConfig kubeletclient.KubeletClientConfig
// genericapiserver.RESTStorageProviders provides RESTStorage building methods keyed by groupName
RESTStorageProviders map[string]genericapiserver.RESTStorageProvider
// Used to start and monitor tunneling
Tunneler genericapiserver.Tunneler
EnableUISupport bool
@ -113,17 +96,6 @@ type EndpointReconcilerConfig struct {
// Master contains state for a Kubernetes cluster master/api server.
type Master struct {
GenericAPIServer *genericapiserver.GenericAPIServer
thirdPartyResourceServer *thirdparty.ThirdPartyResourceServer
// nodeClient is used to back the tunneler
nodeClient coreclient.NodeInterface
}
type RESTOptionsGetter func(resource unversioned.GroupResource) generic.RESTOptions
type RESTStorageProvider interface {
NewRESTStorage(apiResourceConfigSource genericapiserver.APIResourceConfigSource, restOptionsGetter RESTOptionsGetter) (groupInfo genericapiserver.APIGroupInfo, enabled bool)
}
type completedConfig struct {
@ -178,9 +150,6 @@ func (c completedConfig) New() (*Master, error) {
m := &Master{
GenericAPIServer: s,
nodeClient: coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes(),
thirdPartyResourceServer: thirdparty.NewThirdPartyResourceServer(s),
}
restOptionsFactory := restOptionsFactory{
@ -198,33 +167,30 @@ func (c completedConfig) New() (*Master, error) {
// install legacy rest storage
if c.GenericConfig.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) {
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
StorageFactory: c.StorageFactory,
ProxyTransport: c.ProxyTransport,
KubeletClientConfig: c.KubeletClientConfig,
EventTTL: c.EventTTL,
ServiceClusterIPRange: c.GenericConfig.ServiceClusterIPRange,
ServiceNodePortRange: c.GenericConfig.ServiceNodePortRange,
ComponentStatusServerFunc: func() map[string]apiserver.Server { return getServersToValidate(c.StorageFactory) },
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
StorageFactory: c.StorageFactory,
ProxyTransport: c.ProxyTransport,
KubeletClientConfig: c.KubeletClientConfig,
EventTTL: c.EventTTL,
ServiceClusterIPRange: c.GenericConfig.ServiceClusterIPRange,
ServiceNodePortRange: c.GenericConfig.ServiceNodePortRange,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
}
m.InstallLegacyAPI(c.Config, restOptionsFactory.NewFor, legacyRESTStorageProvider)
}
// Add some hardcoded storage for now. Append to the map.
if c.RESTStorageProviders == nil {
c.RESTStorageProviders = map[string]genericapiserver.RESTStorageProvider{}
restStorageProviders := []genericapiserver.RESTStorageProvider{
appsrest.RESTStorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authenticator},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorizer},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
extensionsrest.RESTStorageProvider{ResourceInterface: thirdparty.NewThirdPartyResourceServer(s, c.StorageFactory)},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{AuthorizerRBACSuperUser: c.GenericConfig.AuthorizerRBACSuperUser},
storagerest.RESTStorageProvider{},
}
c.RESTStorageProviders[appsapi.GroupName] = appsrest.RESTStorageProvider{}
c.RESTStorageProviders[authenticationv1beta1.GroupName] = authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authenticator}
c.RESTStorageProviders[authorization.GroupName] = authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorizer}
c.RESTStorageProviders[autoscaling.GroupName] = autoscalingrest.RESTStorageProvider{}
c.RESTStorageProviders[batch.GroupName] = batchrest.RESTStorageProvider{}
c.RESTStorageProviders[certificates.GroupName] = certificatesrest.RESTStorageProvider{}
c.RESTStorageProviders[extensions.GroupName] = extensionsrest.RESTStorageProvider{ResourceInterface: m.thirdPartyResourceServer}
c.RESTStorageProviders[policy.GroupName] = policyrest.RESTStorageProvider{}
c.RESTStorageProviders[rbac.GroupName] = &rbacrest.RESTStorageProvider{AuthorizerRBACSuperUser: c.GenericConfig.AuthorizerRBACSuperUser}
c.RESTStorageProviders[storage.GroupName] = storagerest.RESTStorageProvider{}
m.InstallAPIs(c.Config, restOptionsFactory.NewFor)
m.InstallAPIs(c.Config.GenericConfig.APIResourceConfigSource, restOptionsFactory.NewFor, restStorageProviders...)
m.InstallGeneralEndpoints(c.Config)
@ -255,7 +221,9 @@ func (m *Master) InstallGeneralEndpoints(c *Config) {
// Run the tunneler.
healthzChecks := []healthz.HealthzChecker{}
if c.Tunneler != nil {
c.Tunneler.Run(m.getNodeAddresses)
nodeClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes()
c.Tunneler.Run(nodeAddressProvider{nodeClient}.externalAddresses)
healthzChecks = append(healthzChecks, healthz.NamedCheck("SSH Tunnel Check", genericapiserver.TunnelSyncHealthChecker(c.Tunneler)))
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "apiserver_proxy_tunnel_sync_latency_secs",
@ -272,34 +240,22 @@ func (m *Master) InstallGeneralEndpoints(c *Config) {
}
func (m *Master) InstallAPIs(c *Config, restOptionsGetter genericapiserver.RESTOptionsGetter) {
// InstallAPIs will install the APIs for the restStorageProviders if they are enabled.
func (m *Master) InstallAPIs(apiResourceConfigSource genericapiserver.APIResourceConfigSource, restOptionsGetter genericapiserver.RESTOptionsGetter, restStorageProviders ...genericapiserver.RESTStorageProvider) {
apiGroupsInfo := []genericapiserver.APIGroupInfo{}
// Install third party resource support if requested
// TODO seems like this bit ought to be unconditional and the REST API is controlled by the config
if c.GenericConfig.APIResourceConfigSource.ResourceEnabled(extensionsapiv1beta1.SchemeGroupVersion.WithResource("thirdpartyresources")) {
var err error
// TODO figure out why this isn't a loopback client
m.thirdPartyResourceServer.ThirdPartyStorageConfig, err = c.StorageFactory.NewConfig(extensions.Resource("thirdpartyresources"))
if err != nil {
glog.Fatalf("Error getting third party storage: %v", err)
}
}
// stabilize order.
// TODO find a better way to configure priority of groups
for _, group := range sets.StringKeySet(c.RESTStorageProviders).List() {
if !c.GenericConfig.APIResourceConfigSource.AnyResourcesForGroupEnabled(group) {
glog.V(1).Infof("Skipping disabled API group %q.", group)
for _, restStorageBuilder := range restStorageProviders {
groupName := restStorageBuilder.GroupName()
if !apiResourceConfigSource.AnyResourcesForGroupEnabled(groupName) {
glog.V(1).Infof("Skipping disabled API group %q.", groupName)
continue
}
restStorageBuilder := c.RESTStorageProviders[group]
apiGroupInfo, enabled := restStorageBuilder.NewRESTStorage(c.GenericConfig.APIResourceConfigSource, restOptionsGetter)
apiGroupInfo, enabled := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
if !enabled {
glog.Warningf("Problem initializing API group %q, skipping.", group)
glog.Warningf("Problem initializing API group %q, skipping.", groupName)
continue
}
glog.V(1).Infof("Enabling API group %q.", group)
glog.V(1).Infof("Enabling API group %q.", groupName)
if postHookProvider, ok := restStorageBuilder.(genericapiserver.PostStartHookProvider); ok {
name, hook, err := postHookProvider.PostStartHook()
@ -321,44 +277,6 @@ func (m *Master) InstallAPIs(c *Config, restOptionsGetter genericapiserver.RESTO
}
}
func getServersToValidate(storageFactory genericapiserver.StorageFactory) map[string]apiserver.Server {
serversToValidate := map[string]apiserver.Server{
"controller-manager": {Addr: "127.0.0.1", Port: ports.ControllerManagerPort, Path: "/healthz"},
"scheduler": {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"},
}
for ix, machine := range storageFactory.Backends() {
etcdUrl, err := url.Parse(machine)
if err != nil {
glog.Errorf("Failed to parse etcd url for validation: %v", err)
continue
}
var port int
var addr string
if strings.Contains(etcdUrl.Host, ":") {
var portString string
addr, portString, err = net.SplitHostPort(etcdUrl.Host)
if err != nil {
glog.Errorf("Failed to split host/port: %s (%v)", etcdUrl.Host, err)
continue
}
port, _ = strconv.Atoi(portString)
} else {
addr = etcdUrl.Host
port = 2379
}
// TODO: etcd health checking should be abstracted in the storage tier
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{
Addr: addr,
EnableHTTPS: etcdUrl.Scheme == "https",
Port: port,
Path: "/health",
Validate: etcdutil.EtcdHealthCheck,
}
}
return serversToValidate
}
type restOptionsFactory struct {
deleteCollectionWorkers int
enableGarbageCollection bool
@ -381,6 +299,27 @@ func (f restOptionsFactory) NewFor(resource unversioned.GroupResource) generic.R
}
}
type nodeAddressProvider struct {
nodeClient coreclient.NodeInterface
}
func (n nodeAddressProvider) externalAddresses() (addresses []string, err error) {
nodes, err := n.nodeClient.List(api.ListOptions{})
if err != nil {
return nil, err
}
addrs := []string{}
for ix := range nodes.Items {
node := &nodes.Items[ix]
addr, err := findExternalAddress(node)
if err != nil {
return nil, err
}
addrs = append(addrs, addr)
}
return addrs, nil
}
// findExternalAddress returns ExternalIP of provided node with fallback to LegacyHostIP.
func findExternalAddress(node *api.Node) (string, error) {
var fallback string
@ -399,23 +338,6 @@ func findExternalAddress(node *api.Node) (string, error) {
return "", fmt.Errorf("Couldn't find external address: %v", node)
}
func (m *Master) getNodeAddresses() ([]string, error) {
nodes, err := m.nodeClient.List(api.ListOptions{})
if err != nil {
return nil, err
}
addrs := []string{}
for ix := range nodes.Items {
node := &nodes.Items[ix]
addr, err := findExternalAddress(node)
if err != nil {
return nil, err
}
addrs = append(addrs, addr)
}
return addrs, nil
}
func DefaultAPIResourceConfigSource() *genericapiserver.ResourceConfig {
ret := genericapiserver.NewResourceConfig()
ret.EnableVersions(

View File

@ -102,9 +102,6 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
t.Fatal(err)
}
fakeNodeClient := fake.NewSimpleClientset(registrytest.MakeNodeList([]string{"node1", "node2"}, api.NodeResources{}))
master.nodeClient = fakeNodeClient.Core().Nodes()
return master, server, *config, assert.New(t)
}
@ -181,23 +178,6 @@ func TestVersion(t *testing.T) {
}
}
// TestGetServersToValidate verifies the unexported getServersToValidate function
func TestGetServersToValidate(t *testing.T) {
_, etcdserver, config, assert := setUp(t)
defer etcdserver.Terminate(t)
servers := getServersToValidate(config.StorageFactory)
// Expected servers to validate: scheduler, controller-manager and etcd.
assert.Equal(3, len(servers), "unexpected server list: %#v", servers)
for _, server := range []string{"scheduler", "controller-manager", "etcd-0"} {
if _, ok := servers[server]; !ok {
t.Errorf("server list missing: %s", server)
}
}
}
// TestFindExternalAddress verifies both pass and fail cases for the unexported
// findExternalAddress function
func TestFindExternalAddress(t *testing.T) {
@ -230,34 +210,36 @@ func (*fakeEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP,
// TestGetNodeAddresses verifies that proper results are returned
// when requesting node addresses.
func TestGetNodeAddresses(t *testing.T) {
master, etcdserver, _, assert := setUp(t)
defer etcdserver.Terminate(t)
assert := assert.New(t)
fakeNodeClient := fake.NewSimpleClientset(registrytest.MakeNodeList([]string{"node1", "node2"}, api.NodeResources{})).Core().Nodes()
addressProvider := nodeAddressProvider{fakeNodeClient}
// Fail case (no addresses associated with nodes)
nodes, _ := master.nodeClient.List(api.ListOptions{})
addrs, err := master.getNodeAddresses()
nodes, _ := fakeNodeClient.List(api.ListOptions{})
addrs, err := addressProvider.externalAddresses()
assert.Error(err, "getNodeAddresses should have caused an error as there are no addresses.")
assert.Error(err, "addresses should have caused an error as there are no addresses.")
assert.Equal([]string(nil), addrs)
// Pass case with External type IP
nodes, _ = master.nodeClient.List(api.ListOptions{})
nodes, _ = fakeNodeClient.List(api.ListOptions{})
for index := range nodes.Items {
nodes.Items[index].Status.Addresses = []api.NodeAddress{{Type: api.NodeExternalIP, Address: "127.0.0.1"}}
master.nodeClient.Update(&nodes.Items[index])
fakeNodeClient.Update(&nodes.Items[index])
}
addrs, err = master.getNodeAddresses()
assert.NoError(err, "getNodeAddresses should not have returned an error.")
addrs, err = addressProvider.externalAddresses()
assert.NoError(err, "addresses should not have returned an error.")
assert.Equal([]string{"127.0.0.1", "127.0.0.1"}, addrs)
// Pass case with LegacyHost type IP
nodes, _ = master.nodeClient.List(api.ListOptions{})
nodes, _ = fakeNodeClient.List(api.ListOptions{})
for index := range nodes.Items {
nodes.Items[index].Status.Addresses = []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "127.0.0.2"}}
master.nodeClient.Update(&nodes.Items[index])
fakeNodeClient.Update(&nodes.Items[index])
}
addrs, err = master.getNodeAddresses()
assert.NoError(err, "getNodeAddresses failback should not have returned an error.")
addrs, err = addressProvider.externalAddresses()
assert.NoError(err, "addresses failback should not have returned an error.")
assert.Equal([]string{"127.0.0.2", "127.0.0.2"}, addrs)
}
@ -350,126 +332,6 @@ func TestAPIVersionOfDiscoveryEndpoints(t *testing.T) {
}
func TestDiscoveryAtAPIS(t *testing.T) {
master, etcdserver, _, assert := newLimitedMaster(t)
defer etcdserver.Terminate(t)
server := httptest.NewServer(master.GenericAPIServer.HandlerContainer.ServeMux)
resp, err := http.Get(server.URL + "/apis")
if !assert.NoError(err) {
t.Errorf("unexpected error: %v", err)
}
assert.Equal(http.StatusOK, resp.StatusCode)
groupList := unversioned.APIGroupList{}
assert.NoError(decodeResponse(resp, &groupList))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
expectGroupNames := sets.NewString(autoscaling.GroupName, batch.GroupName, apps.GroupName, extensions.GroupName)
expectVersions := map[string][]unversioned.GroupVersionForDiscovery{
autoscaling.GroupName: {
{
GroupVersion: testapi.Autoscaling.GroupVersion().String(),
Version: testapi.Autoscaling.GroupVersion().Version,
},
},
// batch is using its pkg/apis/batch/ types here since during installation
// both versions get installed and testapi.go currently does not support
// multi-versioned clients
batch.GroupName: {
{
GroupVersion: batchapiv1.SchemeGroupVersion.String(),
Version: batchapiv1.SchemeGroupVersion.Version,
},
{
GroupVersion: batchapiv2alpha1.SchemeGroupVersion.String(),
Version: batchapiv2alpha1.SchemeGroupVersion.Version,
},
},
apps.GroupName: {
{
GroupVersion: testapi.Apps.GroupVersion().String(),
Version: testapi.Apps.GroupVersion().Version,
},
},
extensions.GroupName: {
{
GroupVersion: testapi.Extensions.GroupVersion().String(),
Version: testapi.Extensions.GroupVersion().Version,
},
},
}
expectPreferredVersion := map[string]unversioned.GroupVersionForDiscovery{
autoscaling.GroupName: {
GroupVersion: registered.GroupOrDie(autoscaling.GroupName).GroupVersion.String(),
Version: registered.GroupOrDie(autoscaling.GroupName).GroupVersion.Version,
},
batch.GroupName: {
GroupVersion: registered.GroupOrDie(batch.GroupName).GroupVersion.String(),
Version: registered.GroupOrDie(batch.GroupName).GroupVersion.Version,
},
apps.GroupName: {
GroupVersion: registered.GroupOrDie(apps.GroupName).GroupVersion.String(),
Version: registered.GroupOrDie(apps.GroupName).GroupVersion.Version,
},
extensions.GroupName: {
GroupVersion: registered.GroupOrDie(extensions.GroupName).GroupVersion.String(),
Version: registered.GroupOrDie(extensions.GroupName).GroupVersion.Version,
},
}
assert.Equal(4, len(groupList.Groups))
for _, group := range groupList.Groups {
if !expectGroupNames.Has(group.Name) {
t.Errorf("got unexpected group %s", group.Name)
}
assert.Equal(expectVersions[group.Name], group.Versions)
assert.Equal(expectPreferredVersion[group.Name], group.PreferredVersion)
}
thirdPartyGV := unversioned.GroupVersionForDiscovery{GroupVersion: "company.com/v1", Version: "v1"}
master.thirdPartyResourceServer.InstallThirdPartyResource(&extensions.ThirdPartyResource{
ObjectMeta: api.ObjectMeta{Name: "foo.company.com"},
Versions: []extensions.APIVersion{{Name: "v1"}},
})
resp, err = http.Get(server.URL + "/apis")
if !assert.NoError(err) {
t.Errorf("unexpected error: %v", err)
}
assert.Equal(http.StatusOK, resp.StatusCode)
assert.NoError(decodeResponse(resp, &groupList))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
assert.Equal(5, len(groupList.Groups))
expectGroupNames.Insert("company.com")
expectVersions["company.com"] = []unversioned.GroupVersionForDiscovery{thirdPartyGV}
expectPreferredVersion["company.com"] = thirdPartyGV
for _, group := range groupList.Groups {
if !expectGroupNames.Has(group.Name) {
t.Errorf("got unexpected group %s", group.Name)
}
assert.Equal(expectVersions[group.Name], group.Versions)
assert.Equal(expectPreferredVersion[group.Name], group.PreferredVersion)
}
}
func writeResponseToFile(resp *http.Response, filename string) error {
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
return ioutil.WriteFile(filename, data, 0755)
}
// TestValidOpenAPISpec verifies that the open api is added
// at the proper endpoint and the spec is valid.
func TestValidOpenAPISpec(t *testing.T) {

View File

@ -58,7 +58,7 @@ type ThirdPartyResourceServer struct {
deleteCollectionWorkers int
// storage for third party objects
ThirdPartyStorageConfig *storagebackend.Config
thirdPartyStorageConfig *storagebackend.Config
// map from api path to a tuple of (storage for the objects, APIGroup)
thirdPartyResources map[string]*thirdPartyEntry
// protects the map
@ -68,11 +68,19 @@ type ThirdPartyResourceServer struct {
disableThirdPartyControllerForTesting bool
}
func NewThirdPartyResourceServer(genericAPIServer *genericapiserver.GenericAPIServer) *ThirdPartyResourceServer {
return &ThirdPartyResourceServer{
func NewThirdPartyResourceServer(genericAPIServer *genericapiserver.GenericAPIServer, storageFactory genericapiserver.StorageFactory) *ThirdPartyResourceServer {
ret := &ThirdPartyResourceServer{
genericAPIServer: genericAPIServer,
thirdPartyResources: map[string]*thirdPartyEntry{},
}
var err error
ret.thirdPartyStorageConfig, err = storageFactory.NewConfig(extensions.Resource("thirdpartyresources"))
if err != nil {
glog.Fatalf("Error building third party storage: %v", err)
}
return ret
}
// thirdPartyEntry combines objects storage and API group into one struct
@ -276,7 +284,7 @@ func (m *ThirdPartyResourceServer) InstallThirdPartyResource(rsrc *extensions.Th
func (m *ThirdPartyResourceServer) thirdpartyapi(group, kind, version, pluralResource string) *apiserver.APIGroupVersion {
resourceStorage := thirdpartyresourcedataetcd.NewREST(
generic.RESTOptions{
StorageConfig: m.ThirdPartyStorageConfig,
StorageConfig: m.thirdPartyStorageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: m.deleteCollectionWorkers,
},

View File

@ -50,3 +50,7 @@ func (p RESTStorageProvider) v1alpha1Storage(apiResourceConfigSource genericapis
}
return storage
}
func (p RESTStorageProvider) GroupName() string {
return apps.GroupName
}

View File

@ -60,3 +60,7 @@ func (p RESTStorageProvider) v1beta1Storage(apiResourceConfigSource genericapise
return storage
}
func (p RESTStorageProvider) GroupName() string {
return authentication.GroupName
}

View File

@ -64,3 +64,7 @@ func (p RESTStorageProvider) v1beta1Storage(apiResourceConfigSource genericapise
return storage
}
func (p RESTStorageProvider) GroupName() string {
return authorization.GroupName
}

View File

@ -50,3 +50,7 @@ func (p RESTStorageProvider) v1Storage(apiResourceConfigSource genericapiserver.
}
return storage
}
func (p RESTStorageProvider) GroupName() string {
return autoscaling.GroupName
}

View File

@ -73,3 +73,7 @@ func (p RESTStorageProvider) v2alpha1Storage(apiResourceConfigSource genericapis
}
return storage
}
func (p RESTStorageProvider) GroupName() string {
return batch.GroupName
}

View File

@ -51,3 +51,7 @@ func (p RESTStorageProvider) v1alpha1Storage(apiResourceConfigSource genericapis
}
return storage
}
func (p RESTStorageProvider) GroupName() string {
return certificates.GroupName
}

View File

@ -24,6 +24,7 @@ go_library(
"//pkg/client/restclient:go_default_library",
"//pkg/genericapiserver:go_default_library",
"//pkg/kubelet/client:go_default_library",
"//pkg/master/ports:go_default_library",
"//pkg/registry/core/componentstatus:go_default_library",
"//pkg/registry/core/configmap/etcd:go_default_library",
"//pkg/registry/core/controller/etcd:go_default_library",
@ -49,6 +50,19 @@ go_library(
"//pkg/registry/core/service/ipallocator:go_default_library",
"//pkg/registry/core/service/portallocator:go_default_library",
"//pkg/registry/core/serviceaccount/etcd:go_default_library",
"//pkg/storage/etcd/util:go_default_library",
"//pkg/util/net:go_default_library",
"//vendor:github.com/golang/glog",
],
)
go_test(
name = "go_default_test",
srcs = ["storage_core_test.go"],
library = "go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api/unversioned:go_default_library",
"//pkg/storage/storagebackend:go_default_library",
],
)

View File

@ -20,8 +20,13 @@ import (
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/api/unversioned"
@ -31,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/genericapiserver"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/registry/core/componentstatus"
configmapetcd "k8s.io/kubernetes/pkg/registry/core/configmap/etcd"
controlleretcd "k8s.io/kubernetes/pkg/registry/core/controller/etcd"
@ -56,6 +62,7 @@ import (
ipallocator "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
serviceaccountetcd "k8s.io/kubernetes/pkg/registry/core/serviceaccount/etcd"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
utilnet "k8s.io/kubernetes/pkg/util/net"
)
@ -72,14 +79,9 @@ type LegacyRESTStorageProvider struct {
ServiceClusterIPRange *net.IPNet
ServiceNodePortRange utilnet.PortRange
// ComponentStatusServerFunc is a func used to locate servers to back component status
ComponentStatusServerFunc ComponentStatusServerFunc
LoopbackClientConfig *restclient.Config
}
type ComponentStatusServerFunc func() map[string]apiserver.Server
// LegacyRESTStorage returns stateful information about particular instances of REST storage to
// master.go for wiring controllers.
// TODO remove this by running the controller as a poststarthook
@ -227,7 +229,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
"persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
"configMaps": configMapStorage,
"componentStatuses": componentstatus.NewStorage(c.ComponentStatusServerFunc),
"componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
}
if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}) {
restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
@ -239,3 +241,49 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
return restStorage, apiGroupInfo, nil
}
func (p LegacyRESTStorageProvider) GroupName() string {
return api.GroupName
}
type componentStatusStorage struct {
storageFactory genericapiserver.StorageFactory
}
func (s componentStatusStorage) serversToValidate() map[string]apiserver.Server {
serversToValidate := map[string]apiserver.Server{
"controller-manager": {Addr: "127.0.0.1", Port: ports.ControllerManagerPort, Path: "/healthz"},
"scheduler": {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"},
}
for ix, machine := range s.storageFactory.Backends() {
etcdUrl, err := url.Parse(machine)
if err != nil {
glog.Errorf("Failed to parse etcd url for validation: %v", err)
continue
}
var port int
var addr string
if strings.Contains(etcdUrl.Host, ":") {
var portString string
addr, portString, err = net.SplitHostPort(etcdUrl.Host)
if err != nil {
glog.Errorf("Failed to split host/port: %s (%v)", etcdUrl.Host, err)
continue
}
port, _ = strconv.Atoi(portString)
} else {
addr = etcdUrl.Host
port = 2379
}
// TODO: etcd health checking should be abstracted in the storage tier
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{
Addr: addr,
EnableHTTPS: etcdUrl.Scheme == "https",
Port: port,
Path: "/health",
Validate: etcdutil.EtcdHealthCheck,
}
}
return serversToValidate
}

View File

@ -0,0 +1,52 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rest
import (
"testing"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/storage/storagebackend"
)
func TestGetServersToValidate(t *testing.T) {
servers := componentStatusStorage{fakeStorageFactory{}}.serversToValidate()
if e, a := 3, len(servers); e != a {
t.Errorf("expected %v, got %v", e, a)
}
for _, server := range []string{"scheduler", "controller-manager", "etcd-0"} {
if _, ok := servers[server]; !ok {
t.Errorf("server list missing: %s", server)
}
}
}
type fakeStorageFactory struct{}
func (f fakeStorageFactory) NewConfig(groupResource unversioned.GroupResource) (*storagebackend.Config, error) {
return nil, nil
}
func (f fakeStorageFactory) ResourcePrefix(groupResource unversioned.GroupResource) string {
return ""
}
func (f fakeStorageFactory) Backends() []string {
return []string{"etcd-0"}
}

View File

@ -140,3 +140,7 @@ func (p RESTStorageProvider) postStartHookFunc(hookContext genericapiserver.Post
return nil
}
func (p RESTStorageProvider) GroupName() string {
return extensions.GroupName
}

View File

@ -50,3 +50,7 @@ func (p RESTStorageProvider) v1alpha1Storage(apiResourceConfigSource genericapis
}
return storage
}
func (p RESTStorageProvider) GroupName() string {
return policy.GroupName
}

View File

@ -49,10 +49,10 @@ type RESTStorageProvider struct {
AuthorizerRBACSuperUser string
}
var _ genericapiserver.RESTStorageProvider = &RESTStorageProvider{}
var _ genericapiserver.PostStartHookProvider = &RESTStorageProvider{}
var _ genericapiserver.RESTStorageProvider = RESTStorageProvider{}
var _ genericapiserver.PostStartHookProvider = RESTStorageProvider{}
func (p *RESTStorageProvider) NewRESTStorage(apiResourceConfigSource genericapiserver.APIResourceConfigSource, restOptionsGetter genericapiserver.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool) {
func (p RESTStorageProvider) NewRESTStorage(apiResourceConfigSource genericapiserver.APIResourceConfigSource, restOptionsGetter genericapiserver.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool) {
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(rbac.GroupName)
if apiResourceConfigSource.AnyResourcesForVersionEnabled(rbacapiv1alpha1.SchemeGroupVersion) {
@ -63,7 +63,7 @@ func (p *RESTStorageProvider) NewRESTStorage(apiResourceConfigSource genericapis
return apiGroupInfo, true
}
func (p *RESTStorageProvider) v1alpha1Storage(apiResourceConfigSource genericapiserver.APIResourceConfigSource, restOptionsGetter genericapiserver.RESTOptionsGetter) map[string]rest.Storage {
func (p RESTStorageProvider) v1alpha1Storage(apiResourceConfigSource genericapiserver.APIResourceConfigSource, restOptionsGetter genericapiserver.RESTOptionsGetter) map[string]rest.Storage {
version := rbacapiv1alpha1.SchemeGroupVersion
once := new(sync.Once)
@ -100,7 +100,7 @@ func (p *RESTStorageProvider) v1alpha1Storage(apiResourceConfigSource genericapi
return storage
}
func (p *RESTStorageProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
func (p RESTStorageProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
return "rbac/bootstrap-roles", PostStartHook, nil
}
@ -153,3 +153,7 @@ func PostStartHook(hookContext genericapiserver.PostStartHookContext) error {
return nil
}
func (p RESTStorageProvider) GroupName() string {
return rbac.GroupName
}

View File

@ -52,3 +52,7 @@ func (p RESTStorageProvider) v1beta1Storage(apiResourceConfigSource genericapise
return storage
}
func (p RESTStorageProvider) GroupName() string {
return storageapi.GroupName
}

View File

@ -686,6 +686,7 @@ k8s.io/kubernetes/pkg/registry/core/podtemplate,thockin,1
k8s.io/kubernetes/pkg/registry/core/podtemplate/etcd,brendandburns,1
k8s.io/kubernetes/pkg/registry/core/resourcequota,vulpecula,1
k8s.io/kubernetes/pkg/registry/core/resourcequota/etcd,ghodss,1
k8s.io/kubernetes/pkg/registry/core/rest,deads2k,0
k8s.io/kubernetes/pkg/registry/core/secret,jdef,1
k8s.io/kubernetes/pkg/registry/core/secret/etcd,freehan,1
k8s.io/kubernetes/pkg/registry/core/service,madhusudancs,1

1 name owner auto-assigned
686 k8s.io/kubernetes/pkg/registry/core/podtemplate/etcd brendandburns 1
687 k8s.io/kubernetes/pkg/registry/core/resourcequota vulpecula 1
688 k8s.io/kubernetes/pkg/registry/core/resourcequota/etcd ghodss 1
689 k8s.io/kubernetes/pkg/registry/core/rest deads2k 0
690 k8s.io/kubernetes/pkg/registry/core/secret jdef 1
691 k8s.io/kubernetes/pkg/registry/core/secret/etcd freehan 1
692 k8s.io/kubernetes/pkg/registry/core/service madhusudancs 1