From 2a9a9fa155e92d0bd7f4eb85d843cfa14b830cca Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Wed, 27 Feb 2019 17:21:47 +0100 Subject: [PATCH] kube-apiserver: first remove endpoints, then add when ready --- pkg/master/BUILD | 1 + pkg/master/controller.go | 30 ++++++++++++++----- pkg/master/master.go | 2 +- pkg/master/reconcilers/reconcilers.go | 2 +- .../master/synthetic_master_test.go | 12 ++++++-- 5 files changed, 35 insertions(+), 12 deletions(-) diff --git a/pkg/master/BUILD b/pkg/master/BUILD index 30d7eb6fd4..0ba0967310 100644 --- a/pkg/master/BUILD +++ b/pkg/master/BUILD @@ -115,6 +115,7 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/rest:go_default_library", "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], diff --git a/pkg/master/controller.go b/pkg/master/controller.go index 3fdfe1a0a2..3e90b84f69 100644 --- a/pkg/master/controller.go +++ b/pkg/master/controller.go @@ -19,6 +19,7 @@ package master import ( "fmt" "net" + "net/http" "time" corev1 "k8s.io/api/core/v1" @@ -31,6 +32,7 @@ import ( genericapiserver "k8s.io/apiserver/pkg/server" utilfeature "k8s.io/apiserver/pkg/util/feature" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" "k8s.io/klog" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/master/reconcilers" @@ -51,6 +53,7 @@ type Controller struct { ServiceClient corev1client.ServicesGetter NamespaceClient corev1client.NamespacesGetter EventClient corev1client.EventsGetter + healthClient rest.Interface ServiceClusterIPRegistry rangeallocation.RangeRegistry ServiceClusterIPInterval time.Duration @@ -80,7 +83,7 @@ type Controller struct { } // NewBootstrapController returns a controller for watching the core capabilities of the master -func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient corev1client.ServicesGetter, nsClient corev1client.NamespacesGetter, eventClient corev1client.EventsGetter) *Controller { +func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient corev1client.ServicesGetter, nsClient corev1client.NamespacesGetter, eventClient corev1client.EventsGetter, healthClient rest.Interface) *Controller { _, publicServicePort, err := c.GenericConfig.SecureServing.HostPort() if err != nil { klog.Fatalf("failed to get listener address: %v", err) @@ -95,6 +98,7 @@ func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.Lega ServiceClient: serviceClient, NamespaceClient: nsClient, EventClient: eventClient, + healthClient: healthClient, EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler, EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval, @@ -138,6 +142,12 @@ func (c *Controller) Start() { return } + // Reconcile during first run removing itself until server is ready. + endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts) + if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil { + klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err) + } + repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry) repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry) @@ -150,10 +160,6 @@ func (c *Controller) Start() { // If we fail to repair node ports apiserver is useless. We should restart and retry. klog.Fatalf("Unable to perform initial service nodePort check: %v", err) } - // Service definition is reconciled during first run to correct port and type per expectations. - if err := c.UpdateKubernetesService(true); err != nil { - klog.Errorf("Unable to perform initial Kubernetes service initialization: %v", err) - } c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil) c.runner.Start() @@ -168,7 +174,8 @@ func (c *Controller) Stop() { go func() { defer close(finishedReconciling) klog.Infof("Shutting down kubernetes service endpoint reconciler") - if err := c.EndpointReconciler.StopReconciling(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil { + c.EndpointReconciler.StopReconciling() + if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil { klog.Error(err) } }() @@ -178,7 +185,7 @@ func (c *Controller) Stop() { // done case <-time.After(2 * c.EndpointInterval): // don't block server shutdown forever if we can't reach etcd to remove ourselves - klog.Warning("StopReconciling() timed out") + klog.Warning("RemoveEndpoints() timed out") } } @@ -196,7 +203,14 @@ func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) { // RunKubernetesService periodically updates the kubernetes service func (c *Controller) RunKubernetesService(ch chan struct{}) { - wait.Until(func() { + // wait until process is ready + wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) { + var code int + c.healthClient.Get().AbsPath("/healthz").Do().StatusCode(&code) + return code == http.StatusOK, nil + }, ch) + + wait.NonSlidingUntil(func() { // Service definition is not reconciled after first // run, ports and type will be corrected only during // start. diff --git a/pkg/master/master.go b/pkg/master/master.go index bc6b2e81c3..8270ef3542 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -375,7 +375,7 @@ func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic. controllerName := "bootstrap-controller" coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) - bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient) + bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient()) m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook) m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook) diff --git a/pkg/master/reconcilers/reconcilers.go b/pkg/master/reconcilers/reconcilers.go index b348c8b286..8f28003876 100644 --- a/pkg/master/reconcilers/reconcilers.go +++ b/pkg/master/reconcilers/reconcilers.go @@ -38,7 +38,7 @@ type EndpointReconciler interface { ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error // RemoveEndpoints removes this apiserver's lease. RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error - // StopReonciling turns any later ReconcileEndpoints call into a noop. + // StopReconciling turns any later ReconcileEndpoints call into a noop. StopReconciling() } diff --git a/test/integration/master/synthetic_master_test.go b/test/integration/master/synthetic_master_test.go index d40b98e3ca..a6b627e397 100644 --- a/test/integration/master/synthetic_master_test.go +++ b/test/integration/master/synthetic_master_test.go @@ -103,8 +103,16 @@ func TestKubernetesService(t *testing.T) { _, _, closeFn := framework.RunAMaster(config) defer closeFn() coreClient := clientset.NewForConfigOrDie(config.GenericConfig.LoopbackClientConfig) - if _, err := coreClient.Core().Services(metav1.NamespaceDefault).Get("kubernetes", metav1.GetOptions{}); err != nil { - t.Fatalf("Expected kubernetes service to exists, got: %v", err) + err := wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) { + if _, err := coreClient.Core().Services(metav1.NamespaceDefault).Get("kubernetes", metav1.GetOptions{}); err != nil && errors.IsNotFound(err) { + return false, nil + } else if err != nil { + return false, err + } + return true, nil + }) + if err != nil { + t.Fatalf("Expected kubernetes service to exist, got: %v", err) } }