kube-apiserver: first remove endpoints, then add when ready

pull/564/head
Dr. Stefan Schimanski 2019-02-27 17:21:47 +01:00
parent 14f2a335dc
commit 2a9a9fa155
5 changed files with 35 additions and 12 deletions

View File

@ -115,6 +115,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],

View File

@ -19,6 +19,7 @@ package master
import (
"fmt"
"net"
"net/http"
"time"
corev1 "k8s.io/api/core/v1"
@ -31,6 +32,7 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
utilfeature "k8s.io/apiserver/pkg/util/feature"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/master/reconcilers"
@ -51,6 +53,7 @@ type Controller struct {
ServiceClient corev1client.ServicesGetter
NamespaceClient corev1client.NamespacesGetter
EventClient corev1client.EventsGetter
healthClient rest.Interface
ServiceClusterIPRegistry rangeallocation.RangeRegistry
ServiceClusterIPInterval time.Duration
@ -80,7 +83,7 @@ type Controller struct {
}
// NewBootstrapController returns a controller for watching the core capabilities of the master
func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient corev1client.ServicesGetter, nsClient corev1client.NamespacesGetter, eventClient corev1client.EventsGetter) *Controller {
func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient corev1client.ServicesGetter, nsClient corev1client.NamespacesGetter, eventClient corev1client.EventsGetter, healthClient rest.Interface) *Controller {
_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
if err != nil {
klog.Fatalf("failed to get listener address: %v", err)
@ -95,6 +98,7 @@ func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.Lega
ServiceClient: serviceClient,
NamespaceClient: nsClient,
EventClient: eventClient,
healthClient: healthClient,
EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
@ -138,6 +142,12 @@ func (c *Controller) Start() {
return
}
// Reconcile during first run removing itself until server is ready.
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err)
}
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry)
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
@ -150,10 +160,6 @@ func (c *Controller) Start() {
// If we fail to repair node ports apiserver is useless. We should restart and retry.
klog.Fatalf("Unable to perform initial service nodePort check: %v", err)
}
// Service definition is reconciled during first run to correct port and type per expectations.
if err := c.UpdateKubernetesService(true); err != nil {
klog.Errorf("Unable to perform initial Kubernetes service initialization: %v", err)
}
c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil)
c.runner.Start()
@ -168,7 +174,8 @@ func (c *Controller) Stop() {
go func() {
defer close(finishedReconciling)
klog.Infof("Shutting down kubernetes service endpoint reconciler")
if err := c.EndpointReconciler.StopReconciling(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
c.EndpointReconciler.StopReconciling()
if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
klog.Error(err)
}
}()
@ -178,7 +185,7 @@ func (c *Controller) Stop() {
// done
case <-time.After(2 * c.EndpointInterval):
// don't block server shutdown forever if we can't reach etcd to remove ourselves
klog.Warning("StopReconciling() timed out")
klog.Warning("RemoveEndpoints() timed out")
}
}
@ -196,7 +203,14 @@ func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
// RunKubernetesService periodically updates the kubernetes service
func (c *Controller) RunKubernetesService(ch chan struct{}) {
wait.Until(func() {
// wait until process is ready
wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
var code int
c.healthClient.Get().AbsPath("/healthz").Do().StatusCode(&code)
return code == http.StatusOK, nil
}, ch)
wait.NonSlidingUntil(func() {
// Service definition is not reconciled after first
// run, ports and type will be corrected only during
// start.

View File

@ -375,7 +375,7 @@ func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.
controllerName := "bootstrap-controller"
coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)

View File

@ -38,7 +38,7 @@ type EndpointReconciler interface {
ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error
// RemoveEndpoints removes this apiserver's lease.
RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error
// StopReonciling turns any later ReconcileEndpoints call into a noop.
// StopReconciling turns any later ReconcileEndpoints call into a noop.
StopReconciling()
}

View File

@ -103,8 +103,16 @@ func TestKubernetesService(t *testing.T) {
_, _, closeFn := framework.RunAMaster(config)
defer closeFn()
coreClient := clientset.NewForConfigOrDie(config.GenericConfig.LoopbackClientConfig)
if _, err := coreClient.Core().Services(metav1.NamespaceDefault).Get("kubernetes", metav1.GetOptions{}); err != nil {
t.Fatalf("Expected kubernetes service to exists, got: %v", err)
err := wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
if _, err := coreClient.Core().Services(metav1.NamespaceDefault).Get("kubernetes", metav1.GetOptions{}); err != nil && errors.IsNotFound(err) {
return false, nil
} else if err != nil {
return false, err
}
return true, nil
})
if err != nil {
t.Fatalf("Expected kubernetes service to exist, got: %v", err)
}
}