mirror of https://github.com/k3s-io/k3s
Merge pull request #74668 from sttts/sttts-kube-apiserver-endpoints-when-ready
kube-apiserver: don't create endpoints before being readypull/564/head
commit
b1d4d40679
|
@ -115,6 +115,7 @@ go_library(
|
|||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/rest:go_default_library",
|
||||
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
|
||||
"//vendor/k8s.io/klog:go_default_library",
|
||||
],
|
||||
|
|
|
@ -19,6 +19,7 @@ package master
|
|||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
|
@ -31,6 +32,7 @@ import (
|
|||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/klog"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/master/reconcilers"
|
||||
|
@ -51,6 +53,7 @@ type Controller struct {
|
|||
ServiceClient corev1client.ServicesGetter
|
||||
NamespaceClient corev1client.NamespacesGetter
|
||||
EventClient corev1client.EventsGetter
|
||||
healthClient rest.Interface
|
||||
|
||||
ServiceClusterIPRegistry rangeallocation.RangeRegistry
|
||||
ServiceClusterIPInterval time.Duration
|
||||
|
@ -80,7 +83,7 @@ type Controller struct {
|
|||
}
|
||||
|
||||
// NewBootstrapController returns a controller for watching the core capabilities of the master
|
||||
func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient corev1client.ServicesGetter, nsClient corev1client.NamespacesGetter, eventClient corev1client.EventsGetter) *Controller {
|
||||
func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient corev1client.ServicesGetter, nsClient corev1client.NamespacesGetter, eventClient corev1client.EventsGetter, healthClient rest.Interface) *Controller {
|
||||
_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
|
||||
if err != nil {
|
||||
klog.Fatalf("failed to get listener address: %v", err)
|
||||
|
@ -95,6 +98,7 @@ func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.Lega
|
|||
ServiceClient: serviceClient,
|
||||
NamespaceClient: nsClient,
|
||||
EventClient: eventClient,
|
||||
healthClient: healthClient,
|
||||
|
||||
EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
|
||||
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
|
||||
|
@ -138,6 +142,12 @@ func (c *Controller) Start() {
|
|||
return
|
||||
}
|
||||
|
||||
// Reconcile during first run removing itself until server is ready.
|
||||
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
|
||||
if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
|
||||
klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err)
|
||||
}
|
||||
|
||||
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry)
|
||||
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
|
||||
|
||||
|
@ -150,10 +160,6 @@ func (c *Controller) Start() {
|
|||
// If we fail to repair node ports apiserver is useless. We should restart and retry.
|
||||
klog.Fatalf("Unable to perform initial service nodePort check: %v", err)
|
||||
}
|
||||
// Service definition is reconciled during first run to correct port and type per expectations.
|
||||
if err := c.UpdateKubernetesService(true); err != nil {
|
||||
klog.Errorf("Unable to perform initial Kubernetes service initialization: %v", err)
|
||||
}
|
||||
|
||||
c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil)
|
||||
c.runner.Start()
|
||||
|
@ -168,7 +174,8 @@ func (c *Controller) Stop() {
|
|||
go func() {
|
||||
defer close(finishedReconciling)
|
||||
klog.Infof("Shutting down kubernetes service endpoint reconciler")
|
||||
if err := c.EndpointReconciler.StopReconciling(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
|
||||
c.EndpointReconciler.StopReconciling()
|
||||
if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
|
||||
klog.Error(err)
|
||||
}
|
||||
}()
|
||||
|
@ -178,7 +185,7 @@ func (c *Controller) Stop() {
|
|||
// done
|
||||
case <-time.After(2 * c.EndpointInterval):
|
||||
// don't block server shutdown forever if we can't reach etcd to remove ourselves
|
||||
klog.Warning("StopReconciling() timed out")
|
||||
klog.Warning("RemoveEndpoints() timed out")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -196,7 +203,14 @@ func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
|
|||
|
||||
// RunKubernetesService periodically updates the kubernetes service
|
||||
func (c *Controller) RunKubernetesService(ch chan struct{}) {
|
||||
wait.Until(func() {
|
||||
// wait until process is ready
|
||||
wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
|
||||
var code int
|
||||
c.healthClient.Get().AbsPath("/healthz").Do().StatusCode(&code)
|
||||
return code == http.StatusOK, nil
|
||||
}, ch)
|
||||
|
||||
wait.NonSlidingUntil(func() {
|
||||
// Service definition is not reconciled after first
|
||||
// run, ports and type will be corrected only during
|
||||
// start.
|
||||
|
|
|
@ -375,7 +375,7 @@ func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.
|
|||
|
||||
controllerName := "bootstrap-controller"
|
||||
coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
|
||||
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient)
|
||||
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
|
||||
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
|
||||
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
|
||||
|
||||
|
|
|
@ -283,14 +283,16 @@ func checkEndpointSubsetFormatWithLease(e *corev1.Endpoints, expectedIPs []strin
|
|||
return true, ipsCorrect, portsCorrect
|
||||
}
|
||||
|
||||
func (r *leaseEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
|
||||
r.reconcilingLock.Lock()
|
||||
defer r.reconcilingLock.Unlock()
|
||||
r.stopReconcilingCalled = true
|
||||
|
||||
func (r *leaseEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
|
||||
if err := r.masterLeases.RemoveLease(ip.String()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return r.doReconcile(serviceName, endpointPorts, true)
|
||||
}
|
||||
|
||||
func (r *leaseEndpointReconciler) StopReconciling() {
|
||||
r.reconcilingLock.Lock()
|
||||
defer r.reconcilingLock.Unlock()
|
||||
r.stopReconcilingCalled = true
|
||||
}
|
||||
|
|
|
@ -547,7 +547,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestLeaseStopReconciling(t *testing.T) {
|
||||
func TestLeaseRemoveEndpoints(t *testing.T) {
|
||||
ns := corev1.NamespaceDefault
|
||||
om := func(name string) metav1.ObjectMeta {
|
||||
return metav1.ObjectMeta{Namespace: ns, Name: name}
|
||||
|
@ -627,7 +627,7 @@ func TestLeaseStopReconciling(t *testing.T) {
|
|||
}
|
||||
}
|
||||
r := NewLeaseEndpointReconciler(clientset.CoreV1(), fakeLeases)
|
||||
err := r.StopReconciling(test.serviceName, net.ParseIP(test.ip), test.endpointPorts)
|
||||
err := r.RemoveEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts)
|
||||
if err != nil {
|
||||
t.Errorf("case %q: unexpected error: %v", test.testName, err)
|
||||
}
|
||||
|
|
|
@ -137,10 +137,9 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
|
|||
return err
|
||||
}
|
||||
|
||||
func (r *masterCountEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
|
||||
func (r *masterCountEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
|
||||
r.reconcilingLock.Lock()
|
||||
defer r.reconcilingLock.Unlock()
|
||||
r.stopReconcilingCalled = true
|
||||
|
||||
e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
|
@ -167,6 +166,12 @@ func (r *masterCountEndpointReconciler) StopReconciling(serviceName string, ip n
|
|||
return err
|
||||
}
|
||||
|
||||
func (r *masterCountEndpointReconciler) StopReconciling() {
|
||||
r.reconcilingLock.Lock()
|
||||
defer r.reconcilingLock.Unlock()
|
||||
r.stopReconcilingCalled = true
|
||||
}
|
||||
|
||||
// Determine if the endpoint is in the format ReconcileEndpoints expects.
|
||||
//
|
||||
// Return values:
|
||||
|
|
|
@ -18,8 +18,9 @@ limitations under the License.
|
|||
package reconcilers
|
||||
|
||||
import (
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"net"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
// NoneEndpointReconciler allows for the endpoint reconciler to be disabled
|
||||
|
@ -36,7 +37,10 @@ func (r *noneEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.I
|
|||
return nil
|
||||
}
|
||||
|
||||
// StopReconciling noop reconcile
|
||||
func (r *noneEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
|
||||
// RemoveEndpoints noop reconcile
|
||||
func (r *noneEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *noneEndpointReconciler) StopReconciling() {
|
||||
}
|
||||
|
|
|
@ -18,8 +18,9 @@ limitations under the License.
|
|||
package reconcilers
|
||||
|
||||
import (
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"net"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
// EndpointReconciler knows how to reconcile the endpoints for the apiserver service.
|
||||
|
@ -35,7 +36,10 @@ type EndpointReconciler interface {
|
|||
// endpoints for their {rw, ro} services.
|
||||
// * ReconcileEndpoints is called periodically from all apiservers.
|
||||
ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error
|
||||
StopReconciling(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error
|
||||
// RemoveEndpoints removes this apiserver's lease.
|
||||
RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error
|
||||
// StopReconciling turns any later ReconcileEndpoints call into a noop.
|
||||
StopReconciling()
|
||||
}
|
||||
|
||||
// Type the reconciler type
|
||||
|
|
|
@ -103,8 +103,16 @@ func TestKubernetesService(t *testing.T) {
|
|||
_, _, closeFn := framework.RunAMaster(config)
|
||||
defer closeFn()
|
||||
coreClient := clientset.NewForConfigOrDie(config.GenericConfig.LoopbackClientConfig)
|
||||
if _, err := coreClient.Core().Services(metav1.NamespaceDefault).Get("kubernetes", metav1.GetOptions{}); err != nil {
|
||||
t.Fatalf("Expected kubernetes service to exists, got: %v", err)
|
||||
err := wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
if _, err := coreClient.Core().Services(metav1.NamespaceDefault).Get("kubernetes", metav1.GetOptions{}); err != nil && errors.IsNotFound(err) {
|
||||
return false, nil
|
||||
} else if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Expected kubernetes service to exist, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue