From db379de778e34d80823c3c2da05212e651a4be54 Mon Sep 17 00:00:00 2001 From: Zihong Zheng Date: Thu, 27 Jul 2017 18:30:21 -0700 Subject: [PATCH] Emit event and retry when failing to start healthz server on kube-proxy. --- cmd/kube-proxy/app/server.go | 16 ++++----- pkg/proxy/healthcheck/BUILD | 1 + pkg/proxy/healthcheck/healthcheck.go | 42 +++++++++++++++-------- pkg/proxy/healthcheck/healthcheck_test.go | 2 +- 4 files changed, 38 insertions(+), 23 deletions(-) diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index d5766545c9..7bcba89ec2 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -462,10 +462,17 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(scheme, v1.EventSource{Component: "kube-proxy", Host: hostname}) + nodeRef := &v1.ObjectReference{ + Kind: "Node", + Name: hostname, + UID: types.UID(hostname), + Namespace: "", + } + var healthzServer *healthcheck.HealthzServer var healthzUpdater healthcheck.HealthzUpdater if len(config.HealthzBindAddress) > 0 { - healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration) + healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef) healthzUpdater = healthzServer } @@ -572,13 +579,6 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx iptInterface.AddReloadFunc(proxier.Sync) } - nodeRef := &v1.ObjectReference{ - Kind: "Node", - Name: hostname, - UID: types.UID(hostname), - Namespace: "", - } - return &ProxyServer{ Client: client, EventClient: eventClient, diff --git a/pkg/proxy/healthcheck/BUILD b/pkg/proxy/healthcheck/BUILD index 85c3969a88..e6442fd57d 100644 --- a/pkg/proxy/healthcheck/BUILD +++ b/pkg/proxy/healthcheck/BUILD @@ -22,6 +22,7 @@ go_library( 
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", ], ) diff --git a/pkg/proxy/healthcheck/healthcheck.go b/pkg/proxy/healthcheck/healthcheck.go index 8a503bc917..7ea31d0a4e 100644 --- a/pkg/proxy/healthcheck/healthcheck.go +++ b/pkg/proxy/healthcheck/healthcheck.go @@ -31,10 +31,13 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api" ) +var nodeHealthzRetryInterval = 60 * time.Second + // Server serves HTTP endpoints for each service name, with results // based on the endpoints. If there are 0 endpoints for a service, it returns a // 503 "Service Unavailable" error (telling LBs not to use this node). If there @@ -161,7 +164,7 @@ func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) err Namespace: nsn.Namespace, Name: nsn.Name, UID: types.UID(nsn.String()), - }, api.EventTypeWarning, "FailedToStartHealthcheck", msg) + }, api.EventTypeWarning, "FailedToStartServiceHealthcheck", msg) } glog.Error(msg) continue @@ -259,16 +262,18 @@ type HealthzServer struct { addr string port int32 healthTimeout time.Duration + recorder record.EventRecorder + nodeRef *v1.ObjectReference lastUpdated atomic.Value } // NewDefaultHealthzServer returns a default healthz http server. 
-func NewDefaultHealthzServer(addr string, healthTimeout time.Duration) *HealthzServer { - return newHealthzServer(nil, nil, nil, addr, healthTimeout) +func NewDefaultHealthzServer(addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *HealthzServer { + return newHealthzServer(nil, nil, nil, addr, healthTimeout, recorder, nodeRef) } -func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *HealthzServer { +func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *HealthzServer { if listener == nil { listener = stdNetListener{} } @@ -284,6 +289,8 @@ func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock: c, addr: addr, healthTimeout: healthTimeout, + recorder: recorder, + nodeRef: nodeRef, } } @@ -297,19 +304,26 @@ func (hs *HealthzServer) Run() { serveMux := http.NewServeMux() serveMux.Handle("/healthz", healthzHandler{hs: hs}) server := hs.httpFactory.New(hs.addr, serveMux) - listener, err := hs.listener.Listen(hs.addr) - if err != nil { - glog.Errorf("Failed to start healthz on %s: %v", hs.addr, err) - return - } - go func() { + + go wait.Until(func() { glog.V(3).Infof("Starting goroutine for healthz on %s", hs.addr) - if err := server.Serve(listener); err != nil { - glog.Errorf("Healhz closed: %v", err) + + listener, err := hs.listener.Listen(hs.addr) + if err != nil { + msg := fmt.Sprintf("Failed to start node healthz on %s: %v", hs.addr, err) + if hs.recorder != nil && hs.nodeRef != nil { + hs.recorder.Event(hs.nodeRef, api.EventTypeWarning, "FailedToStartNodeHealthcheck", msg) + } + glog.Error(msg) return } - glog.Errorf("Unexpected healhz closed.") - }() + + if err := server.Serve(listener); err != nil { + glog.Errorf("Healthz closed with error: %v", err) + return + } + 
glog.Errorf("Unexpected healthz closed.") + }, nodeHealthzRetryInterval, wait.NeverStop) } type healthzHandler struct { diff --git a/pkg/proxy/healthcheck/healthcheck_test.go b/pkg/proxy/healthcheck/healthcheck_test.go index b34f8219ed..0a7da2f276 100644 --- a/pkg/proxy/healthcheck/healthcheck_test.go +++ b/pkg/proxy/healthcheck/healthcheck_test.go @@ -368,7 +368,7 @@ func TestHealthzServer(t *testing.T) { httpFactory := newFakeHTTPServerFactory() fakeClock := clock.NewFakeClock(time.Now()) - hs := newHealthzServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second) + hs := newHealthzServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil) server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs}) // Should return 200 "OK" by default.