mirror of https://github.com/k3s-io/k3s
Remove federated apiserver's dependency on kubernetes api objects
parent
6e99624dd6
commit
831c8d7594
|
@ -36,21 +36,12 @@ import (
|
|||
"k8s.io/kubernetes/pkg/admission"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
apiv1 "k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/apimachinery/registered"
|
||||
"k8s.io/kubernetes/pkg/apis/autoscaling"
|
||||
autoscalingapiv1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
|
||||
"k8s.io/kubernetes/pkg/apis/batch"
|
||||
batchapiv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
|
||||
"k8s.io/kubernetes/pkg/apiserver"
|
||||
"k8s.io/kubernetes/pkg/apiserver/authenticator"
|
||||
"k8s.io/kubernetes/pkg/capabilities"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
|
||||
"k8s.io/kubernetes/pkg/genericapiserver"
|
||||
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
|
||||
"k8s.io/kubernetes/pkg/master"
|
||||
|
@ -129,51 +120,6 @@ func newEtcd(ns runtime.NegotiatedSerializer, storageGroupVersionString, memoryG
|
|||
return storageConfig.NewStorage()
|
||||
}
|
||||
|
||||
// parse the value of --etcd-servers-overrides and update given storageDestinations.
|
||||
func updateEtcdOverrides(overrides []string, storageVersions map[string]string, etcdConfig etcdstorage.EtcdConfig, storageDestinations *genericapiserver.StorageDestinations, newEtcdFn newEtcdFunc) {
|
||||
if len(overrides) == 0 {
|
||||
return
|
||||
}
|
||||
for _, override := range overrides {
|
||||
tokens := strings.Split(override, "#")
|
||||
if len(tokens) != 2 {
|
||||
glog.Errorf("invalid value of etcd server overrides: %s", override)
|
||||
continue
|
||||
}
|
||||
|
||||
apiresource := strings.Split(tokens[0], "/")
|
||||
if len(apiresource) != 2 {
|
||||
glog.Errorf("invalid resource definition: %s", tokens[0])
|
||||
}
|
||||
group := apiresource[0]
|
||||
resource := apiresource[1]
|
||||
|
||||
apigroup, err := registered.Group(group)
|
||||
if err != nil {
|
||||
glog.Errorf("invalid api group %s: %v", group, err)
|
||||
continue
|
||||
}
|
||||
if _, found := storageVersions[apigroup.GroupVersion.Group]; !found {
|
||||
glog.Errorf("Couldn't find the storage version for group %s", apigroup.GroupVersion.Group)
|
||||
continue
|
||||
}
|
||||
|
||||
servers := strings.Split(tokens[1], ";")
|
||||
overrideEtcdConfig := etcdConfig
|
||||
overrideEtcdConfig.ServerList = servers
|
||||
// Note, internalGV will be wrong for things like batch or
|
||||
// autoscalers, but they shouldn't be using the override
|
||||
// storage.
|
||||
internalGV := apigroup.GroupVersion.Group + "/__internal"
|
||||
etcdOverrideStorage, err := newEtcdFn(api.Codecs, storageVersions[apigroup.GroupVersion.Group], internalGV, overrideEtcdConfig)
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err)
|
||||
}
|
||||
|
||||
storageDestinations.AddStorageOverride(group, resource, etcdOverrideStorage)
|
||||
}
|
||||
}
|
||||
|
||||
// Run runs the specified APIServer. This should never exit.
|
||||
func Run(s *options.APIServer) error {
|
||||
verifyClusterIPFlags(s)
|
||||
|
@ -279,103 +225,7 @@ func Run(s *options.APIServer) error {
|
|||
glog.Errorf("Failed to create clientset: %v", err)
|
||||
}
|
||||
|
||||
legacyV1Group, err := registered.Group(api.GroupName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
storageDestinations := genericapiserver.NewStorageDestinations()
|
||||
|
||||
storageVersions := s.StorageGroupsToGroupVersions()
|
||||
if _, found := storageVersions[legacyV1Group.GroupVersion.Group]; !found {
|
||||
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.GroupVersion.Group, storageVersions)
|
||||
}
|
||||
etcdStorage, err := newEtcd(api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], "/__internal", s.EtcdConfig)
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
|
||||
}
|
||||
storageDestinations.AddAPIGroup("", etcdStorage)
|
||||
|
||||
if apiResourceConfigSource.AnyResourcesForVersionEnabled(extensionsapiv1beta1.SchemeGroupVersion) {
|
||||
glog.Infof("Configuring extensions/v1beta1 storage destination")
|
||||
expGroup, err := registered.Group(extensions.GroupName)
|
||||
if err != nil {
|
||||
glog.Fatalf("Extensions API is enabled in runtime config, but not enabled in the environment variable KUBE_API_VERSIONS. Error: %v", err)
|
||||
}
|
||||
if _, found := storageVersions[expGroup.GroupVersion.Group]; !found {
|
||||
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.GroupVersion.Group, storageVersions)
|
||||
}
|
||||
expEtcdStorage, err := newEtcd(api.Codecs, storageVersions[expGroup.GroupVersion.Group], "extensions/__internal", s.EtcdConfig)
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
|
||||
}
|
||||
storageDestinations.AddAPIGroup(extensions.GroupName, expEtcdStorage)
|
||||
|
||||
// Since HPA has been moved to the autoscaling group, we need to make
|
||||
// sure autoscaling has a storage destination. If the autoscaling group
|
||||
// itself is on, it will overwrite this decision below.
|
||||
storageDestinations.AddAPIGroup(autoscaling.GroupName, expEtcdStorage)
|
||||
|
||||
// Since Job has been moved to the batch group, we need to make
|
||||
// sure batch has a storage destination. If the batch group
|
||||
// itself is on, it will overwrite this decision below.
|
||||
storageDestinations.AddAPIGroup(batch.GroupName, expEtcdStorage)
|
||||
}
|
||||
|
||||
// autoscaling/v1/horizontalpodautoscalers is a move from extensions/v1beta1/horizontalpodautoscalers.
|
||||
// The storage version needs to be either extensions/v1beta1 or autoscaling/v1.
|
||||
// Users must roll forward while using 1.2, because we will require the latter for 1.3.
|
||||
if apiResourceConfigSource.AnyResourcesForVersionEnabled(autoscalingapiv1.SchemeGroupVersion) {
|
||||
glog.Infof("Configuring autoscaling/v1 storage destination")
|
||||
autoscalingGroup, err := registered.Group(autoscaling.GroupName)
|
||||
if err != nil {
|
||||
glog.Fatalf("Autoscaling API is enabled in runtime config, but not enabled in the environment variable KUBE_API_VERSIONS. Error: %v", err)
|
||||
}
|
||||
// Figure out what storage group/version we should use.
|
||||
storageGroupVersion, found := storageVersions[autoscalingGroup.GroupVersion.Group]
|
||||
if !found {
|
||||
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", autoscalingGroup.GroupVersion.Group, storageVersions)
|
||||
}
|
||||
|
||||
if storageGroupVersion != "autoscaling/v1" && storageGroupVersion != "extensions/v1beta1" {
|
||||
glog.Fatalf("The storage version for autoscaling must be either 'autoscaling/v1' or 'extensions/v1beta1'")
|
||||
}
|
||||
glog.Infof("Using %v for autoscaling group storage version", storageGroupVersion)
|
||||
autoscalingEtcdStorage, err := newEtcd(api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdConfig)
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
|
||||
}
|
||||
storageDestinations.AddAPIGroup(autoscaling.GroupName, autoscalingEtcdStorage)
|
||||
}
|
||||
|
||||
// batch/v1/job is a move from extensions/v1beta1/job. The storage
|
||||
// version needs to be either extensions/v1beta1 or batch/v1. Users
|
||||
// must roll forward while using 1.2, because we will require the
|
||||
// latter for 1.3.
|
||||
if apiResourceConfigSource.AnyResourcesForVersionEnabled(batchapiv1.SchemeGroupVersion) {
|
||||
glog.Infof("Configuring batch/v1 storage destination")
|
||||
batchGroup, err := registered.Group(batch.GroupName)
|
||||
if err != nil {
|
||||
glog.Fatalf("Batch API is enabled in runtime config, but not enabled in the environment variable KUBE_API_VERSIONS. Error: %v", err)
|
||||
}
|
||||
// Figure out what storage group/version we should use.
|
||||
storageGroupVersion, found := storageVersions[batchGroup.GroupVersion.Group]
|
||||
if !found {
|
||||
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", batchGroup.GroupVersion.Group, storageVersions)
|
||||
}
|
||||
|
||||
if storageGroupVersion != "batch/v1" && storageGroupVersion != "extensions/v1beta1" {
|
||||
glog.Fatalf("The storage version for batch must be either 'batch/v1' or 'extensions/v1beta1'")
|
||||
}
|
||||
glog.Infof("Using %v for batch group storage version", storageGroupVersion)
|
||||
batchEtcdStorage, err := newEtcd(api.Codecs, storageGroupVersion, "extensions/__internal", s.EtcdConfig)
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
|
||||
}
|
||||
storageDestinations.AddAPIGroup(batch.GroupName, batchEtcdStorage)
|
||||
}
|
||||
|
||||
updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdConfig, &storageDestinations, newEtcd)
|
||||
// TODO: register cluster federation resources here.
|
||||
|
||||
n := s.ServiceClusterIPRange
|
||||
|
||||
|
@ -390,9 +240,8 @@ func Run(s *options.APIServer) error {
|
|||
|
||||
var serviceAccountGetter serviceaccount.ServiceAccountTokenGetter
|
||||
if s.ServiceAccountLookup {
|
||||
// If we need to look up service accounts and tokens,
|
||||
// go directly to etcd to avoid recursive auth insanity
|
||||
serviceAccountGetter = serviceaccountcontroller.NewGetterFromStorageInterface(etcdStorage)
|
||||
// TODO: Maybe do not expose this option in genericapiserver, if most servers do not need it?
|
||||
glog.Fatalf("Invalid ServerRunOptions: ServiceAccountLookup should be false for ubernetes server")
|
||||
}
|
||||
|
||||
authenticator, err := authenticator.New(authenticator.AuthenticatorConfig{
|
||||
|
@ -409,7 +258,6 @@ func Run(s *options.APIServer) error {
|
|||
ServiceAccountTokenGetter: serviceAccountGetter,
|
||||
KeystoneURL: s.KeystoneURL,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid Authentication Config: %v", err)
|
||||
}
|
||||
|
@ -447,6 +295,9 @@ func Run(s *options.APIServer) error {
|
|||
}
|
||||
}
|
||||
|
||||
storageDestinations := genericapiserver.NewStorageDestinations()
|
||||
storageVersions := s.StorageGroupsToGroupVersions()
|
||||
|
||||
config := &master.Config{
|
||||
Config: &genericapiserver.Config{
|
||||
StorageDestinations: storageDestinations,
|
||||
|
@ -517,66 +368,7 @@ func getRuntimeConfigValue(s *options.APIServer, apiKey string, defaultValue boo
|
|||
|
||||
// Parses the given runtime-config and formats it into genericapiserver.APIResourceConfigSource
|
||||
func parseRuntimeConfig(s *options.APIServer) (genericapiserver.APIResourceConfigSource, error) {
|
||||
v1GroupVersionString := "api/v1"
|
||||
extensionsGroupVersionString := extensionsapiv1beta1.SchemeGroupVersion.String()
|
||||
versionToResourceSpecifier := map[unversioned.GroupVersion]string{
|
||||
apiv1.SchemeGroupVersion: v1GroupVersionString,
|
||||
extensionsapiv1beta1.SchemeGroupVersion: extensionsGroupVersionString,
|
||||
batchapiv1.SchemeGroupVersion: batchapiv1.SchemeGroupVersion.String(),
|
||||
autoscalingapiv1.SchemeGroupVersion: autoscalingapiv1.SchemeGroupVersion.String(),
|
||||
}
|
||||
|
||||
// TODO: parse the relevant group version when we add any.
|
||||
resourceConfig := master.DefaultAPIResourceConfigSource()
|
||||
|
||||
// "api/all=false" allows users to selectively enable specific api versions.
|
||||
enableAPIByDefault := true
|
||||
allAPIFlagValue, ok := s.RuntimeConfig["api/all"]
|
||||
if ok && allAPIFlagValue == "false" {
|
||||
enableAPIByDefault = false
|
||||
}
|
||||
|
||||
// "api/legacy=false" allows users to disable legacy api versions.
|
||||
disableLegacyAPIs := false
|
||||
legacyAPIFlagValue, ok := s.RuntimeConfig["api/legacy"]
|
||||
if ok && legacyAPIFlagValue == "false" {
|
||||
disableLegacyAPIs = true
|
||||
}
|
||||
_ = disableLegacyAPIs // hush the compiler while we don't have legacy APIs to disable.
|
||||
|
||||
// "<resourceSpecifier>={true|false} allows users to enable/disable API.
|
||||
// This takes preference over api/all and api/legacy, if specified.
|
||||
for version, resourceSpecifier := range versionToResourceSpecifier {
|
||||
enableVersion := getRuntimeConfigValue(s, resourceSpecifier, enableAPIByDefault)
|
||||
if enableVersion {
|
||||
resourceConfig.EnableVersions(version)
|
||||
} else {
|
||||
resourceConfig.DisableVersions(version)
|
||||
}
|
||||
}
|
||||
|
||||
for key := range s.RuntimeConfig {
|
||||
tokens := strings.Split(key, "/")
|
||||
if len(tokens) != 3 {
|
||||
continue
|
||||
}
|
||||
|
||||
switch {
|
||||
case strings.HasPrefix(key, extensionsGroupVersionString+"/"):
|
||||
if !resourceConfig.AnyResourcesForVersionEnabled(extensionsapiv1beta1.SchemeGroupVersion) {
|
||||
return nil, fmt.Errorf("%v is disabled, you cannot configure its resources individually", extensionsapiv1beta1.SchemeGroupVersion)
|
||||
}
|
||||
|
||||
resource := strings.TrimPrefix(key, extensionsGroupVersionString+"/")
|
||||
if getRuntimeConfigValue(s, key, false) {
|
||||
resourceConfig.EnableResources(extensionsapiv1beta1.SchemeGroupVersion.WithResource(resource))
|
||||
} else {
|
||||
resourceConfig.DisableResources(extensionsapiv1beta1.SchemeGroupVersion.WithResource(resource))
|
||||
}
|
||||
|
||||
default:
|
||||
// TODO enable individual resource capability for all GroupVersionResources
|
||||
return nil, fmt.Errorf("%v resources cannot be enabled/disabled individually", key)
|
||||
}
|
||||
}
|
||||
return resourceConfig, nil
|
||||
}
|
||||
|
|
|
@ -17,20 +17,10 @@ limitations under the License.
|
|||
package app
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/federation/cmd/federated-apiserver/app/options"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
"k8s.io/kubernetes/pkg/genericapiserver"
|
||||
"k8s.io/kubernetes/pkg/master"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
)
|
||||
|
||||
func TestLongRunningRequestRegexp(t *testing.T) {
|
||||
|
@ -73,155 +63,3 @@ func TestLongRunningRequestRegexp(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateEtcdOverrides(t *testing.T) {
|
||||
storageVersions := map[string]string{
|
||||
"": "v1",
|
||||
"extensions": "extensions/v1beta1",
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
apigroup string
|
||||
resource string
|
||||
servers []string
|
||||
}{
|
||||
{
|
||||
apigroup: api.GroupName,
|
||||
resource: "resource",
|
||||
servers: []string{"http://127.0.0.1:10000"},
|
||||
},
|
||||
{
|
||||
apigroup: api.GroupName,
|
||||
resource: "resource",
|
||||
servers: []string{"http://127.0.0.1:10000", "http://127.0.0.1:20000"},
|
||||
},
|
||||
{
|
||||
apigroup: extensions.GroupName,
|
||||
resource: "resource",
|
||||
servers: []string{"http://127.0.0.1:10000"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
newEtcd := func(_ runtime.NegotiatedSerializer, _, _ string, etcdConfig etcdstorage.EtcdConfig) (storage.Interface, error) {
|
||||
if !reflect.DeepEqual(test.servers, etcdConfig.ServerList) {
|
||||
t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, etcdConfig.ServerList)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
storageDestinations := genericapiserver.NewStorageDestinations()
|
||||
override := test.apigroup + "/" + test.resource + "#" + strings.Join(test.servers, ";")
|
||||
defaultEtcdConfig := etcdstorage.EtcdConfig{
|
||||
Prefix: genericapiserver.DefaultEtcdPathPrefix,
|
||||
ServerList: []string{"http://127.0.0.1"},
|
||||
}
|
||||
updateEtcdOverrides([]string{override}, storageVersions, defaultEtcdConfig, &storageDestinations, newEtcd)
|
||||
apigroup, ok := storageDestinations.APIGroups[test.apigroup]
|
||||
if !ok {
|
||||
t.Errorf("apigroup: %s not created", test.apigroup)
|
||||
continue
|
||||
}
|
||||
if apigroup.Overrides == nil {
|
||||
t.Errorf("Overrides not created for: %s", test.apigroup)
|
||||
continue
|
||||
}
|
||||
if _, ok := apigroup.Overrides[test.resource]; !ok {
|
||||
t.Errorf("override not created for: %s", test.resource)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseRuntimeConfig(t *testing.T) {
|
||||
testCases := []struct {
|
||||
runtimeConfig map[string]string
|
||||
expectedAPIConfig func() *genericapiserver.ResourceConfig
|
||||
err bool
|
||||
}{
|
||||
{
|
||||
runtimeConfig: map[string]string{},
|
||||
expectedAPIConfig: func() *genericapiserver.ResourceConfig {
|
||||
return master.DefaultAPIResourceConfigSource()
|
||||
},
|
||||
err: false,
|
||||
},
|
||||
{
|
||||
// Cannot override v1 resources.
|
||||
runtimeConfig: map[string]string{
|
||||
"api/v1/pods": "false",
|
||||
},
|
||||
expectedAPIConfig: func() *genericapiserver.ResourceConfig {
|
||||
return master.DefaultAPIResourceConfigSource()
|
||||
},
|
||||
err: true,
|
||||
},
|
||||
{
|
||||
// Disable v1.
|
||||
runtimeConfig: map[string]string{
|
||||
"api/v1": "false",
|
||||
},
|
||||
expectedAPIConfig: func() *genericapiserver.ResourceConfig {
|
||||
config := master.DefaultAPIResourceConfigSource()
|
||||
config.DisableVersions(unversioned.GroupVersion{Group: "", Version: "v1"})
|
||||
return config
|
||||
},
|
||||
err: false,
|
||||
},
|
||||
{
|
||||
// Disable extensions.
|
||||
runtimeConfig: map[string]string{
|
||||
"extensions/v1beta1": "false",
|
||||
},
|
||||
expectedAPIConfig: func() *genericapiserver.ResourceConfig {
|
||||
config := master.DefaultAPIResourceConfigSource()
|
||||
config.DisableVersions(unversioned.GroupVersion{Group: "extensions", Version: "v1beta1"})
|
||||
return config
|
||||
},
|
||||
err: false,
|
||||
},
|
||||
{
|
||||
// Disable deployments.
|
||||
runtimeConfig: map[string]string{
|
||||
"extensions/v1beta1/deployments": "false",
|
||||
},
|
||||
expectedAPIConfig: func() *genericapiserver.ResourceConfig {
|
||||
config := master.DefaultAPIResourceConfigSource()
|
||||
config.DisableResources(unversioned.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"})
|
||||
return config
|
||||
},
|
||||
err: false,
|
||||
},
|
||||
{
|
||||
// Enable deployments and disable jobs.
|
||||
runtimeConfig: map[string]string{
|
||||
"extensions/v1beta1/anything": "true",
|
||||
"extensions/v1beta1/jobs": "false",
|
||||
},
|
||||
expectedAPIConfig: func() *genericapiserver.ResourceConfig {
|
||||
config := master.DefaultAPIResourceConfigSource()
|
||||
config.DisableResources(unversioned.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "jobs"})
|
||||
config.EnableResources(unversioned.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "anything"})
|
||||
return config
|
||||
},
|
||||
err: false,
|
||||
},
|
||||
}
|
||||
for _, test := range testCases {
|
||||
s := &options.APIServer{
|
||||
RuntimeConfig: test.runtimeConfig,
|
||||
}
|
||||
actualDisablers, err := parseRuntimeConfig(s)
|
||||
|
||||
if err == nil && test.err {
|
||||
t.Fatalf("expected error for test: %v", test)
|
||||
} else if err != nil && !test.err {
|
||||
t.Fatalf("unexpected error: %s, for test: %v", err, test)
|
||||
}
|
||||
|
||||
expectedConfig := test.expectedAPIConfig()
|
||||
if err == nil && !reflect.DeepEqual(actualDisablers, expectedConfig) {
|
||||
t.Fatalf("%v: unexpected apiResourceDisablers. Actual: %v\n expected: %v", test.runtimeConfig, actualDisablers, expectedConfig)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue