mirror of https://github.com/k3s-io/k3s
449 lines
17 KiB
Go
449 lines
17 KiB
Go
/*
|
|
Copyright 2014 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package master
|
|
|
|
import (
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"reflect"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"k8s.io/kubernetes/pkg/api"
|
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
|
apiv1 "k8s.io/kubernetes/pkg/api/v1"
|
|
appsapi "k8s.io/kubernetes/pkg/apis/apps/v1alpha1"
|
|
authenticationv1beta1 "k8s.io/kubernetes/pkg/apis/authentication/v1beta1"
|
|
"k8s.io/kubernetes/pkg/apis/authorization"
|
|
authorizationapiv1beta1 "k8s.io/kubernetes/pkg/apis/authorization/v1beta1"
|
|
"k8s.io/kubernetes/pkg/apis/autoscaling"
|
|
autoscalingapiv1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
|
|
"k8s.io/kubernetes/pkg/apis/batch"
|
|
batchapiv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
|
|
"k8s.io/kubernetes/pkg/apis/certificates"
|
|
certificatesapiv1alpha1 "k8s.io/kubernetes/pkg/apis/certificates/v1alpha1"
|
|
"k8s.io/kubernetes/pkg/apis/extensions"
|
|
extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
|
|
"k8s.io/kubernetes/pkg/apis/policy"
|
|
policyapiv1alpha1 "k8s.io/kubernetes/pkg/apis/policy/v1alpha1"
|
|
"k8s.io/kubernetes/pkg/apis/rbac"
|
|
rbacapi "k8s.io/kubernetes/pkg/apis/rbac/v1alpha1"
|
|
"k8s.io/kubernetes/pkg/apis/storage"
|
|
storageapiv1beta1 "k8s.io/kubernetes/pkg/apis/storage/v1beta1"
|
|
"k8s.io/kubernetes/pkg/apiserver"
|
|
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
|
|
"k8s.io/kubernetes/pkg/genericapiserver"
|
|
"k8s.io/kubernetes/pkg/healthz"
|
|
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
|
|
"k8s.io/kubernetes/pkg/master/ports"
|
|
"k8s.io/kubernetes/pkg/master/thirdparty"
|
|
|
|
"k8s.io/kubernetes/pkg/registry/generic"
|
|
"k8s.io/kubernetes/pkg/registry/generic/registry"
|
|
"k8s.io/kubernetes/pkg/routes"
|
|
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
|
"k8s.io/kubernetes/pkg/util/sets"
|
|
|
|
"github.com/golang/glog"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
// RESTStorage installers
|
|
appsrest "k8s.io/kubernetes/pkg/registry/apps/rest"
|
|
authenticationrest "k8s.io/kubernetes/pkg/registry/authentication/rest"
|
|
authorizationrest "k8s.io/kubernetes/pkg/registry/authorization/rest"
|
|
autoscalingrest "k8s.io/kubernetes/pkg/registry/autoscaling/rest"
|
|
batchrest "k8s.io/kubernetes/pkg/registry/batch/rest"
|
|
certificatesrest "k8s.io/kubernetes/pkg/registry/certificates/rest"
|
|
corerest "k8s.io/kubernetes/pkg/registry/core/rest"
|
|
extensionsrest "k8s.io/kubernetes/pkg/registry/extensions/rest"
|
|
policyrest "k8s.io/kubernetes/pkg/registry/policy/rest"
|
|
rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
|
|
storagerest "k8s.io/kubernetes/pkg/registry/storage/rest"
|
|
)
|
|
|
|
// DefaultEndpointReconcilerInterval is how often the endpoints of the
// kubernetes Service are reconciled when no interval has been configured.
const DefaultEndpointReconcilerInterval = 10 * time.Second
|
|
|
|
type Config struct {
|
|
GenericConfig *genericapiserver.Config
|
|
|
|
StorageFactory genericapiserver.StorageFactory
|
|
EnableWatchCache bool
|
|
EnableCoreControllers bool
|
|
EndpointReconcilerConfig EndpointReconcilerConfig
|
|
DeleteCollectionWorkers int
|
|
EventTTL time.Duration
|
|
KubeletClientConfig kubeletclient.KubeletClientConfig
|
|
// genericapiserver.RESTStorageProviders provides RESTStorage building methods keyed by groupName
|
|
RESTStorageProviders map[string]genericapiserver.RESTStorageProvider
|
|
// Used to start and monitor tunneling
|
|
Tunneler genericapiserver.Tunneler
|
|
EnableUISupport bool
|
|
EnableLogsSupport bool
|
|
ProxyTransport http.RoundTripper
|
|
}
|
|
|
|
// EndpointReconcilerConfig holds the endpoint reconciler and endpoint reconciliation interval to be
|
|
// used by the master.
|
|
type EndpointReconcilerConfig struct {
|
|
Reconciler EndpointReconciler
|
|
Interval time.Duration
|
|
}
|
|
|
|
// Master contains state for a Kubernetes cluster master/api server.
|
|
type Master struct {
|
|
GenericAPIServer *genericapiserver.GenericAPIServer
|
|
|
|
thirdPartyResourceServer *thirdparty.ThirdPartyResourceServer
|
|
|
|
// nodeClient is used to back the tunneler
|
|
nodeClient coreclient.NodeInterface
|
|
}
|
|
|
|
// RESTOptionsGetter returns the generic.RESTOptions to use for the given
// group resource.
type RESTOptionsGetter func(resource unversioned.GroupResource) generic.RESTOptions
|
|
|
|
type RESTStorageProvider interface {
|
|
NewRESTStorage(apiResourceConfigSource genericapiserver.APIResourceConfigSource, restOptionsGetter RESTOptionsGetter) (groupInfo genericapiserver.APIGroupInfo, enabled bool)
|
|
}
|
|
|
|
type completedConfig struct {
|
|
*Config
|
|
}
|
|
|
|
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
|
|
func (c *Config) Complete() completedConfig {
|
|
c.GenericConfig.Complete()
|
|
|
|
// enable swagger UI only if general UI support is on
|
|
c.GenericConfig.EnableSwaggerUI = c.GenericConfig.EnableSwaggerUI && c.EnableUISupport
|
|
|
|
if c.EndpointReconcilerConfig.Interval == 0 {
|
|
c.EndpointReconcilerConfig.Interval = DefaultEndpointReconcilerInterval
|
|
}
|
|
|
|
if c.EndpointReconcilerConfig.Reconciler == nil {
|
|
// use a default endpoint reconciler if nothing is set
|
|
endpointClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
|
|
c.EndpointReconcilerConfig.Reconciler = NewMasterCountEndpointReconciler(c.GenericConfig.MasterCount, endpointClient)
|
|
}
|
|
|
|
return completedConfig{c}
|
|
}
|
|
|
|
// SkipComplete provides a way to construct a server instance without config completion.
|
|
func (c *Config) SkipComplete() completedConfig {
|
|
return completedConfig{c}
|
|
}
|
|
|
|
// New returns a new instance of Master from the given config.
|
|
// Certain config fields will be set to a default value if unset.
|
|
// Certain config fields must be specified, including:
|
|
// KubeletClientConfig
|
|
func (c completedConfig) New() (*Master, error) {
|
|
if reflect.DeepEqual(c.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
|
|
return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
|
|
}
|
|
|
|
s, err := c.Config.GenericConfig.SkipComplete().New() // completion is done in Complete, no need for a second time
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if c.EnableUISupport {
|
|
routes.UIRedirect{}.Install(s.HandlerContainer)
|
|
}
|
|
if c.EnableLogsSupport {
|
|
routes.Logs{}.Install(s.HandlerContainer)
|
|
}
|
|
|
|
m := &Master{
|
|
GenericAPIServer: s,
|
|
nodeClient: coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes(),
|
|
|
|
thirdPartyResourceServer: thirdparty.NewThirdPartyResourceServer(s),
|
|
}
|
|
|
|
restOptionsFactory := restOptionsFactory{
|
|
deleteCollectionWorkers: c.DeleteCollectionWorkers,
|
|
enableGarbageCollection: c.GenericConfig.EnableGarbageCollection,
|
|
storageFactory: c.StorageFactory,
|
|
}
|
|
|
|
if c.EnableWatchCache {
|
|
restOptionsFactory.storageDecorator = registry.StorageWithCacher
|
|
} else {
|
|
restOptionsFactory.storageDecorator = generic.UndecoratedStorage
|
|
}
|
|
|
|
// install legacy rest storage
|
|
if c.GenericConfig.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) {
|
|
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
|
|
StorageFactory: c.StorageFactory,
|
|
ProxyTransport: c.ProxyTransport,
|
|
KubeletClientConfig: c.KubeletClientConfig,
|
|
EventTTL: c.EventTTL,
|
|
ServiceClusterIPRange: c.GenericConfig.ServiceClusterIPRange,
|
|
ServiceNodePortRange: c.GenericConfig.ServiceNodePortRange,
|
|
ComponentStatusServerFunc: func() map[string]apiserver.Server { return getServersToValidate(c.StorageFactory) },
|
|
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
|
|
}
|
|
m.InstallLegacyAPI(c.Config, restOptionsFactory.NewFor, legacyRESTStorageProvider)
|
|
}
|
|
|
|
// Add some hardcoded storage for now. Append to the map.
|
|
if c.RESTStorageProviders == nil {
|
|
c.RESTStorageProviders = map[string]genericapiserver.RESTStorageProvider{}
|
|
}
|
|
c.RESTStorageProviders[appsapi.GroupName] = appsrest.RESTStorageProvider{}
|
|
c.RESTStorageProviders[authenticationv1beta1.GroupName] = authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authenticator}
|
|
c.RESTStorageProviders[authorization.GroupName] = authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorizer}
|
|
c.RESTStorageProviders[autoscaling.GroupName] = autoscalingrest.RESTStorageProvider{}
|
|
c.RESTStorageProviders[batch.GroupName] = batchrest.RESTStorageProvider{}
|
|
c.RESTStorageProviders[certificates.GroupName] = certificatesrest.RESTStorageProvider{}
|
|
c.RESTStorageProviders[extensions.GroupName] = extensionsrest.RESTStorageProvider{ResourceInterface: m.thirdPartyResourceServer}
|
|
c.RESTStorageProviders[policy.GroupName] = policyrest.RESTStorageProvider{}
|
|
c.RESTStorageProviders[rbac.GroupName] = &rbacrest.RESTStorageProvider{AuthorizerRBACSuperUser: c.GenericConfig.AuthorizerRBACSuperUser}
|
|
c.RESTStorageProviders[storage.GroupName] = storagerest.RESTStorageProvider{}
|
|
m.InstallAPIs(c.Config, restOptionsFactory.NewFor)
|
|
|
|
m.InstallGeneralEndpoints(c.Config)
|
|
|
|
return m, nil
|
|
}
|
|
|
|
func (m *Master) InstallLegacyAPI(c *Config, restOptionsGetter genericapiserver.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) {
|
|
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
|
|
if err != nil {
|
|
glog.Fatalf("Error building core storage: %v", err)
|
|
}
|
|
|
|
if c.EnableCoreControllers {
|
|
bootstrapController := c.NewBootstrapController(legacyRESTStorage)
|
|
if err := m.GenericAPIServer.AddPostStartHook("bootstrap-controller", bootstrapController.PostStartHook); err != nil {
|
|
glog.Fatalf("Error registering PostStartHook %q: %v", "bootstrap-controller", err)
|
|
}
|
|
}
|
|
|
|
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
|
|
glog.Fatalf("Error in registering group versions: %v", err)
|
|
}
|
|
}
|
|
|
|
// TODO this needs to be refactored so we have a way to add general health checks to genericapiserver
|
|
// TODO profiling should be generic
|
|
func (m *Master) InstallGeneralEndpoints(c *Config) {
|
|
// Run the tunneler.
|
|
healthzChecks := []healthz.HealthzChecker{}
|
|
if c.Tunneler != nil {
|
|
c.Tunneler.Run(m.getNodeAddresses)
|
|
healthzChecks = append(healthzChecks, healthz.NamedCheck("SSH Tunnel Check", genericapiserver.TunnelSyncHealthChecker(c.Tunneler)))
|
|
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
Name: "apiserver_proxy_tunnel_sync_latency_secs",
|
|
Help: "The time since the last successful synchronization of the SSH tunnels for proxy requests.",
|
|
}, func() float64 { return float64(c.Tunneler.SecondsSinceSync()) })
|
|
}
|
|
healthz.InstallHandler(&m.GenericAPIServer.HandlerContainer.NonSwaggerRoutes, healthzChecks...)
|
|
|
|
if c.GenericConfig.EnableProfiling {
|
|
routes.MetricsWithReset{}.Install(m.GenericAPIServer.HandlerContainer)
|
|
} else {
|
|
routes.DefaultMetrics{}.Install(m.GenericAPIServer.HandlerContainer)
|
|
}
|
|
|
|
}
|
|
|
|
func (m *Master) InstallAPIs(c *Config, restOptionsGetter genericapiserver.RESTOptionsGetter) {
|
|
apiGroupsInfo := []genericapiserver.APIGroupInfo{}
|
|
|
|
// Install third party resource support if requested
|
|
// TODO seems like this bit ought to be unconditional and the REST API is controlled by the config
|
|
if c.GenericConfig.APIResourceConfigSource.ResourceEnabled(extensionsapiv1beta1.SchemeGroupVersion.WithResource("thirdpartyresources")) {
|
|
var err error
|
|
// TODO figure out why this isn't a loopback client
|
|
m.thirdPartyResourceServer.ThirdPartyStorageConfig, err = c.StorageFactory.NewConfig(extensions.Resource("thirdpartyresources"))
|
|
if err != nil {
|
|
glog.Fatalf("Error getting third party storage: %v", err)
|
|
}
|
|
}
|
|
|
|
// stabilize order.
|
|
// TODO find a better way to configure priority of groups
|
|
for _, group := range sets.StringKeySet(c.RESTStorageProviders).List() {
|
|
if !c.GenericConfig.APIResourceConfigSource.AnyResourcesForGroupEnabled(group) {
|
|
glog.V(1).Infof("Skipping disabled API group %q.", group)
|
|
continue
|
|
}
|
|
restStorageBuilder := c.RESTStorageProviders[group]
|
|
apiGroupInfo, enabled := restStorageBuilder.NewRESTStorage(c.GenericConfig.APIResourceConfigSource, restOptionsGetter)
|
|
if !enabled {
|
|
glog.Warningf("Problem initializing API group %q, skipping.", group)
|
|
continue
|
|
}
|
|
glog.V(1).Infof("Enabling API group %q.", group)
|
|
|
|
if postHookProvider, ok := restStorageBuilder.(genericapiserver.PostStartHookProvider); ok {
|
|
name, hook, err := postHookProvider.PostStartHook()
|
|
if err != nil {
|
|
glog.Fatalf("Error building PostStartHook: %v", err)
|
|
}
|
|
if err := m.GenericAPIServer.AddPostStartHook(name, hook); err != nil {
|
|
glog.Fatalf("Error registering PostStartHook %q: %v", name, err)
|
|
}
|
|
}
|
|
|
|
apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
|
|
}
|
|
|
|
for i := range apiGroupsInfo {
|
|
if err := m.GenericAPIServer.InstallAPIGroup(&apiGroupsInfo[i]); err != nil {
|
|
glog.Fatalf("Error in registering group versions: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func getServersToValidate(storageFactory genericapiserver.StorageFactory) map[string]apiserver.Server {
|
|
serversToValidate := map[string]apiserver.Server{
|
|
"controller-manager": {Addr: "127.0.0.1", Port: ports.ControllerManagerPort, Path: "/healthz"},
|
|
"scheduler": {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"},
|
|
}
|
|
|
|
for ix, machine := range storageFactory.Backends() {
|
|
etcdUrl, err := url.Parse(machine)
|
|
if err != nil {
|
|
glog.Errorf("Failed to parse etcd url for validation: %v", err)
|
|
continue
|
|
}
|
|
var port int
|
|
var addr string
|
|
if strings.Contains(etcdUrl.Host, ":") {
|
|
var portString string
|
|
addr, portString, err = net.SplitHostPort(etcdUrl.Host)
|
|
if err != nil {
|
|
glog.Errorf("Failed to split host/port: %s (%v)", etcdUrl.Host, err)
|
|
continue
|
|
}
|
|
port, _ = strconv.Atoi(portString)
|
|
} else {
|
|
addr = etcdUrl.Host
|
|
port = 2379
|
|
}
|
|
// TODO: etcd health checking should be abstracted in the storage tier
|
|
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{
|
|
Addr: addr,
|
|
EnableHTTPS: etcdUrl.Scheme == "https",
|
|
Port: port,
|
|
Path: "/health",
|
|
Validate: etcdutil.EtcdHealthCheck,
|
|
}
|
|
}
|
|
return serversToValidate
|
|
}
|
|
|
|
type restOptionsFactory struct {
|
|
deleteCollectionWorkers int
|
|
enableGarbageCollection bool
|
|
storageFactory genericapiserver.StorageFactory
|
|
storageDecorator generic.StorageDecorator
|
|
}
|
|
|
|
func (f restOptionsFactory) NewFor(resource unversioned.GroupResource) generic.RESTOptions {
|
|
storageConfig, err := f.storageFactory.NewConfig(resource)
|
|
if err != nil {
|
|
glog.Fatalf("Unable to find storage destination for %v, due to %v", resource, err.Error())
|
|
}
|
|
|
|
return generic.RESTOptions{
|
|
StorageConfig: storageConfig,
|
|
Decorator: f.storageDecorator,
|
|
DeleteCollectionWorkers: f.deleteCollectionWorkers,
|
|
EnableGarbageCollection: f.enableGarbageCollection,
|
|
ResourcePrefix: f.storageFactory.ResourcePrefix(resource),
|
|
}
|
|
}
|
|
|
|
// findExternalAddress returns ExternalIP of provided node with fallback to LegacyHostIP.
|
|
func findExternalAddress(node *api.Node) (string, error) {
|
|
var fallback string
|
|
for ix := range node.Status.Addresses {
|
|
addr := &node.Status.Addresses[ix]
|
|
if addr.Type == api.NodeExternalIP {
|
|
return addr.Address, nil
|
|
}
|
|
if fallback == "" && addr.Type == api.NodeLegacyHostIP {
|
|
fallback = addr.Address
|
|
}
|
|
}
|
|
if fallback != "" {
|
|
return fallback, nil
|
|
}
|
|
return "", fmt.Errorf("Couldn't find external address: %v", node)
|
|
}
|
|
|
|
func (m *Master) getNodeAddresses() ([]string, error) {
|
|
nodes, err := m.nodeClient.List(api.ListOptions{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
addrs := []string{}
|
|
for ix := range nodes.Items {
|
|
node := &nodes.Items[ix]
|
|
addr, err := findExternalAddress(node)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
addrs = append(addrs, addr)
|
|
}
|
|
return addrs, nil
|
|
}
|
|
|
|
func DefaultAPIResourceConfigSource() *genericapiserver.ResourceConfig {
|
|
ret := genericapiserver.NewResourceConfig()
|
|
ret.EnableVersions(
|
|
apiv1.SchemeGroupVersion,
|
|
extensionsapiv1beta1.SchemeGroupVersion,
|
|
batchapiv1.SchemeGroupVersion,
|
|
authenticationv1beta1.SchemeGroupVersion,
|
|
autoscalingapiv1.SchemeGroupVersion,
|
|
appsapi.SchemeGroupVersion,
|
|
policyapiv1alpha1.SchemeGroupVersion,
|
|
rbacapi.SchemeGroupVersion,
|
|
storageapiv1beta1.SchemeGroupVersion,
|
|
certificatesapiv1alpha1.SchemeGroupVersion,
|
|
authorizationapiv1beta1.SchemeGroupVersion,
|
|
)
|
|
|
|
// all extensions resources except these are disabled by default
|
|
ret.EnableResources(
|
|
extensionsapiv1beta1.SchemeGroupVersion.WithResource("daemonsets"),
|
|
extensionsapiv1beta1.SchemeGroupVersion.WithResource("deployments"),
|
|
extensionsapiv1beta1.SchemeGroupVersion.WithResource("horizontalpodautoscalers"),
|
|
extensionsapiv1beta1.SchemeGroupVersion.WithResource("ingresses"),
|
|
extensionsapiv1beta1.SchemeGroupVersion.WithResource("jobs"),
|
|
extensionsapiv1beta1.SchemeGroupVersion.WithResource("networkpolicies"),
|
|
extensionsapiv1beta1.SchemeGroupVersion.WithResource("replicasets"),
|
|
extensionsapiv1beta1.SchemeGroupVersion.WithResource("thirdpartyresources"),
|
|
)
|
|
|
|
return ret
|
|
}
|