mirror of https://github.com/k3s-io/k3s
Report KCM as unhealthy if leader election is wedged.
Feedback from lavalamp and deads2k. Changed Check() logic to be central to LeaderElector. Further changes, especially cleaning up the test code.pull/58/head
parent
5a84dffc75
commit
9c43ee6d6e
|
@ -19,6 +19,7 @@ go_library(
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flag:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/flag:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library",
|
"//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library",
|
||||||
|
|
|
@ -30,6 +30,7 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/util/uuid"
|
"k8s.io/apimachinery/pkg/util/uuid"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apiserver/pkg/server"
|
"k8s.io/apiserver/pkg/server"
|
||||||
|
"k8s.io/apiserver/pkg/server/healthz"
|
||||||
apiserverflag "k8s.io/apiserver/pkg/util/flag"
|
apiserverflag "k8s.io/apiserver/pkg/util/flag"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/tools/leaderelection"
|
"k8s.io/client-go/tools/leaderelection"
|
||||||
|
@ -131,16 +132,24 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error
|
||||||
klog.Errorf("unable to register configz: %c", err)
|
klog.Errorf("unable to register configz: %c", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Setup any healthz checks we will want to use.
|
||||||
|
var checks []healthz.HealthzChecker
|
||||||
|
var electionChecker *leaderelection.HealthzAdaptor
|
||||||
|
if c.ComponentConfig.Generic.LeaderElection.LeaderElect {
|
||||||
|
electionChecker = leaderelection.NewLeaderHealthzAdaptor(time.Second * 20)
|
||||||
|
checks = append(checks, electionChecker)
|
||||||
|
}
|
||||||
|
|
||||||
// Start the controller manager HTTP server
|
// Start the controller manager HTTP server
|
||||||
if c.SecureServing != nil {
|
if c.SecureServing != nil {
|
||||||
unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging)
|
unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
|
||||||
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
|
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
|
||||||
if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
|
if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if c.InsecureServing != nil {
|
if c.InsecureServing != nil {
|
||||||
unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging)
|
unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
|
||||||
insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}}
|
insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}}
|
||||||
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn)
|
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn)
|
||||||
if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
|
if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
|
||||||
|
@ -192,6 +201,8 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error
|
||||||
klog.Fatalf("leaderelection lost")
|
klog.Fatalf("leaderelection lost")
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
WatchDog: electionChecker,
|
||||||
|
Name: "cloud-controller-manager",
|
||||||
})
|
})
|
||||||
panic("unreachable")
|
panic("unreachable")
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,11 +17,10 @@ limitations under the License.
|
||||||
package app
|
package app
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"net/http"
|
"net/http"
|
||||||
goruntime "runtime"
|
goruntime "runtime"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
|
|
||||||
apiserverconfig "k8s.io/apiserver/pkg/apis/config"
|
apiserverconfig "k8s.io/apiserver/pkg/apis/config"
|
||||||
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
|
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
|
||||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
|
@ -53,9 +52,9 @@ func BuildHandlerChain(apiHandler http.Handler, authorizationInfo *apiserver.Aut
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBaseHandler takes in CompletedConfig and returns a handler.
|
// NewBaseHandler takes in CompletedConfig and returns a handler.
|
||||||
func NewBaseHandler(c *apiserverconfig.DebuggingConfiguration) *mux.PathRecorderMux {
|
func NewBaseHandler(c *apiserverconfig.DebuggingConfiguration, checks ...healthz.HealthzChecker) *mux.PathRecorderMux {
|
||||||
mux := mux.NewPathRecorderMux("controller-manager")
|
mux := mux.NewPathRecorderMux("controller-manager")
|
||||||
healthz.InstallHandler(mux)
|
healthz.InstallHandler(mux, checks...)
|
||||||
if c.EnableProfiling {
|
if c.EnableProfiling {
|
||||||
routes.Profiling{}.Install(mux)
|
routes.Profiling{}.Install(mux)
|
||||||
if c.EnableContentionProfiling {
|
if c.EnableContentionProfiling {
|
||||||
|
|
|
@ -113,6 +113,7 @@ go_library(
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/flag:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/flag:go_default_library",
|
||||||
|
|
|
@ -38,6 +38,7 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/util/uuid"
|
"k8s.io/apimachinery/pkg/util/uuid"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apiserver/pkg/server"
|
"k8s.io/apiserver/pkg/server"
|
||||||
|
"k8s.io/apiserver/pkg/server/healthz"
|
||||||
"k8s.io/apiserver/pkg/server/mux"
|
"k8s.io/apiserver/pkg/server/mux"
|
||||||
apiserverflag "k8s.io/apiserver/pkg/util/flag"
|
apiserverflag "k8s.io/apiserver/pkg/util/flag"
|
||||||
cacheddiscovery "k8s.io/client-go/discovery/cached"
|
cacheddiscovery "k8s.io/client-go/discovery/cached"
|
||||||
|
@ -150,18 +151,26 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
|
||||||
klog.Errorf("unable to register configz: %c", err)
|
klog.Errorf("unable to register configz: %c", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Setup any healthz checks we will want to use.
|
||||||
|
var checks []healthz.HealthzChecker
|
||||||
|
var electionChecker *leaderelection.HealthzAdaptor
|
||||||
|
if c.ComponentConfig.Generic.LeaderElection.LeaderElect {
|
||||||
|
electionChecker = leaderelection.NewLeaderHealthzAdaptor(time.Second * 20)
|
||||||
|
checks = append(checks, electionChecker)
|
||||||
|
}
|
||||||
|
|
||||||
// Start the controller manager HTTP server
|
// Start the controller manager HTTP server
|
||||||
// unsecuredMux is the handler for these controller *after* authn/authz filters have been applied
|
// unsecuredMux is the handler for these controller *after* authn/authz filters have been applied
|
||||||
var unsecuredMux *mux.PathRecorderMux
|
var unsecuredMux *mux.PathRecorderMux
|
||||||
if c.SecureServing != nil {
|
if c.SecureServing != nil {
|
||||||
unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging)
|
unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
|
||||||
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
|
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
|
||||||
if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
|
if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if c.InsecureServing != nil {
|
if c.InsecureServing != nil {
|
||||||
unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging)
|
unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
|
||||||
insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}}
|
insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}}
|
||||||
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn)
|
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn)
|
||||||
if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
|
if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
|
||||||
|
@ -240,6 +249,8 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
|
||||||
klog.Fatalf("leaderelection lost")
|
klog.Fatalf("leaderelection lost")
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
WatchDog: electionChecker,
|
||||||
|
Name: "kube-controller-manager",
|
||||||
})
|
})
|
||||||
panic("unreachable")
|
panic("unreachable")
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,12 +8,16 @@ load(
|
||||||
|
|
||||||
go_library(
|
go_library(
|
||||||
name = "go_default_library",
|
name = "go_default_library",
|
||||||
srcs = ["leaderelection.go"],
|
srcs = [
|
||||||
|
"healthzadaptor.go",
|
||||||
|
"leaderelection.go",
|
||||||
|
],
|
||||||
importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/tools/leaderelection",
|
importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/tools/leaderelection",
|
||||||
importpath = "k8s.io/client-go/tools/leaderelection",
|
importpath = "k8s.io/client-go/tools/leaderelection",
|
||||||
deps = [
|
deps = [
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library",
|
"//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library",
|
||||||
|
@ -23,13 +27,17 @@ go_library(
|
||||||
|
|
||||||
go_test(
|
go_test(
|
||||||
name = "go_default_test",
|
name = "go_default_test",
|
||||||
srcs = ["leaderelection_test.go"],
|
srcs = [
|
||||||
|
"healthzadaptor_test.go",
|
||||||
|
"leaderelection_test.go",
|
||||||
|
],
|
||||||
embed = [":go_default_library"],
|
embed = [":go_default_library"],
|
||||||
deps = [
|
deps = [
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1/fake:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1/fake:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/testing:go_default_library",
|
"//staging/src/k8s.io/client-go/testing:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library",
|
"//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library",
|
||||||
|
|
|
@ -0,0 +1,69 @@
|
||||||
|
/*
|
||||||
|
Copyright 2015 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package leaderelection
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// HealthzAdaptor associates the /healthz endpoint with the LeaderElection object.
|
||||||
|
// It helps deal with the /healthz endpoint being set up prior to the LeaderElection.
|
||||||
|
// This contains the code needed to act as an adaptor between the leader
|
||||||
|
// election code the health check code. It allows us to provide health
|
||||||
|
// status about the leader election. Most specifically about if the leader
|
||||||
|
// has failed to renew without exiting the process. In that case we should
|
||||||
|
// report not healthy and rely on the kubelet to take down the process.
|
||||||
|
type HealthzAdaptor struct {
|
||||||
|
pointerLock sync.Mutex
|
||||||
|
le *LeaderElector
|
||||||
|
timeout time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// Name returns the name of the health check we are implementing.
|
||||||
|
func (l *HealthzAdaptor) Name() string {
|
||||||
|
return "leaderElection"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check is called by the healthz endpoint handler.
|
||||||
|
// It fails (returns an error) if we own the lease but had not been able to renew it.
|
||||||
|
func (l *HealthzAdaptor) Check(req *http.Request) error {
|
||||||
|
l.pointerLock.Lock()
|
||||||
|
defer l.pointerLock.Unlock()
|
||||||
|
if l.le == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return l.le.Check(l.timeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetLeaderElection ties a leader election object to a HealthzAdaptor
|
||||||
|
func (l *HealthzAdaptor) SetLeaderElection(le *LeaderElector) {
|
||||||
|
l.pointerLock.Lock()
|
||||||
|
defer l.pointerLock.Unlock()
|
||||||
|
l.le = le
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewLeaderHealthzAdaptor creates a basic healthz adaptor to monitor a leader election.
|
||||||
|
// timeout determines the time beyond the lease expiry to be allowed for timeout.
|
||||||
|
// checks within the timeout period after the lease expires will still return healthy.
|
||||||
|
func NewLeaderHealthzAdaptor(timeout time.Duration) *HealthzAdaptor {
|
||||||
|
result := &HealthzAdaptor{
|
||||||
|
timeout: timeout,
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
|
@ -0,0 +1,175 @@
|
||||||
|
/*
|
||||||
|
Copyright 2015 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package leaderelection
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
|
rl "k8s.io/client-go/tools/leaderelection/resourcelock"
|
||||||
|
"net/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
type fakeLock struct {
|
||||||
|
identity string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get is a dummy to allow us to have a fakeLock for testing.
|
||||||
|
func (fl *fakeLock) Get() (ler *rl.LeaderElectionRecord, err error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create is a dummy to allow us to have a fakeLock for testing.
|
||||||
|
func (fl *fakeLock) Create(ler rl.LeaderElectionRecord) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update is a dummy to allow us to have a fakeLock for testing.
|
||||||
|
func (fl *fakeLock) Update(ler rl.LeaderElectionRecord) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordEvent is a dummy to allow us to have a fakeLock for testing.
|
||||||
|
func (fl *fakeLock) RecordEvent(string) {}
|
||||||
|
|
||||||
|
// Identity is a dummy to allow us to have a fakeLock for testing.
|
||||||
|
func (fl *fakeLock) Identity() string {
|
||||||
|
return fl.identity
|
||||||
|
}
|
||||||
|
|
||||||
|
// Describe is a dummy to allow us to have a fakeLock for testing.
|
||||||
|
func (fl *fakeLock) Describe() string {
|
||||||
|
return "Dummy implementation of lock for testing"
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestLeaderElectionHealthChecker tests that the healthcheck for leader election handles its edge cases.
|
||||||
|
func TestLeaderElectionHealthChecker(t *testing.T) {
|
||||||
|
current := time.Now()
|
||||||
|
req := &http.Request{}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
description string
|
||||||
|
expected error
|
||||||
|
adaptorTimeout time.Duration
|
||||||
|
elector *LeaderElector
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
description: "call check before leader elector initialized",
|
||||||
|
expected: nil,
|
||||||
|
adaptorTimeout: time.Second * 20,
|
||||||
|
elector: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "call check when the the lease is far expired",
|
||||||
|
expected: fmt.Errorf("failed election to renew leadership on lease %s", "foo"),
|
||||||
|
adaptorTimeout: time.Second * 20,
|
||||||
|
elector: &LeaderElector{
|
||||||
|
config: LeaderElectionConfig{
|
||||||
|
Lock: &fakeLock{identity: "healthTest"},
|
||||||
|
LeaseDuration: time.Minute,
|
||||||
|
Name: "foo",
|
||||||
|
},
|
||||||
|
observedRecord: rl.LeaderElectionRecord{
|
||||||
|
HolderIdentity: "healthTest",
|
||||||
|
},
|
||||||
|
observedTime: current,
|
||||||
|
clock: clock.NewFakeClock(current.Add(time.Hour)),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "call check when the the lease is far expired but held by another server",
|
||||||
|
expected: nil,
|
||||||
|
adaptorTimeout: time.Second * 20,
|
||||||
|
elector: &LeaderElector{
|
||||||
|
config: LeaderElectionConfig{
|
||||||
|
Lock: &fakeLock{identity: "healthTest"},
|
||||||
|
LeaseDuration: time.Minute,
|
||||||
|
Name: "foo",
|
||||||
|
},
|
||||||
|
observedRecord: rl.LeaderElectionRecord{
|
||||||
|
HolderIdentity: "otherServer",
|
||||||
|
},
|
||||||
|
observedTime: current,
|
||||||
|
clock: clock.NewFakeClock(current.Add(time.Hour)),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "call check when the the lease is not expired",
|
||||||
|
expected: nil,
|
||||||
|
adaptorTimeout: time.Second * 20,
|
||||||
|
elector: &LeaderElector{
|
||||||
|
config: LeaderElectionConfig{
|
||||||
|
Lock: &fakeLock{identity: "healthTest"},
|
||||||
|
LeaseDuration: time.Minute,
|
||||||
|
Name: "foo",
|
||||||
|
},
|
||||||
|
observedRecord: rl.LeaderElectionRecord{
|
||||||
|
HolderIdentity: "healthTest",
|
||||||
|
},
|
||||||
|
observedTime: current,
|
||||||
|
clock: clock.NewFakeClock(current),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "call check when the the lease is expired but inside the timeout",
|
||||||
|
expected: nil,
|
||||||
|
adaptorTimeout: time.Second * 20,
|
||||||
|
elector: &LeaderElector{
|
||||||
|
config: LeaderElectionConfig{
|
||||||
|
Lock: &fakeLock{identity: "healthTest"},
|
||||||
|
LeaseDuration: time.Minute,
|
||||||
|
Name: "foo",
|
||||||
|
},
|
||||||
|
observedRecord: rl.LeaderElectionRecord{
|
||||||
|
HolderIdentity: "healthTest",
|
||||||
|
},
|
||||||
|
observedTime: current,
|
||||||
|
clock: clock.NewFakeClock(current.Add(time.Minute).Add(time.Second)),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
adaptor := NewLeaderHealthzAdaptor(test.adaptorTimeout)
|
||||||
|
if adaptor.le != nil {
|
||||||
|
t.Errorf("[%s] leaderChecker started with a LeaderElector %v", test.description, adaptor.le)
|
||||||
|
}
|
||||||
|
if test.elector != nil {
|
||||||
|
test.elector.config.WatchDog = adaptor
|
||||||
|
adaptor.SetLeaderElection(test.elector)
|
||||||
|
if adaptor.le == nil {
|
||||||
|
t.Errorf("[%s] adaptor failed to set the LeaderElector", test.description)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err := adaptor.Check(req)
|
||||||
|
if test.expected == nil {
|
||||||
|
if err == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
t.Errorf("[%s] called check, expected no error but received \"%v\"", test.description, err)
|
||||||
|
} else {
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("[%s] called check and failed to received the expected error \"%v\"", test.description, test.expected)
|
||||||
|
}
|
||||||
|
if err.Error() != test.expected.Error() {
|
||||||
|
t.Errorf("[%s] called check, expected %v, received %v", test.description, test.expected, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -56,6 +56,7 @@ import (
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
"k8s.io/apimachinery/pkg/util/runtime"
|
"k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
rl "k8s.io/client-go/tools/leaderelection/resourcelock"
|
rl "k8s.io/client-go/tools/leaderelection/resourcelock"
|
||||||
|
@ -90,6 +91,7 @@ func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
|
||||||
}
|
}
|
||||||
return &LeaderElector{
|
return &LeaderElector{
|
||||||
config: lec,
|
config: lec,
|
||||||
|
clock: clock.RealClock{},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,6 +113,13 @@ type LeaderElectionConfig struct {
|
||||||
// Callbacks are callbacks that are triggered during certain lifecycle
|
// Callbacks are callbacks that are triggered during certain lifecycle
|
||||||
// events of the LeaderElector
|
// events of the LeaderElector
|
||||||
Callbacks LeaderCallbacks
|
Callbacks LeaderCallbacks
|
||||||
|
|
||||||
|
// WatchDog is the associated health checker
|
||||||
|
// WatchDog may be null if its not needed/configured.
|
||||||
|
WatchDog *HealthzAdaptor
|
||||||
|
|
||||||
|
// Name is the name of the resource lock for debugging
|
||||||
|
Name string
|
||||||
}
|
}
|
||||||
|
|
||||||
// LeaderCallbacks are callbacks that are triggered during certain
|
// LeaderCallbacks are callbacks that are triggered during certain
|
||||||
|
@ -139,6 +148,12 @@ type LeaderElector struct {
|
||||||
// value observedRecord.HolderIdentity if the transition has
|
// value observedRecord.HolderIdentity if the transition has
|
||||||
// not yet been reported.
|
// not yet been reported.
|
||||||
reportedLeader string
|
reportedLeader string
|
||||||
|
|
||||||
|
// clock is wrapper around time to allow for less flaky testing
|
||||||
|
clock clock.Clock
|
||||||
|
|
||||||
|
// name is the name of the resource lock for debugging
|
||||||
|
name string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run starts the leader election loop
|
// Run starts the leader election loop
|
||||||
|
@ -163,6 +178,9 @@ func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
if lec.WatchDog != nil {
|
||||||
|
lec.WatchDog.SetLeaderElection(le)
|
||||||
|
}
|
||||||
le.Run(ctx)
|
le.Run(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -257,14 +275,14 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
le.observedRecord = leaderElectionRecord
|
le.observedRecord = leaderElectionRecord
|
||||||
le.observedTime = time.Now()
|
le.observedTime = le.clock.Now()
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Record obtained, check the Identity & Time
|
// 2. Record obtained, check the Identity & Time
|
||||||
if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
|
if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
|
||||||
le.observedRecord = *oldLeaderElectionRecord
|
le.observedRecord = *oldLeaderElectionRecord
|
||||||
le.observedTime = time.Now()
|
le.observedTime = le.clock.Now()
|
||||||
}
|
}
|
||||||
if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
|
if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
|
||||||
!le.IsLeader() {
|
!le.IsLeader() {
|
||||||
|
@ -287,7 +305,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
le.observedRecord = leaderElectionRecord
|
le.observedRecord = leaderElectionRecord
|
||||||
le.observedTime = time.Now()
|
le.observedTime = le.clock.Now()
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -300,3 +318,19 @@ func (le *LeaderElector) maybeReportTransition() {
|
||||||
go le.config.Callbacks.OnNewLeader(le.reportedLeader)
|
go le.config.Callbacks.OnNewLeader(le.reportedLeader)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check will determine if the current lease is expired by more than timeout.
|
||||||
|
func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error {
|
||||||
|
if !le.IsLeader() {
|
||||||
|
// Currently not concerned with the case that we are hot standby
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// If we are more than timeout seconds after the lease duration that is past the timeout
|
||||||
|
// on the lease renew. Time to start reporting ourselves as unhealthy. We should have
|
||||||
|
// died but conditions like deadlock can prevent this. (See #70819)
|
||||||
|
if le.clock.Since(le.observedTime) > le.config.LeaseDuration+maxTolerableExpiredLease {
|
||||||
|
return fmt.Errorf("failed election to renew leadership on lease %s", le.config.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
fakecorev1 "k8s.io/client-go/kubernetes/typed/core/v1/fake"
|
fakecorev1 "k8s.io/client-go/kubernetes/typed/core/v1/fake"
|
||||||
core "k8s.io/client-go/testing"
|
core "k8s.io/client-go/testing"
|
||||||
rl "k8s.io/client-go/tools/leaderelection/resourcelock"
|
rl "k8s.io/client-go/tools/leaderelection/resourcelock"
|
||||||
|
@ -257,6 +258,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
|
||||||
config: lec,
|
config: lec,
|
||||||
observedRecord: test.observedRecord,
|
observedRecord: test.observedRecord,
|
||||||
observedTime: test.observedTime,
|
observedTime: test.observedTime,
|
||||||
|
clock: clock.RealClock{},
|
||||||
}
|
}
|
||||||
|
|
||||||
if test.expectSuccess != le.tryAcquireOrRenew() {
|
if test.expectSuccess != le.tryAcquireOrRenew() {
|
||||||
|
|
Loading…
Reference in New Issue