add aggregation integration test

pull/6/head
deads2k 2017-02-23 16:34:09 -05:00
parent 49d1814b3a
commit 5cfe26dece
13 changed files with 477 additions and 238 deletions

View File

@ -65,6 +65,7 @@ go_library(
"//plugin/cmd/kube-scheduler/app:go_default_library",
"//plugin/cmd/kube-scheduler/app/options:go_default_library",
"//vendor:github.com/spf13/pflag",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apiserver/pkg/server/healthz",
"//vendor:k8s.io/apiserver/pkg/util/flag",
"//vendor:k8s.io/apiserver/pkg/util/logs",

View File

@ -19,6 +19,7 @@ package main
import (
"os"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kube-aggregator/pkg/cmd/server"
)
@ -39,7 +40,7 @@ func NewKubeAggregator() *Server {
if err := o.Validate(args); err != nil {
return err
}
if err := o.RunAggregator(); err != nil {
if err := o.RunAggregator(wait.NeverStop); err != nil {
return err
}
return nil

View File

@ -85,26 +85,47 @@ cluster's shared state through which all other components interact.`,
// Run runs the specified APIServer. This should never exit.
func Run(s *options.ServerRunOptions) error {
config, sharedInformers, err := BuildMasterConfig(s)
if err != nil {
return err
}
return RunServer(config, sharedInformers, wait.NeverStop)
}
// RunServer uses the provided config and shared informers to run the apiserver. It does not return.
func RunServer(config *master.Config, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) error {
m, err := config.Complete().New()
if err != nil {
return err
}
sharedInformers.Start(stopCh)
return m.GenericAPIServer.PrepareRun().Run(stopCh)
}
// BuildMasterConfig creates all the resources for running the API server, but runs none of them
func BuildMasterConfig(s *options.ServerRunOptions) (*master.Config, informers.SharedInformerFactory, error) {
// set defaults
if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing, s.InsecureServing); err != nil {
return err
return nil, nil, err
}
serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange)
if err != nil {
return fmt.Errorf("error determining service IP ranges: %v", err)
return nil, nil, fmt.Errorf("error determining service IP ranges: %v", err)
}
if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), apiServerServiceIP); err != nil {
return fmt.Errorf("error creating self-signed certificates: %v", err)
return nil, nil, fmt.Errorf("error creating self-signed certificates: %v", err)
}
if err := s.CloudProvider.DefaultExternalHost(s.GenericServerRunOptions); err != nil {
return fmt.Errorf("error setting the external host value: %v", err)
return nil, nil, fmt.Errorf("error setting the external host value: %v", err)
}
s.Authentication.ApplyAuthorization(s.Authorization)
// validate options
if errs := s.Validate(); len(errs) != 0 {
return utilerrors.NewAggregate(errs)
return nil, nil, utilerrors.NewAggregate(errs)
}
// create config from options
@ -112,22 +133,22 @@ func Run(s *options.ServerRunOptions) error {
WithSerializer(api.Codecs)
if err := s.GenericServerRunOptions.ApplyTo(genericConfig); err != nil {
return err
return nil, nil, err
}
if err := s.InsecureServing.ApplyTo(genericConfig); err != nil {
return err
return nil, nil, err
}
if err := s.SecureServing.ApplyTo(genericConfig); err != nil {
return err
return nil, nil, err
}
if err := s.Authentication.ApplyTo(genericConfig); err != nil {
return err
return nil, nil, err
}
if err := s.Audit.ApplyTo(genericConfig); err != nil {
return err
return nil, nil, err
}
if err := s.Features.ApplyTo(genericConfig); err != nil {
return err
return nil, nil, err
}
// Use protobufs for self-communication.
@ -155,7 +176,7 @@ func Run(s *options.ServerRunOptions) error {
var installSSHKey tunneler.InstallSSHKey
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider.CloudProvider, s.CloudProvider.CloudConfigFile)
if err != nil {
return fmt.Errorf("cloud provider could not be initialized: %v", err)
return nil, nil, fmt.Errorf("cloud provider could not be initialized: %v", err)
}
if cloud != nil {
if instances, supported := cloud.Instances(); supported {
@ -163,10 +184,10 @@ func Run(s *options.ServerRunOptions) error {
}
}
if s.KubeletConfig.Port == 0 {
return fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified")
return nil, nil, fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified")
}
if s.KubeletConfig.ReadOnlyPort == 0 {
return fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified")
return nil, nil, fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified")
}
// Set up the nodeTunneler
// TODO(cjcullen): If we want this to handle per-kubelet ports or other
@ -211,7 +232,7 @@ func Run(s *options.ServerRunOptions) error {
storageGroupsToEncodingVersion, err := s.StorageSerialization.StorageGroupsToEncodingVersion()
if err != nil {
return fmt.Errorf("error generating storage version map: %s", err)
return nil, nil, fmt.Errorf("error generating storage version map: %s", err)
}
storageFactory, err := kubeapiserver.NewStorageFactory(
s.Etcd.StorageConfig, s.Etcd.DefaultStorageMediaType, api.Codecs,
@ -220,7 +241,7 @@ func Run(s *options.ServerRunOptions) error {
[]schema.GroupVersionResource{batch.Resource("cronjobs").WithVersion("v2alpha1")},
master.DefaultAPIResourceConfigSource(), s.APIEnablement.RuntimeConfig)
if err != nil {
return fmt.Errorf("error in initializing storage factory: %s", err)
return nil, nil, fmt.Errorf("error in initializing storage factory: %s", err)
}
for _, override := range s.Etcd.EtcdServersOverrides {
tokens := strings.Split(override, "#")
@ -257,7 +278,7 @@ func Run(s *options.ServerRunOptions) error {
// go directly to etcd to avoid recursive auth insanity
storageConfig, err := storageFactory.NewConfig(api.Resource("serviceaccounts"))
if err != nil {
return fmt.Errorf("unable to get serviceaccounts storage: %v", err)
return nil, nil, fmt.Errorf("unable to get serviceaccounts storage: %v", err)
}
authenticatorConfig.ServiceAccountTokenGetter = serviceaccountcontroller.NewGetterFromStorageInterface(storageConfig, storageFactory.ResourcePrefix(api.Resource("serviceaccounts")), storageFactory.ResourcePrefix(api.Resource("secrets")))
}
@ -266,7 +287,7 @@ func Run(s *options.ServerRunOptions) error {
if err != nil {
kubeAPIVersions := os.Getenv("KUBE_API_VERSIONS")
if len(kubeAPIVersions) == 0 {
return fmt.Errorf("failed to create clientset: %v", err)
return nil, nil, fmt.Errorf("failed to create clientset: %v", err)
}
// KUBE_API_VERSIONS is used in test-update-storage-objects.sh, disabling a number of API
@ -289,24 +310,24 @@ func Run(s *options.ServerRunOptions) error {
apiAuthenticator, securityDefinitions, err := authenticatorConfig.New()
if err != nil {
return fmt.Errorf("invalid authentication config: %v", err)
return nil, nil, fmt.Errorf("invalid authentication config: %v", err)
}
authorizationConfig := s.Authorization.ToAuthorizationConfig(sharedInformers)
apiAuthorizer, err := authorizationConfig.New()
if err != nil {
return fmt.Errorf("invalid Authorization Config: %v", err)
return nil, nil, fmt.Errorf("invalid Authorization Config: %v", err)
}
admissionControlPluginNames := strings.Split(s.GenericServerRunOptions.AdmissionControl, ",")
pluginInitializer := kubeadmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer)
admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.GenericServerRunOptions.AdmissionControlConfigFile)
if err != nil {
return fmt.Errorf("failed to read plugin config: %v", err)
return nil, nil, fmt.Errorf("failed to read plugin config: %v", err)
}
admissionController, err := admission.NewFromPlugins(admissionControlPluginNames, admissionConfigProvider, pluginInitializer)
if err != nil {
return fmt.Errorf("failed to initialize plugins: %v", err)
return nil, nil, fmt.Errorf("failed to initialize plugins: %v", err)
}
proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
@ -331,16 +352,16 @@ func Run(s *options.ServerRunOptions) error {
)
if err := s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); err != nil {
return err
return nil, nil, err
}
clientCA, err := readCAorNil(s.Authentication.ClientCert.ClientCA)
if err != nil {
return err
return nil, nil, err
}
requestHeaderProxyCA, err := readCAorNil(s.Authentication.RequestHeader.ClientCAFile)
if err != nil {
return err
return nil, nil, err
}
config := &master.Config{
GenericConfig: genericConfig,
@ -381,14 +402,7 @@ func Run(s *options.ServerRunOptions) error {
cachesize.SetWatchCacheSizes(s.GenericServerRunOptions.WatchCacheSizes)
}
m, err := config.Complete().New()
if err != nil {
return err
}
sharedInformers.Start(wait.NeverStop)
m.GenericAPIServer.PrepareRun().Run(wait.NeverStop)
return nil
return config, sharedInformers, nil
}
func readCAorNil(file string) ([]byte, error) {

View File

@ -213,8 +213,7 @@ func Run(s *options.ServerRunOptions) error {
installAutoscalingAPIs(m, genericConfig.RESTOptionsGetter)
sharedInformers.Start(wait.NeverStop)
m.PrepareRun().Run(wait.NeverStop)
return nil
return m.PrepareRun().Run(wait.NeverStop)
}
// PostProcessSpec adds removed definitions for backward compatibility

View File

@ -189,16 +189,16 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
// Run spawns the http servers (secure and insecure). It only returns if stopCh is closed
// or one of the ports cannot be listened on initially.
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) {
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
if s.SecureServingInfo != nil && s.Handler != nil {
if err := s.serveSecurely(stopCh); err != nil {
glog.Fatal(err)
return err
}
}
if s.InsecureServingInfo != nil && s.InsecureHandler != nil {
if err := s.serveInsecurely(stopCh); err != nil {
glog.Fatal(err)
return err
}
}
@ -210,6 +210,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) {
}
<-stopCh
return nil
}
// EffectiveSecurePort returns the secure port we bound to.

View File

@ -448,114 +448,121 @@ NextTest:
}
stopCh := make(chan struct{})
func() {
defer close(stopCh)
// launch server
config := setUp(t)
// launch server
config := setUp(t)
v := fakeVersion()
config.Version = &v
v := fakeVersion()
config.Version = &v
config.EnableIndex = true
secureOptions := &SecureServingOptions{
ServingOptions: ServingOptions{
BindAddress: net.ParseIP("127.0.0.1"),
BindPort: 6443,
},
ServerCert: GeneratableKeyCert{
CertKey: CertKey{
CertFile: serverCertBundleFile,
KeyFile: serverKeyFile,
config.EnableIndex = true
secureOptions := &SecureServingOptions{
ServingOptions: ServingOptions{
BindAddress: net.ParseIP("127.0.0.1"),
BindPort: 6443,
},
},
SNICertKeys: namedCertKeys,
}
config.LoopbackClientConfig = &restclient.Config{}
if err := secureOptions.ApplyTo(&config); err != nil {
t.Errorf("%q - failed applying the SecureServingOptions: %v", title, err)
continue NextTest
}
config.InsecureServingInfo = nil
s, err := config.Complete().New()
if err != nil {
t.Errorf("%q - failed creating the server: %v", title, err)
continue NextTest
}
// patch in a 0-port to enable auto port allocation
s.SecureServingInfo.BindAddress = "127.0.0.1:0"
// add poststart hook to know when the server is up.
startedCh := make(chan struct{})
s.AddPostStartHook("test-notifier", func(context PostStartHookContext) error {
close(startedCh)
return nil
})
preparedServer := s.PrepareRun()
go preparedServer.Run(stopCh)
// load ca certificates into a pool
roots := x509.NewCertPool()
for _, caCert := range caCerts {
roots.AddCert(caCert)
}
<-startedCh
effectiveSecurePort := fmt.Sprintf("%d", preparedServer.EffectiveSecurePort())
// try to dial
addr := fmt.Sprintf("localhost:%s", effectiveSecurePort)
t.Logf("Dialing %s as %q", addr, test.ServerName)
conn, err := tls.Dial("tcp", addr, &tls.Config{
RootCAs: roots,
ServerName: test.ServerName, // used for SNI in the client HELLO packet
})
if err != nil {
t.Errorf("%q - failed to connect: %v", title, err)
continue NextTest
}
// check returned server certificate
sig := x509CertSignature(conn.ConnectionState().PeerCertificates[0])
gotCertIndex, found := signatures[sig]
if !found {
t.Errorf("%q - unknown signature returned from server: %s", title, sig)
}
if gotCertIndex != test.ExpectedCertIndex {
t.Errorf("%q - expected cert index %d, got cert index %d", title, test.ExpectedCertIndex, gotCertIndex)
}
conn.Close()
// check that the loopback client can connect
host := "127.0.0.1"
if len(test.LoopbackClientBindAddressOverride) != 0 {
host = test.LoopbackClientBindAddressOverride
}
s.LoopbackClientConfig.Host = net.JoinHostPort(host, effectiveSecurePort)
if test.ExpectLoopbackClientError {
if err == nil {
t.Errorf("%q - expected error creating loopback client config", title)
ServerCert: GeneratableKeyCert{
CertKey: CertKey{
CertFile: serverCertBundleFile,
KeyFile: serverKeyFile,
},
},
SNICertKeys: namedCertKeys,
}
continue NextTest
}
if err != nil {
t.Errorf("%q - failed creating loopback client config: %v", title, err)
continue NextTest
}
client, err := discovery.NewDiscoveryClientForConfig(s.LoopbackClientConfig)
if err != nil {
t.Errorf("%q - failed to create loopback client: %v", title, err)
continue NextTest
}
got, err := client.ServerVersion()
if err != nil {
t.Errorf("%q - failed to connect with loopback client: %v", title, err)
continue NextTest
}
if expected := &v; !reflect.DeepEqual(got, expected) {
t.Errorf("%q - loopback client didn't get correct version info: expected=%v got=%v", title, expected, got)
}
config.LoopbackClientConfig = &restclient.Config{}
if err := secureOptions.ApplyTo(&config); err != nil {
t.Errorf("%q - failed applying the SecureServingOptions: %v", title, err)
return
}
config.InsecureServingInfo = nil
s, err := config.Complete().New()
if err != nil {
t.Errorf("%q - failed creating the server: %v", title, err)
return
}
// patch in a 0-port to enable auto port allocation
s.SecureServingInfo.BindAddress = "127.0.0.1:0"
// add poststart hook to know when the server is up.
startedCh := make(chan struct{})
s.AddPostStartHook("test-notifier", func(context PostStartHookContext) error {
close(startedCh)
return nil
})
preparedServer := s.PrepareRun()
go func() {
if err := preparedServer.Run(stopCh); err != nil {
t.Fatal(err)
}
}()
// load ca certificates into a pool
roots := x509.NewCertPool()
for _, caCert := range caCerts {
roots.AddCert(caCert)
}
<-startedCh
effectiveSecurePort := fmt.Sprintf("%d", preparedServer.EffectiveSecurePort())
// try to dial
addr := fmt.Sprintf("localhost:%s", effectiveSecurePort)
t.Logf("Dialing %s as %q", addr, test.ServerName)
conn, err := tls.Dial("tcp", addr, &tls.Config{
RootCAs: roots,
ServerName: test.ServerName, // used for SNI in the client HELLO packet
})
if err != nil {
t.Errorf("%q - failed to connect: %v", title, err)
return
}
// check returned server certificate
sig := x509CertSignature(conn.ConnectionState().PeerCertificates[0])
gotCertIndex, found := signatures[sig]
if !found {
t.Errorf("%q - unknown signature returned from server: %s", title, sig)
}
if gotCertIndex != test.ExpectedCertIndex {
t.Errorf("%q - expected cert index %d, got cert index %d", title, test.ExpectedCertIndex, gotCertIndex)
}
conn.Close()
// check that the loopback client can connect
host := "127.0.0.1"
if len(test.LoopbackClientBindAddressOverride) != 0 {
host = test.LoopbackClientBindAddressOverride
}
s.LoopbackClientConfig.Host = net.JoinHostPort(host, effectiveSecurePort)
if test.ExpectLoopbackClientError {
if err == nil {
t.Errorf("%q - expected error creating loopback client config", title)
}
return
}
if err != nil {
t.Errorf("%q - failed creating loopback client config: %v", title, err)
return
}
client, err := discovery.NewDiscoveryClientForConfig(s.LoopbackClientConfig)
if err != nil {
t.Errorf("%q - failed to create loopback client: %v", title, err)
return
}
got, err := client.ServerVersion()
if err != nil {
t.Errorf("%q - failed to connect with loopback client: %v", title, err)
return
}
if expected := &v; !reflect.DeepEqual(got, expected) {
t.Errorf("%q - loopback client didn't get correct version info: expected=%v got=%v", title, expected, got)
}
}()
}
}

View File

@ -21,6 +21,7 @@ import (
"os"
"runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/logs"
"k8s.io/kube-aggregator/pkg/cmd/server"
@ -40,7 +41,7 @@ func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
}
cmd := server.NewCommandStartAggregator(os.Stdout, os.Stderr)
cmd := server.NewCommandStartAggregator(os.Stdout, os.Stderr, wait.NeverStop)
cmd.Flags().AddGoFlagSet(flag.CommandLine)
if err := cmd.Execute(); err != nil {
panic(err)

View File

@ -22,7 +22,6 @@ import (
"time"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
@ -106,7 +105,7 @@ func (c *Config) SkipComplete() completedConfig {
}
// New returns a new instance of APIAggregator from the given config.
func (c completedConfig) New() (*APIAggregator, error) {
func (c completedConfig) New(stopCh <-chan struct{}) (*APIAggregator, error) {
informerFactory := informers.NewSharedInformerFactory(
internalclientset.NewForConfigOrDie(c.Config.GenericConfig.LoopbackClientConfig),
5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on.
@ -154,12 +153,12 @@ func (c completedConfig) New() (*APIAggregator, error) {
apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), s)
s.GenericAPIServer.AddPostStartHook("start-informers", func(context genericapiserver.PostStartHookContext) error {
informerFactory.Start(wait.NeverStop)
kubeInformers.Start(wait.NeverStop)
informerFactory.Start(stopCh)
kubeInformers.Start(stopCh)
return nil
})
s.GenericAPIServer.AddPostStartHook("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
apiserviceRegistrationController.Run(wait.NeverStop)
apiserviceRegistrationController.Run(stopCh)
return nil
})

View File

@ -25,7 +25,6 @@ import (
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/filters"
genericoptions "k8s.io/apiserver/pkg/server/options"
@ -56,7 +55,7 @@ type AggregatorOptions struct {
}
// NewCommandStartAggregator provides a CLI handler for 'start master' command
func NewCommandStartAggregator(out, err io.Writer) *cobra.Command {
func NewCommandStartAggregator(out, err io.Writer, stopCh <-chan struct{}) *cobra.Command {
o := NewDefaultOptions(out, err)
cmd := &cobra.Command{
@ -69,7 +68,7 @@ func NewCommandStartAggregator(out, err io.Writer) *cobra.Command {
if err := o.Validate(args); err != nil {
return err
}
if err := o.RunAggregator(); err != nil {
if err := o.RunAggregator(stopCh); err != nil {
return err
}
return nil
@ -110,7 +109,7 @@ func (o *AggregatorOptions) Complete() error {
return nil
}
func (o AggregatorOptions) RunAggregator() error {
func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error {
// TODO have a "real" external address
if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost"); err != nil {
return fmt.Errorf("error creating self-signed certificates: %v", err)
@ -161,11 +160,9 @@ func (o AggregatorOptions) RunAggregator() error {
return err
}
server, err := config.Complete().New()
server, err := config.Complete().New(stopCh)
if err != nil {
return err
}
server.GenericAPIServer.PrepareRun().Run(wait.NeverStop)
return nil
return server.GenericAPIServer.PrepareRun().Run(stopCh)
}

View File

@ -19,6 +19,7 @@ package server
import (
"fmt"
"io"
"net"
"github.com/spf13/cobra"
@ -85,7 +86,7 @@ func (o *WardleServerOptions) Complete() error {
func (o WardleServerOptions) Config() (*apiserver.Config, error) {
// TODO have a "real" external address
if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost"); err != nil {
if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", net.ParseIP("127.0.0.1")); err != nil {
return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
}
@ -110,7 +111,5 @@ func (o WardleServerOptions) RunWardleServer(stopCh <-chan struct{}) error {
if err != nil {
return err
}
server.GenericAPIServer.PrepareRun().Run(stopCh)
return nil
return server.GenericAPIServer.PrepareRun().Run(stopCh)
}

View File

@ -12,13 +12,20 @@ go_test(
srcs = ["apiserver_test.go"],
tags = ["automanaged"],
deps = [
"//cmd/kube-apiserver/app:go_default_library",
"//cmd/kube-apiserver/app/options:go_default_library",
"//test/integration/framework:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:github.com/stretchr/testify/assert",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/client-go/kubernetes",
"//vendor:k8s.io/client-go/rest",
"//vendor:k8s.io/client-go/tools/clientcmd",
"//vendor:k8s.io/client-go/tools/clientcmd/api",
"//vendor:k8s.io/client-go/util/cert",
"//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration/v1alpha1",
"//vendor:k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset",
"//vendor:k8s.io/kube-aggregator/pkg/cmd/server",
"//vendor:k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1",
"//vendor:k8s.io/sample-apiserver/pkg/cmd/server",
],

View File

@ -17,29 +17,37 @@ limitations under the License.
package apiserver
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net"
"os"
"path"
"strconv"
"strings"
"testing"
"time"
"github.com/golang/glog"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/util/cert"
apiregistrationv1alpha1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1alpha1"
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
kubeaggregatorserver "k8s.io/kube-aggregator/pkg/cmd/server"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1"
"k8s.io/sample-apiserver/pkg/cmd/server"
sampleserver "k8s.io/sample-apiserver/pkg/cmd/server"
)
const securePort = "6444"
var groupVersion = v1alpha1.SchemeGroupVersion
var groupVersionForDiscovery = metav1.GroupVersionForDiscovery{
@ -47,41 +55,277 @@ var groupVersionForDiscovery = metav1.GroupVersionForDiscovery{
Version: groupVersion.Version,
}
func TestRunServer(t *testing.T) {
masterConfig := framework.NewIntegrationTestMasterConfig()
_, s := framework.RunAMaster(masterConfig)
defer s.Close()
func localPort() (int, error) {
l, err := net.Listen("tcp", ":0")
if err != nil {
return 0, err
}
defer l.Close()
addr := strings.Split(l.Addr().String(), ":")
port, err := strconv.Atoi(addr[len(addr)-1])
if err != nil {
return 0, err
}
return port, nil
}
adminKubeConfig := createKubeConfig(masterConfig.GenericConfig.LoopbackClientConfig)
func TestAggregatedAPIServer(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
kubePort, err := localPort()
if err != nil {
t.Fatal(err)
}
certDir, _ := ioutil.TempDir("", "test-integration-apiserver")
defer os.RemoveAll(certDir)
_, defaultServiceClusterIPRange, _ := net.ParseCIDR("10.0.0.0/24")
proxySigningKey, err := cert.NewPrivateKey()
if err != nil {
t.Fatal(err)
}
proxySigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey)
if err != nil {
t.Fatal(err)
}
proxyCACertFile, _ := ioutil.TempFile(certDir, "proxy-ca.crt")
if err := ioutil.WriteFile(proxyCACertFile.Name(), cert.EncodeCertPEM(proxySigningCert), 0644); err != nil {
t.Fatal(err)
}
kubeAPIServerOptions := options.NewServerRunOptions()
kubeAPIServerOptions.SecureServing.ServingOptions.BindAddress = net.ParseIP("127.0.0.1")
kubeAPIServerOptions.SecureServing.ServingOptions.BindPort = kubePort
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
kubeAPIServerOptions.InsecureServing.BindPort = 0
kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURLFromEnv()}
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"}
kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"}
kubeAPIServerOptions.Authentication.RequestHeader.ExtraHeaderPrefixes = []string{"X-Remote-Extra-"}
kubeAPIServerOptions.Authentication.RequestHeader.AllowedNames = []string{"kube-aggregator"}
kubeAPIServerOptions.Authentication.RequestHeader.ClientCAFile = proxyCACertFile.Name()
kubeAPIServerOptions.Authorization.Mode = "RBAC"
config, sharedInformers, err := app.BuildMasterConfig(kubeAPIServerOptions)
if err != nil {
t.Fatal(err)
}
go func() {
for {
if err := app.RunServer(config, sharedInformers, stopCh); err != nil {
t.Log(err)
}
time.Sleep(100 * time.Millisecond)
}
}()
// just use json because everyone speaks it
config.GenericConfig.LoopbackClientConfig.ContentType = ""
config.GenericConfig.LoopbackClientConfig.AcceptContentTypes = ""
kubeClient := client.NewForConfigOrDie(config.GenericConfig.LoopbackClientConfig)
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if _, err := kubeClient.Discovery().ServerVersion(); err != nil {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatal(err)
}
// write a kubeconfig out for starting other API servers with delegated auth. remember, no in-cluster config
adminKubeConfig := createKubeConfig(config.GenericConfig.LoopbackClientConfig)
kubeconfigFile, _ := ioutil.TempFile("", "")
defer os.Remove(kubeconfigFile.Name())
clientcmd.WriteToFile(*adminKubeConfig, kubeconfigFile.Name())
// Avoid default cert-dir of /var/run/kubernetes to allow this to run on darwin
certDir, _ := ioutil.TempDir("", "test-integration-apiserver")
defer os.Remove(certDir)
stopCh := make(chan struct{})
defer close(stopCh)
cmd := server.NewCommandStartWardleServer(os.Stdout, os.Stderr, stopCh)
cmd.SetArgs([]string{
"--secure-port", securePort,
"--requestheader-username-headers", "",
// start the wardle server to prove we can aggregate it
wardlePort, err := localPort()
if err != nil {
t.Fatal(err)
}
wardleCertDir, _ := ioutil.TempDir("", "test-integration-wardle-server")
defer os.RemoveAll(wardleCertDir)
wardleCmd := sampleserver.NewCommandStartWardleServer(os.Stdout, os.Stderr, stopCh)
wardleCmd.SetArgs([]string{
"--bind-address", "127.0.0.1",
"--secure-port", strconv.Itoa(wardlePort),
"--requestheader-username-headers=X-Remote-User",
"--requestheader-group-headers=X-Remote-Group",
"--requestheader-extra-headers-prefix=X-Remote-Extra-",
"--requestheader-client-ca-file=" + proxyCACertFile.Name(),
"--requestheader-allowed-names=kube-aggregator",
"--authentication-kubeconfig", kubeconfigFile.Name(),
"--authorization-kubeconfig", kubeconfigFile.Name(),
"--etcd-servers", framework.GetEtcdURLFromEnv(),
"--cert-dir", certDir,
"--cert-dir", wardleCertDir,
})
go cmd.Execute()
go func() {
for {
if err := wardleCmd.Execute(); err != nil {
t.Log(err)
}
time.Sleep(100 * time.Millisecond)
}
}()
serverLocation := fmt.Sprintf("https://localhost:%s", securePort)
if err := waitForApiserverUp(serverLocation); err != nil {
t.Fatalf("%v", err)
wardleClientConfig := rest.AnonymousClientConfig(config.GenericConfig.LoopbackClientConfig)
wardleClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", wardlePort)
wardleClientConfig.CAFile = path.Join(wardleCertDir, "apiserver.crt")
wardleClientConfig.CAData = nil
wardleClientConfig.ServerName = ""
wardleClientConfig.BearerToken = config.GenericConfig.LoopbackClientConfig.BearerToken
var wardleClient client.Interface
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
wardleClient, err = client.NewForConfig(wardleClientConfig)
if err != nil {
// this happens if we race the API server for writing the cert
t.Log(err)
return false, nil
}
if _, err := wardleClient.Discovery().ServerVersion(); err != nil {
t.Log(err)
return false, nil
}
return true, nil
})
if err != nil {
t.Fatal(err)
}
testAPIGroupList(t, serverLocation)
testAPIGroup(t, serverLocation)
testAPIResourceList(t, serverLocation)
// start the aggregator
aggregatorPort, err := localPort()
if err != nil {
t.Fatal(err)
}
aggregatorCertDir, _ := ioutil.TempDir("", "test-integration-aggregator")
defer os.RemoveAll(aggregatorCertDir)
proxyClientKey, err := cert.NewPrivateKey()
if err != nil {
t.Fatal(err)
}
proxyClientCert, err := cert.NewSignedCert(
cert.Config{
CommonName: "kube-aggregator",
Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
},
proxyClientKey, proxySigningCert, proxySigningKey,
)
proxyClientCertFile, _ := ioutil.TempFile(aggregatorCertDir, "proxy-client.crt")
proxyClientKeyFile, _ := ioutil.TempFile(aggregatorCertDir, "proxy-client.key")
if err := ioutil.WriteFile(proxyClientCertFile.Name(), cert.EncodeCertPEM(proxyClientCert), 0600); err != nil {
t.Fatal(err)
}
if err := ioutil.WriteFile(proxyClientKeyFile.Name(), cert.EncodePrivateKeyPEM(proxyClientKey), 0644); err != nil {
t.Fatal(err)
}
aggregatorCmd := kubeaggregatorserver.NewCommandStartAggregator(os.Stdout, os.Stderr, stopCh)
aggregatorCmd.SetArgs([]string{
"--bind-address", "127.0.0.1",
"--secure-port", strconv.Itoa(aggregatorPort),
"--requestheader-username-headers", "",
"--proxy-client-cert-file", proxyClientCertFile.Name(),
"--proxy-client-key-file", proxyClientKeyFile.Name(),
"--core-kubeconfig", kubeconfigFile.Name(),
"--authentication-kubeconfig", kubeconfigFile.Name(),
"--authorization-kubeconfig", kubeconfigFile.Name(),
"--etcd-servers", framework.GetEtcdURLFromEnv(),
"--cert-dir", aggregatorCertDir,
})
go func() {
for {
if err := aggregatorCmd.Execute(); err != nil {
t.Log(err)
}
time.Sleep(100 * time.Millisecond)
}
}()
aggregatorClientConfig := rest.AnonymousClientConfig(config.GenericConfig.LoopbackClientConfig)
aggregatorClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", aggregatorPort)
aggregatorClientConfig.CAFile = path.Join(aggregatorCertDir, "apiserver.crt")
aggregatorClientConfig.CAData = nil
aggregatorClientConfig.ServerName = ""
aggregatorClientConfig.BearerToken = config.GenericConfig.LoopbackClientConfig.BearerToken
var aggregatorDiscoveryClient client.Interface
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
aggregatorDiscoveryClient, err = client.NewForConfig(aggregatorClientConfig)
if err != nil {
// this happens if we race the API server for writing the cert
return false, nil
}
if _, err := aggregatorDiscoveryClient.Discovery().ServerVersion(); err != nil {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatal(err)
}
// now we're finally ready to test. These are what's run by defautl now
testAPIGroupList(t, wardleClient.Discovery().RESTClient())
testAPIGroup(t, wardleClient.Discovery().RESTClient())
testAPIResourceList(t, wardleClient.Discovery().RESTClient())
wardleCA, err := ioutil.ReadFile(wardleClientConfig.CAFile)
if err != nil {
t.Fatal(err)
}
aggregatorClient := aggregatorclient.NewForConfigOrDie(aggregatorClientConfig)
_, err = aggregatorClient.ApiregistrationV1alpha1().APIServices().Create(&apiregistrationv1alpha1.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1alpha1.wardle.k8s.io"},
Spec: apiregistrationv1alpha1.APIServiceSpec{
Service: apiregistrationv1alpha1.ServiceReference{
Namespace: "kube-wardle",
Name: "api",
},
Group: "wardle.k8s.io",
Version: "v1alpha1",
CABundle: wardleCA,
Priority: 200,
},
})
if err != nil {
t.Fatal(err)
}
// this is ugly, but sleep just a little bit so that the watch is probably observed. Since nothing will actually be added to discovery
// (the service is missing), we don't have an external signal.
time.Sleep(100 * time.Millisecond)
if _, err := aggregatorDiscoveryClient.Discovery().ServerResources(); err != nil {
t.Fatal(err)
}
_, err = aggregatorClient.ApiregistrationV1alpha1().APIServices().Create(&apiregistrationv1alpha1.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1."},
Spec: apiregistrationv1alpha1.APIServiceSpec{
Service: apiregistrationv1alpha1.ServiceReference{
Namespace: "default",
Name: "kubernetes",
},
Group: "",
Version: "v1",
CABundle: config.GenericConfig.LoopbackClientConfig.CAData,
Priority: 100,
},
})
if err != nil {
t.Fatal(err)
}
// this is ugly, but sleep just a little bit so that the watch is probably observed. Since nothing will actually be added to discovery
// (the service is missing), we don't have an external signal.
time.Sleep(100 * time.Millisecond)
_, err = aggregatorDiscoveryClient.Discovery().ServerResources()
if err != nil && !strings.Contains(err.Error(), "lookup kubernetes.default.svc") {
t.Fatal(err)
}
// TODO figure out how to turn on enough of services and dns to run more
}
func createKubeConfig(clientCfg *rest.Config) *clientcmdapi.Config {
@ -124,46 +368,12 @@ func createKubeConfig(clientCfg *rest.Config) *clientcmdapi.Config {
return config
}
func waitForApiserverUp(serverLocation string) error {
for start := time.Now(); time.Since(start) < 10*time.Second; time.Sleep(5 * time.Second) {
glog.Errorf("Waiting for : %#v", serverLocation)
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client := &http.Client{Transport: tr}
_, err := client.Get(serverLocation)
if err == nil {
return nil
}
}
return fmt.Errorf("waiting for apiserver timed out")
func readResponse(client rest.Interface, location string) ([]byte, error) {
return client.Get().AbsPath(location).DoRaw()
}
func readResponse(serverURL string) ([]byte, error) {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client := &http.Client{Transport: tr}
response, err := client.Get(serverURL)
if err != nil {
glog.Errorf("http get err code : %#v", err)
return nil, fmt.Errorf("Error in fetching %s: %v", serverURL, err)
}
defer response.Body.Close()
glog.Errorf("http get response code : %#v", response.StatusCode)
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status: %d for URL: %s, expected status: %d", response.StatusCode, serverURL, http.StatusOK)
}
contents, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil, fmt.Errorf("Error reading response from %s: %v", serverURL, err)
}
return contents, nil
}
func testAPIGroupList(t *testing.T, serverLocation string) {
serverURL := serverLocation + "/apis"
contents, err := readResponse(serverURL)
func testAPIGroupList(t *testing.T, client rest.Interface) {
contents, err := readResponse(client, "/apis")
if err != nil {
t.Fatalf("%v", err)
}
@ -171,7 +381,7 @@ func testAPIGroupList(t *testing.T, serverLocation string) {
var apiGroupList metav1.APIGroupList
err = json.Unmarshal(contents, &apiGroupList)
if err != nil {
t.Fatalf("Error in unmarshalling response from server %s: %v", serverURL, err)
t.Fatalf("Error in unmarshalling response from server %s: %v", "/apis", err)
}
assert.Equal(t, 1, len(apiGroupList.Groups))
assert.Equal(t, groupVersion.Group, apiGroupList.Groups[0].Name)
@ -180,9 +390,8 @@ func testAPIGroupList(t *testing.T, serverLocation string) {
assert.Equal(t, groupVersionForDiscovery, apiGroupList.Groups[0].PreferredVersion)
}
func testAPIGroup(t *testing.T, serverLocation string) {
serverURL := serverLocation + "/apis/wardle.k8s.io"
contents, err := readResponse(serverURL)
func testAPIGroup(t *testing.T, client rest.Interface) {
contents, err := readResponse(client, "/apis/wardle.k8s.io")
if err != nil {
t.Fatalf("%v", err)
}
@ -190,7 +399,7 @@ func testAPIGroup(t *testing.T, serverLocation string) {
var apiGroup metav1.APIGroup
err = json.Unmarshal(contents, &apiGroup)
if err != nil {
t.Fatalf("Error in unmarshalling response from server %s: %v", serverURL, err)
t.Fatalf("Error in unmarshalling response from server %s: %v", "/apis/wardle.k8s.io", err)
}
assert.Equal(t, groupVersion.Group, apiGroup.Name)
assert.Equal(t, 1, len(apiGroup.Versions))
@ -199,9 +408,8 @@ func testAPIGroup(t *testing.T, serverLocation string) {
assert.Equal(t, apiGroup.PreferredVersion, apiGroup.Versions[0])
}
func testAPIResourceList(t *testing.T, serverLocation string) {
serverURL := serverLocation + "/apis/wardle.k8s.io/v1alpha1"
contents, err := readResponse(serverURL)
func testAPIResourceList(t *testing.T, client rest.Interface) {
contents, err := readResponse(client, "/apis/wardle.k8s.io/v1alpha1")
if err != nil {
t.Fatalf("%v", err)
}
@ -209,10 +417,15 @@ func testAPIResourceList(t *testing.T, serverLocation string) {
var apiResourceList metav1.APIResourceList
err = json.Unmarshal(contents, &apiResourceList)
if err != nil {
t.Fatalf("Error in unmarshalling response from server %s: %v", serverURL, err)
t.Fatalf("Error in unmarshalling response from server %s: %v", "/apis/wardle.k8s.io/v1alpha1", err)
}
assert.Equal(t, groupVersion.String(), apiResourceList.GroupVersion)
assert.Equal(t, 1, len(apiResourceList.APIResources))
assert.Equal(t, "flunders", apiResourceList.APIResources[0].Name)
assert.True(t, apiResourceList.APIResources[0].Namespaced)
}
const (
policyCachePollInterval = 100 * time.Millisecond
policyCachePollTimeout = 5 * time.Second
)

2
vendor/BUILD vendored
View File

@ -15811,6 +15811,7 @@ go_library(
srcs = ["k8s.io/kube-aggregator/main.go"],
tags = ["automanaged"],
deps = [
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apiserver/pkg/util/logs",
"//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration/install",
"//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration/validation",
@ -16270,7 +16271,6 @@ go_library(
"//vendor:github.com/spf13/cobra",
"//vendor:github.com/spf13/pflag",
"//vendor:k8s.io/apimachinery/pkg/util/sets",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apiserver/pkg/server",
"//vendor:k8s.io/apiserver/pkg/server/filters",
"//vendor:k8s.io/apiserver/pkg/server/options",