diff --git a/cmd/cloud-controller-manager/app/BUILD b/cmd/cloud-controller-manager/app/BUILD index 4e15aeaf93..78e517a9b3 100644 --- a/cmd/cloud-controller-manager/app/BUILD +++ b/cmd/cloud-controller-manager/app/BUILD @@ -19,6 +19,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flag:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library", diff --git a/cmd/cloud-controller-manager/app/controllermanager.go b/cmd/cloud-controller-manager/app/controllermanager.go index 4dc73c6616..99cf674192 100644 --- a/cmd/cloud-controller-manager/app/controllermanager.go +++ b/cmd/cloud-controller-manager/app/controllermanager.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/server" + "k8s.io/apiserver/pkg/server/healthz" apiserverflag "k8s.io/apiserver/pkg/util/flag" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/leaderelection" @@ -131,16 +132,24 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error - klog.Errorf("unable to register configz: %c", err) + klog.Errorf("unable to register configz: %v", err) } + // Setup any healthz checks we will want to use.
+ var checks []healthz.HealthzChecker + var electionChecker *leaderelection.HealthzAdaptor + if c.ComponentConfig.Generic.LeaderElection.LeaderElect { + electionChecker = leaderelection.NewLeaderHealthzAdaptor(time.Second * 20) + checks = append(checks, electionChecker) + } + // Start the controller manager HTTP server if c.SecureServing != nil { - unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging) + unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...) handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication) if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { return err } } if c.InsecureServing != nil { - unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging) + unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...) insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}} handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn) if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil { @@ -192,6 +201,8 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error klog.Fatalf("leaderelection lost") }, }, + WatchDog: electionChecker, + Name: "cloud-controller-manager", }) panic("unreachable") } diff --git a/cmd/controller-manager/app/serve.go b/cmd/controller-manager/app/serve.go index 9c958128b4..37fd22199d 100644 --- a/cmd/controller-manager/app/serve.go +++ b/cmd/controller-manager/app/serve.go
@@ -53,9 +53,9 @@ func BuildHandlerChain(apiHandler http.Handler, authorizationInfo *apiserver.Aut } -// NewBaseHandler takes in CompletedConfig and returns a handler. -func NewBaseHandler(c *apiserverconfig.DebuggingConfiguration) *mux.PathRecorderMux { +// NewBaseHandler takes in the debugging configuration and any healthz checks and returns a handler. +func NewBaseHandler(c *apiserverconfig.DebuggingConfiguration, checks ...healthz.HealthzChecker) *mux.PathRecorderMux { mux := mux.NewPathRecorderMux("controller-manager") - healthz.InstallHandler(mux) + healthz.InstallHandler(mux, checks...) if c.EnableProfiling { routes.Profiling{}.Install(mux) if c.EnableContentionProfiling { diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD index 488f63859a..2fa0f43dc7 100644 --- a/cmd/kube-controller-manager/app/BUILD +++ b/cmd/kube-controller-manager/app/BUILD @@ -113,6 +113,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flag:go_default_library", diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 0710e4c3b1..5e68609219 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -38,6 +38,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/server" + "k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/mux" apiserverflag "k8s.io/apiserver/pkg/util/flag" cacheddiscovery "k8s.io/client-go/discovery/cached" @@ -150,18 +151,26 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { - klog.Errorf("unable to register configz: %c", err) + klog.Errorf("unable to register configz: %v", err) } + // Setup any healthz checks we will want to use. + var checks []healthz.HealthzChecker + var electionChecker *leaderelection.HealthzAdaptor + if c.ComponentConfig.Generic.LeaderElection.LeaderElect { + electionChecker = leaderelection.NewLeaderHealthzAdaptor(time.Second * 20) + checks = append(checks, electionChecker) + } + // Start the controller manager HTTP server // unsecuredMux is the handler for these controller *after* authn/authz filters have been applied var unsecuredMux *mux.PathRecorderMux if c.SecureServing != nil { - unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging) + unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...) handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication) if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { return err } } if c.InsecureServing != nil { - unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging) + unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}} handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn) if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil { @@ -240,6 +249,8 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { klog.Fatalf("leaderelection lost") }, }, + WatchDog: electionChecker, + Name: "kube-controller-manager", }) panic("unreachable") } diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/BUILD b/staging/src/k8s.io/client-go/tools/leaderelection/BUILD index 6716f87e25..bed0f89fee 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/BUILD +++ b/staging/src/k8s.io/client-go/tools/leaderelection/BUILD @@ -8,12 +8,16 @@ load( go_library( name = "go_default_library", - srcs = ["leaderelection.go"], + srcs = [ + "healthzadaptor.go", + "leaderelection.go", + ], importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/tools/leaderelection", importpath = "k8s.io/client-go/tools/leaderelection", deps = [ "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library", @@ -23,13 +27,17 @@ go_library( go_test( name = "go_default_test", - srcs = ["leaderelection_test.go"], + srcs = [ + "healthzadaptor_test.go", + "leaderelection_test.go", + ], embed = [":go_default_library"], deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
+ "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1/fake:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library", diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/healthzadaptor.go b/staging/src/k8s.io/client-go/tools/leaderelection/healthzadaptor.go new file mode 100644 index 0000000000..b935372919 --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/leaderelection/healthzadaptor.go @@ -0,0 +1,69 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "net/http" + "sync" + "time" +) + +// HealthzAdaptor associates the /healthz endpoint with the LeaderElection object. +// It helps deal with the /healthz endpoint being set up prior to the LeaderElection. +// This contains the code needed to act as an adaptor between the leader +// election code and the health check code. It allows us to provide health +// status about the leader election. Most specifically about if the leader +// has failed to renew without exiting the process. In that case we should +// report not healthy and rely on the kubelet to take down the process. +type HealthzAdaptor struct { + pointerLock sync.Mutex + le *LeaderElector + timeout time.Duration +} + +// Name returns the name of the health check we are implementing.
+func (l *HealthzAdaptor) Name() string { + return "leaderElection" +} + +// Check is called by the healthz endpoint handler. It reports healthy until a LeaderElector is set. +// After that it fails (returns an error) if we own the lease but have not been able to renew it. +func (l *HealthzAdaptor) Check(req *http.Request) error { + l.pointerLock.Lock() + defer l.pointerLock.Unlock() + if l.le == nil { + return nil + } + return l.le.Check(l.timeout) +} + +// SetLeaderElection ties a leader election object to a HealthzAdaptor. +func (l *HealthzAdaptor) SetLeaderElection(le *LeaderElector) { + l.pointerLock.Lock() + defer l.pointerLock.Unlock() + l.le = le +} + +// NewLeaderHealthzAdaptor creates a basic healthz adaptor to monitor a leader election. +// timeout is how long past the lease expiry the check keeps reporting healthy. +// Checks within that period after the lease expires will still return healthy. +func NewLeaderHealthzAdaptor(timeout time.Duration) *HealthzAdaptor { + result := &HealthzAdaptor{ + timeout: timeout, + } + return result +} diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/healthzadaptor_test.go b/staging/src/k8s.io/client-go/tools/leaderelection/healthzadaptor_test.go new file mode 100644 index 0000000000..746d49130c --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/leaderelection/healthzadaptor_test.go @@ -0,0 +1,175 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package leaderelection + +import ( + "fmt" + "net/http" + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/clock" + rl "k8s.io/client-go/tools/leaderelection/resourcelock" +) + +type fakeLock struct { + identity string +} + +// Get is a dummy to allow us to have a fakeLock for testing. +func (fl *fakeLock) Get() (ler *rl.LeaderElectionRecord, err error) { + return nil, nil +} + +// Create is a dummy to allow us to have a fakeLock for testing. +func (fl *fakeLock) Create(ler rl.LeaderElectionRecord) error { + return nil +} + +// Update is a dummy to allow us to have a fakeLock for testing. +func (fl *fakeLock) Update(ler rl.LeaderElectionRecord) error { + return nil +} + +// RecordEvent is a dummy to allow us to have a fakeLock for testing. +func (fl *fakeLock) RecordEvent(string) {} + +// Identity returns the identity the fakeLock was constructed with. +func (fl *fakeLock) Identity() string { + return fl.identity +} + +// Describe is a dummy to allow us to have a fakeLock for testing. +func (fl *fakeLock) Describe() string { + return "Dummy implementation of lock for testing" +} + +// TestLeaderElectionHealthChecker tests that the healthcheck for leader election handles its edge cases.
+func TestLeaderElectionHealthChecker(t *testing.T) { + current := time.Now() + req := &http.Request{} + + tests := []struct { + description string + expected error + adaptorTimeout time.Duration + elector *LeaderElector + }{ + { + description: "call check before leader elector initialized", + expected: nil, + adaptorTimeout: time.Second * 20, + elector: nil, + }, + { + description: "call check when the the lease is far expired", + expected: fmt.Errorf("failed election to renew leadership on lease %s", "foo"), + adaptorTimeout: time.Second * 20, + elector: &LeaderElector{ + config: LeaderElectionConfig{ + Lock: &fakeLock{identity: "healthTest"}, + LeaseDuration: time.Minute, + Name: "foo", + }, + observedRecord: rl.LeaderElectionRecord{ + HolderIdentity: "healthTest", + }, + observedTime: current, + clock: clock.NewFakeClock(current.Add(time.Hour)), + }, + }, + { + description: "call check when the the lease is far expired but held by another server", + expected: nil, + adaptorTimeout: time.Second * 20, + elector: &LeaderElector{ + config: LeaderElectionConfig{ + Lock: &fakeLock{identity: "healthTest"}, + LeaseDuration: time.Minute, + Name: "foo", + }, + observedRecord: rl.LeaderElectionRecord{ + HolderIdentity: "otherServer", + }, + observedTime: current, + clock: clock.NewFakeClock(current.Add(time.Hour)), + }, + }, + { + description: "call check when the the lease is not expired", + expected: nil, + adaptorTimeout: time.Second * 20, + elector: &LeaderElector{ + config: LeaderElectionConfig{ + Lock: &fakeLock{identity: "healthTest"}, + LeaseDuration: time.Minute, + Name: "foo", + }, + observedRecord: rl.LeaderElectionRecord{ + HolderIdentity: "healthTest", + }, + observedTime: current, + clock: clock.NewFakeClock(current), + }, + }, + { + description: "call check when the the lease is expired but inside the timeout", + expected: nil, + adaptorTimeout: time.Second * 20, + elector: &LeaderElector{ + config: LeaderElectionConfig{ + Lock: &fakeLock{identity:
"healthTest"}, + LeaseDuration: time.Minute, + Name: "foo", + }, + observedRecord: rl.LeaderElectionRecord{ + HolderIdentity: "healthTest", + }, + observedTime: current, + clock: clock.NewFakeClock(current.Add(time.Minute).Add(time.Second)), + }, + }, + } + + for _, test := range tests { + adaptor := NewLeaderHealthzAdaptor(test.adaptorTimeout) + if adaptor.le != nil { + t.Errorf("[%s] leaderChecker started with a LeaderElector %v", test.description, adaptor.le) + } + if test.elector != nil { + test.elector.config.WatchDog = adaptor + adaptor.SetLeaderElection(test.elector) + if adaptor.le == nil { + t.Errorf("[%s] adaptor failed to set the LeaderElector", test.description) + } + } + err := adaptor.Check(req) + if test.expected == nil { + if err == nil { + continue + } + t.Errorf("[%s] called check, expected no error but received \"%v\"", test.description, err) + } else { + if err == nil { + // Report the missing error only; calling err.Error() on a nil error would panic. + t.Errorf("[%s] called check and failed to received the expected error \"%v\"", test.description, test.expected) + } else if err.Error() != test.expected.Error() { + t.Errorf("[%s] called check, expected %v, received %v", test.description, test.expected, err) + } + } + } +} diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go index 1bd6167b6e..2096a5996b 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go @@ -56,6 +56,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" rl "k8s.io/client-go/tools/leaderelection/resourcelock" @@ -90,6 +91,7 @@ func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) { } return &LeaderElector{ config: lec, + clock: clock.RealClock{}, }, nil } @@ -111,6 +113,13 @@ type
LeaderElectionConfig struct { // Callbacks are callbacks that are triggered during certain lifecycle // events of the LeaderElector Callbacks LeaderCallbacks + + // WatchDog is the associated health checker + // WatchDog may be nil if it's not needed/configured. + WatchDog *HealthzAdaptor + + // Name is the name of the resource lock for debugging + Name string } // LeaderCallbacks are callbacks that are triggered during certain @@ -139,6 +148,9 @@ type LeaderElector struct { // value observedRecord.HolderIdentity if the transition has // not yet been reported. reportedLeader string + + // clock is a wrapper around time to allow for less flaky testing + clock clock.Clock } // Run starts the leader election loop @@ -163,6 +175,9 @@ func RunOrDie(ctx context.Context, lec LeaderElectionConfig) { if err != nil { panic(err) } + if lec.WatchDog != nil { + lec.WatchDog.SetLeaderElection(le) + } le.Run(ctx) } @@ -257,14 +272,14 @@ func (le *LeaderElector) tryAcquireOrRenew() bool { return false } le.observedRecord = leaderElectionRecord - le.observedTime = time.Now() + le.observedTime = le.clock.Now() return true } // 2. Record obtained, check the Identity & Time if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) { le.observedRecord = *oldLeaderElectionRecord - le.observedTime = time.Now() + le.observedTime = le.clock.Now() } if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && !le.IsLeader() { @@ -287,7 +302,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool { return false } le.observedRecord = leaderElectionRecord - le.observedTime = time.Now() + le.observedTime = le.clock.Now() return true } @@ -300,3 +315,20 @@ func (le *LeaderElector) maybeReportTransition() { go le.config.Callbacks.OnNewLeader(le.reportedLeader) } } + +// Check returns an error if we are the leader and the lease has been expired for more than maxTolerableExpiredLease.
+func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error { + if !le.IsLeader() { + // Currently not concerned with the case that we are hot standby + return nil + } + // If more than LeaseDuration plus maxTolerableExpiredLease has passed since the last + // observed renewal, it is time to start reporting ourselves as unhealthy. We should have + // died but conditions like deadlock can prevent this. (See #70819) + // NOTE(review): observedTime is written by tryAcquireOrRenew; confirm this read from the healthz handler is race-free. + if le.clock.Since(le.observedTime) > le.config.LeaseDuration+maxTolerableExpiredLease { + return fmt.Errorf("failed election to renew leadership on lease %s", le.config.Name) + } + + return nil +} diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go index e99cf57afb..842aebdab2 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" fakecorev1 "k8s.io/client-go/kubernetes/typed/core/v1/fake" core "k8s.io/client-go/testing" rl "k8s.io/client-go/tools/leaderelection/resourcelock" @@ -257,6 +258,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { config: lec, observedRecord: test.observedRecord, observedTime: test.observedTime, + clock: clock.RealClock{}, } if test.expectSuccess != le.tryAcquireOrRenew() {