Merge pull request #71490 from smarterclayton/step_down

leaderelection: Allow leader elected code to step down on a context cancel
pull/564/head
Kubernetes Prow Robot 2019-01-11 21:42:56 -08:00 committed by GitHub
commit 5e14bf6487
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 580 additions and 105 deletions

View File

@ -25,7 +25,7 @@ import (
"sync"
"time"
"github.com/googleapis/gnostic/OpenAPIv2"
openapi_v2 "github.com/googleapis/gnostic/OpenAPIv2"
"k8s.io/klog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -266,13 +266,10 @@ func NewCachedDiscoveryClientForConfig(config *restclient.Config, discoveryCache
if len(httpCacheDir) > 0 {
// update the given restconfig with a custom roundtripper that
// understands how to handle cache responses.
wt := config.WrapTransport
config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
if wt != nil {
rt = wt(rt)
}
config = restclient.CopyConfig(config)
config.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return newCacheRoundTripper(httpCacheDir, rt)
}
})
}
discoveryClient, err := NewDiscoveryClientForConfig(config)

View File

@ -32,7 +32,7 @@ import (
"time"
"golang.org/x/crypto/ssh/terminal"
"k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
@ -172,13 +172,9 @@ type credentials struct {
// UpdateTransportConfig updates the transport.Config to use credentials
// returned by the plugin.
func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error {
wt := c.WrapTransport
c.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
if wt != nil {
rt = wt(rt)
}
c.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return &roundTripper{a, rt}
}
})
if c.TLS.GetCert != nil {
return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set")

View File

@ -36,6 +36,7 @@ go_test(
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/rest/watch:go_default_library",
"//staging/src/k8s.io/client-go/tools/clientcmd/api:go_default_library",
"//staging/src/k8s.io/client-go/transport:go_default_library",
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
"//vendor/github.com/google/gofuzz:go_default_library",

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/pkg/version"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/transport"
certutil "k8s.io/client-go/util/cert"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog"
@ -95,13 +96,16 @@ type Config struct {
// Transport may be used for custom HTTP behavior. This attribute may not
// be specified with the TLS client certificate options. Use WrapTransport
// for most client level operations.
// to provide additional per-server middleware behavior.
Transport http.RoundTripper
// WrapTransport will be invoked for custom HTTP behavior after the underlying
// transport is initialized (either the transport created from TLSClientConfig,
// Transport, or http.DefaultTransport). The config may layer other RoundTrippers
// on top of the returned RoundTripper.
WrapTransport func(rt http.RoundTripper) http.RoundTripper
//
// A future release will change this field to an array. Use config.Wrap()
// instead of setting this value directly.
WrapTransport transport.WrapperFunc
// QPS indicates the maximum QPS to the master from this client.
// If it's zero, the created RESTClient will use DefaultQPS: 5

View File

@ -27,12 +27,13 @@ import (
"strings"
"testing"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/client-go/kubernetes/scheme"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/transport"
"k8s.io/client-go/util/flowcontrol"
fuzz "github.com/google/gofuzz"
@ -236,6 +237,9 @@ func TestAnonymousConfig(t *testing.T) {
func(fn *func(http.RoundTripper) http.RoundTripper, f fuzz.Continue) {
*fn = fakeWrapperFunc
},
func(fn *transport.WrapperFunc, f fuzz.Continue) {
*fn = fakeWrapperFunc
},
func(r *runtime.NegotiatedSerializer, f fuzz.Continue) {
serializer := &fakeNegotiatedSerializer{}
f.Fuzz(serializer)
@ -316,6 +320,9 @@ func TestCopyConfig(t *testing.T) {
func(fn *func(http.RoundTripper) http.RoundTripper, f fuzz.Continue) {
*fn = fakeWrapperFunc
},
func(fn *transport.WrapperFunc, f fuzz.Continue) {
*fn = fakeWrapperFunc
},
func(r *runtime.NegotiatedSerializer, f fuzz.Continue) {
serializer := &fakeNegotiatedSerializer{}
f.Fuzz(serializer)

View File

@ -103,14 +103,15 @@ func (c *Config) TransportConfig() (*transport.Config, error) {
if err != nil {
return nil, err
}
wt := conf.WrapTransport
if wt != nil {
conf.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
return provider.WrapTransport(wt(rt))
}
} else {
conf.WrapTransport = provider.WrapTransport
}
conf.Wrap(provider.WrapTransport)
}
return conf, nil
}
// Wrap adds a transport middleware function that will give the caller
// an opportunity to wrap the underlying http.RoundTripper prior to the
// first API call being made. The provided function is invoked after any
// existing transport wrappers are invoked.
func (c *Config) Wrap(fn transport.WrapperFunc) {
c.WrapTransport = transport.Wrappers(c.WrapTransport, fn)
}

View File

@ -57,6 +57,7 @@ filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//staging/src/k8s.io/client-go/tools/leaderelection/example:all-srcs",
"//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:all-srcs",
],
tags = ["automanaged"],

View File

@ -0,0 +1,39 @@
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
go_library(
name = "go_default_library",
srcs = ["main.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/tools/leaderelection/example",
importpath = "k8s.io/client-go/tools/leaderelection/example",
visibility = ["//visibility:private"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/clientcmd:go_default_library",
"//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library",
"//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library",
"//staging/src/k8s.io/client-go/transport:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
go_binary(
name = "example",
embed = [":go_default_library"],
visibility = ["//visibility:public"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,135 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/transport"
"k8s.io/klog"
)
// main demonstrates a leader elected process that will step down if interrupted.
func main() {
klog.InitFlags(nil)
flag.Parse()
args := flag.Args()
if len(args) != 3 {
log.Fatalf("requires three arguments: ID NAMESPACE CONFIG_MAP_NAME (%d)", len(args))
}
// leader election uses the Kubernetes API by writing to a ConfigMap or Endpoints
// object. Conflicting writes are detected and each client handles those actions
// independently.
var config *rest.Config
var err error
if kubeconfig := os.Getenv("KUBECONFIG"); len(kubeconfig) > 0 {
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
} else {
config, err = rest.InClusterConfig()
}
if err != nil {
log.Fatalf("failed to create client: %v", err)
}
// we use the ConfigMap lock type since edits to ConfigMaps are less common
// and fewer objects in the cluster watch "all ConfigMaps" (unlike the older
// Endpoints lock type, where quite a few system agents like the kube-proxy
// and ingress controllers must watch endpoints).
id := args[0]
lock := &resourcelock.ConfigMapLock{
ConfigMapMeta: metav1.ObjectMeta{
Namespace: args[1],
Name: args[2],
},
Client: kubernetes.NewForConfigOrDie(config).CoreV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
},
}
// use a Go context so we can tell the leaderelection code when we
// want to step down
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// use a client that will stop allowing new requests once the context ends
config.Wrap(transport.ContextCanceller(ctx, fmt.Errorf("the leader is shutting down")))
exampleClient := kubernetes.NewForConfigOrDie(config).CoreV1()
// listen for interrupts or the Linux SIGTERM signal and cancel
// our context, which the leader election code will observe and
// step down
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
go func() {
<-ch
log.Printf("Received termination, signaling shutdown")
cancel()
}()
// start the leader election code loop
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
// IMPORTANT: you MUST ensure that any code you have that
// is protected by the lease must terminate **before**
// you call cancel. Otherwise, you could have a background
// loop still running and another process could
// get elected before your background loop finished, violating
// the stated goal of the lease.
ReleaseOnCancel: true,
LeaseDuration: 60 * time.Second,
RenewDeadline: 15 * time.Second,
RetryPeriod: 5 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
// we're notified when we start - this is where you would
// usually put your code
log.Printf("%s: leading", id)
},
OnStoppedLeading: func() {
// we can do cleanup here, or after the RunOrDie method
// returns
log.Printf("%s: lost", id)
},
},
})
// because the context is closed, the client should report errors
_, err = exampleClient.ConfigMaps(args[1]).Get(args[2], metav1.GetOptions{})
if err == nil || !strings.Contains(err.Error(), "the leader is shutting down") {
log.Fatalf("%s: expected to get an error when trying to make a client call: %v", id, err)
}
// we no longer hold the lease, so perform any cleanup and then
// exit
log.Printf("%s: done", id)
}

View File

@ -121,6 +121,13 @@ type LeaderElectionConfig struct {
// WatchDog may be null if its not needed/configured.
WatchDog *HealthzAdaptor
// ReleaseOnCancel should be set true if the lock should be released
// when the run context is cancelled. If you set this to true, you must
// ensure all code guarded by this lease has successfully completed
// prior to cancelling the context, or you may have two processes
// simultaneously acting on the critical path.
ReleaseOnCancel bool
// Name is the name of the resource lock for debugging
Name string
}
@ -256,6 +263,28 @@ func (le *LeaderElector) renew(ctx context.Context) {
klog.Infof("failed to renew lease %v: %v", desc, err)
cancel()
}, le.config.RetryPeriod, ctx.Done())
// if we hold the lease, give it up
if le.config.ReleaseOnCancel {
le.release()
}
}
// release attempts to release the leader lease if we have acquired it.
func (le *LeaderElector) release() bool {
if !le.IsLeader() {
return true
}
leaderElectionRecord := rl.LeaderElectionRecord{
LeaderTransitions: le.observedRecord.LeaderTransitions,
}
if err := le.config.Lock.Update(leaderElectionRecord); err != nil {
klog.Errorf("Failed to release lock: %v", err)
return false
}
le.observedRecord = leaderElectionRecord
le.observedTime = le.clock.Now()
return true
}
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
@ -291,7 +320,8 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
le.observedRecord = *oldLeaderElectionRecord
le.observedTime = le.clock.Now()
}
if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
!le.IsLeader() {
klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
return false

View File

@ -55,6 +55,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
past := time.Now().Add(-1000 * time.Hour)
tests := []struct {
name string
observedRecord rl.LeaderElectionRecord
observedTime time.Time
reactors []struct {
@ -66,8 +67,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
transitionLeader bool
outHolder string
}{
// acquire from no object
{
name: "acquire from no object",
reactors: []struct {
verb string
reaction core.ReactionFunc
@ -88,8 +89,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
expectSuccess: true,
outHolder: "baz",
},
// acquire from unled object
{
name: "acquire from unled object",
reactors: []struct {
verb string
reaction core.ReactionFunc
@ -116,8 +117,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
transitionLeader: true,
outHolder: "baz",
},
// acquire from led, unacked object
{
name: "acquire from led, unacked object",
reactors: []struct {
verb string
reaction core.ReactionFunc
@ -149,8 +150,40 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
transitionLeader: true,
outHolder: "baz",
},
// don't acquire from led, acked object
{
name: "acquire from empty led, acked object",
reactors: []struct {
verb string
reaction core.ReactionFunc
}{
{
verb: "get",
reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
objectMeta := metav1.ObjectMeta{
Namespace: action.GetNamespace(),
Name: action.(core.GetAction).GetName(),
Annotations: map[string]string{
rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":""}`,
},
}
return true, createLockObject(objectType, objectMeta), nil
},
},
{
verb: "update",
reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(core.CreateAction).GetObject(), nil
},
},
},
observedTime: future,
expectSuccess: true,
transitionLeader: true,
outHolder: "baz",
},
{
name: "don't acquire from led, acked object",
reactors: []struct {
verb string
reaction core.ReactionFunc
@ -174,8 +207,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
expectSuccess: false,
outHolder: "bing",
},
// renew already acquired object
{
name: "renew already acquired object",
reactors: []struct {
verb string
reaction core.ReactionFunc
@ -208,83 +241,86 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) {
},
}
for i, test := range tests {
// OnNewLeader is called async so we have to wait for it.
var wg sync.WaitGroup
wg.Add(1)
var reportedLeader string
var lock rl.Interface
for i := range tests {
test := &tests[i]
t.Run(test.name, func(t *testing.T) {
// OnNewLeader is called async so we have to wait for it.
var wg sync.WaitGroup
wg.Add(1)
var reportedLeader string
var lock rl.Interface
objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
resourceLockConfig := rl.ResourceLockConfig{
Identity: "baz",
EventRecorder: &record.FakeRecorder{},
}
c := &fakecorev1.FakeCoreV1{Fake: &core.Fake{}}
for _, reactor := range test.reactors {
c.AddReactor(reactor.verb, objectType, reactor.reaction)
}
c.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
t.Errorf("[%v] unreachable action. testclient called too many times: %+v", i, action)
return true, nil, fmt.Errorf("unreachable action")
})
switch objectType {
case "endpoints":
lock = &rl.EndpointsLock{
EndpointsMeta: objectMeta,
LockConfig: resourceLockConfig,
Client: c,
objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
resourceLockConfig := rl.ResourceLockConfig{
Identity: "baz",
EventRecorder: &record.FakeRecorder{},
}
case "configmaps":
lock = &rl.ConfigMapLock{
ConfigMapMeta: objectMeta,
LockConfig: resourceLockConfig,
Client: c,
c := &fakecorev1.FakeCoreV1{Fake: &core.Fake{}}
for _, reactor := range test.reactors {
c.AddReactor(reactor.verb, objectType, reactor.reaction)
}
}
c.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
t.Errorf("unreachable action. testclient called too many times: %+v", action)
return true, nil, fmt.Errorf("unreachable action")
})
lec := LeaderElectionConfig{
Lock: lock,
LeaseDuration: 10 * time.Second,
Callbacks: LeaderCallbacks{
OnNewLeader: func(l string) {
defer wg.Done()
reportedLeader = l
switch objectType {
case "endpoints":
lock = &rl.EndpointsLock{
EndpointsMeta: objectMeta,
LockConfig: resourceLockConfig,
Client: c,
}
case "configmaps":
lock = &rl.ConfigMapLock{
ConfigMapMeta: objectMeta,
LockConfig: resourceLockConfig,
Client: c,
}
}
lec := LeaderElectionConfig{
Lock: lock,
LeaseDuration: 10 * time.Second,
Callbacks: LeaderCallbacks{
OnNewLeader: func(l string) {
defer wg.Done()
reportedLeader = l
},
},
},
}
le := &LeaderElector{
config: lec,
observedRecord: test.observedRecord,
observedTime: test.observedTime,
clock: clock.RealClock{},
}
}
le := &LeaderElector{
config: lec,
observedRecord: test.observedRecord,
observedTime: test.observedTime,
clock: clock.RealClock{},
}
if test.expectSuccess != le.tryAcquireOrRenew() {
t.Errorf("[%v]unexpected result of tryAcquireOrRenew: [succeeded=%v]", i, !test.expectSuccess)
}
if test.expectSuccess != le.tryAcquireOrRenew() {
t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
}
le.observedRecord.AcquireTime = metav1.Time{}
le.observedRecord.RenewTime = metav1.Time{}
if le.observedRecord.HolderIdentity != test.outHolder {
t.Errorf("[%v]expected holder:\n\t%+v\ngot:\n\t%+v", i, test.outHolder, le.observedRecord.HolderIdentity)
}
if len(test.reactors) != len(c.Actions()) {
t.Errorf("[%v]wrong number of api interactions", i)
}
if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
t.Errorf("[%v]leader should have transitioned but did not", i)
}
if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
t.Errorf("[%v]leader should not have transitioned but did", i)
}
le.observedRecord.AcquireTime = metav1.Time{}
le.observedRecord.RenewTime = metav1.Time{}
if le.observedRecord.HolderIdentity != test.outHolder {
t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
}
if len(test.reactors) != len(c.Actions()) {
t.Errorf("wrong number of api interactions")
}
if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
t.Errorf("leader should have transitioned but did not")
}
if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
t.Errorf("leader should not have transitioned but did")
}
le.maybeReportTransition()
wg.Wait()
if reportedLeader != test.outHolder {
t.Errorf("[%v]reported leader was not the new leader. expected %q, got %q", i, test.outHolder, reportedLeader)
}
le.maybeReportTransition()
wg.Wait()
if reportedLeader != test.outHolder {
t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
}
})
}
}

View File

@ -17,8 +17,8 @@ go_library(
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
],
)

View File

@ -93,6 +93,9 @@ func (cml *ConfigMapLock) Update(ler LeaderElectionRecord) error {
// RecordEvent in leader election while adding meta-data
func (cml *ConfigMapLock) RecordEvent(s string) {
if cml.LockConfig.EventRecorder == nil {
return
}
events := fmt.Sprintf("%v %v", cml.LockConfig.Identity, s)
cml.LockConfig.EventRecorder.Eventf(&v1.ConfigMap{ObjectMeta: cml.cm.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events)
}

View File

@ -88,6 +88,9 @@ func (el *EndpointsLock) Update(ler LeaderElectionRecord) error {
// RecordEvent in leader election while adding meta-data
func (el *EndpointsLock) RecordEvent(s string) {
if el.LockConfig.EventRecorder == nil {
return
}
events := fmt.Sprintf("%v %v", el.LockConfig.Identity, s)
el.LockConfig.EventRecorder.Eventf(&v1.Endpoints{ObjectMeta: el.e.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events)
}

View File

@ -20,8 +20,8 @@ import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
)
const (
@ -35,6 +35,11 @@ const (
// with a random string (e.g. UUID) with only slight modification of this code.
// TODO(mikedanese): this should potentially be versioned
type LeaderElectionRecord struct {
// HolderIdentity is the ID that owns the lease. If empty, no one owns this lease and
// all callers may acquire. Versions of this library prior to Kubernetes 1.14 will not
// attempt to acquire leases with empty identities and will wait for the full lease
// interval to expire before attempting to reacquire. This value is set to empty when
// a client voluntarily steps down.
HolderIdentity string `json:"holderIdentity"`
LeaseDurationSeconds int `json:"leaseDurationSeconds"`
AcquireTime metav1.Time `json:"acquireTime"`
@ -42,11 +47,19 @@ type LeaderElectionRecord struct {
LeaderTransitions int `json:"leaderTransitions"`
}
// EventRecorder records a change in the ResourceLock.
type EventRecorder interface {
Eventf(obj runtime.Object, eventType, reason, message string, args ...interface{})
}
// ResourceLockConfig common data that exists across different
// resource locks
type ResourceLockConfig struct {
Identity string
EventRecorder record.EventRecorder
// Identity is the unique string identifying a lease holder across
// all participants in an election.
Identity string
// EventRecorder is optional.
EventRecorder EventRecorder
}
// Interface offers a common interface for locking on arbitrary

View File

@ -57,7 +57,10 @@ type Config struct {
// from TLSClientConfig, Transport, or http.DefaultTransport). The
// config may layer other RoundTrippers on top of the returned
// RoundTripper.
WrapTransport func(rt http.RoundTripper) http.RoundTripper
//
// A future release will change this field to an array. Use config.Wrap()
// instead of setting this value directly.
WrapTransport WrapperFunc
// Dial specifies the dial function for creating unencrypted TCP connections.
Dial func(ctx context.Context, network, address string) (net.Conn, error)
@ -98,6 +101,14 @@ func (c *Config) HasCertCallback() bool {
return c.TLS.GetCert != nil
}
// Wrap adds a transport middleware function that will give the caller
// an opportunity to wrap the underlying http.RoundTripper prior to the
// first API call being made. The provided function is invoked after any
// existing transport wrappers are invoked.
func (c *Config) Wrap(fn WrapperFunc) {
c.WrapTransport = Wrappers(c.WrapTransport, fn)
}
// TLSConfig holds the information needed to set up a TLS transport.
type TLSConfig struct {
CAFile string // Path of the PEM-encoded server trusted root certificates.

View File

@ -17,6 +17,7 @@ limitations under the License.
package transport
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
@ -167,3 +168,60 @@ func rootCertPool(caData []byte) *x509.CertPool {
certPool.AppendCertsFromPEM(caData)
return certPool
}
// WrapperFunc wraps an http.RoundTripper when a new transport
// is created for a client, allowing per connection behavior
// to be injected.
type WrapperFunc func(rt http.RoundTripper) http.RoundTripper
// Wrappers accepts any number of wrappers and returns a wrapper
// function that is the equivalent of calling each of them in order. Nil
// values are ignored, which makes this function convenient for incrementally
// wrapping a function.
func Wrappers(fns ...WrapperFunc) WrapperFunc {
if len(fns) == 0 {
return nil
}
// optimize the common case of wrapping a possibly nil transport wrapper
// with an additional wrapper
if len(fns) == 2 && fns[0] == nil {
return fns[1]
}
return func(rt http.RoundTripper) http.RoundTripper {
base := rt
for _, fn := range fns {
if fn != nil {
base = fn(base)
}
}
return base
}
}
// ContextCanceller prevents new requests after the provided context is finished.
// err is returned when the context is closed, allowing the caller to provide a context
// appropriate error.
func ContextCanceller(ctx context.Context, err error) WrapperFunc {
return func(rt http.RoundTripper) http.RoundTripper {
return &contextCanceller{
ctx: ctx,
rt: rt,
err: err,
}
}
}
type contextCanceller struct {
ctx context.Context
rt http.RoundTripper
err error
}
func (b *contextCanceller) RoundTrip(req *http.Request) (*http.Response, error) {
select {
case <-b.ctx.Done():
return nil, b.err
default:
return b.rt.RoundTrip(req)
}
}

View File

@ -17,8 +17,10 @@ limitations under the License.
package transport
import (
"context"
"crypto/tls"
"errors"
"fmt"
"net/http"
"testing"
)
@ -310,3 +312,141 @@ func TestNew(t *testing.T) {
})
}
}
type fakeRoundTripper struct {
Req *http.Request
Resp *http.Response
Err error
}
func (rt *fakeRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
rt.Req = req
return rt.Resp, rt.Err
}
type chainRoundTripper struct {
rt http.RoundTripper
value string
}
func testChain(value string) WrapperFunc {
return func(rt http.RoundTripper) http.RoundTripper {
return &chainRoundTripper{rt: rt, value: value}
}
}
func (rt *chainRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
resp, err := rt.rt.RoundTrip(req)
if resp != nil {
if resp.Header == nil {
resp.Header = make(http.Header)
}
resp.Header.Set("Value", resp.Header.Get("Value")+rt.value)
}
return resp, err
}
func TestWrappers(t *testing.T) {
resp1 := &http.Response{}
wrapperResp1 := func(rt http.RoundTripper) http.RoundTripper {
return &fakeRoundTripper{Resp: resp1}
}
resp2 := &http.Response{}
wrapperResp2 := func(rt http.RoundTripper) http.RoundTripper {
return &fakeRoundTripper{Resp: resp2}
}
tests := []struct {
name string
fns []WrapperFunc
wantNil bool
want func(*http.Response) bool
}{
{fns: []WrapperFunc{}, wantNil: true},
{fns: []WrapperFunc{nil, nil}, wantNil: true},
{fns: []WrapperFunc{nil}, wantNil: false},
{fns: []WrapperFunc{nil, wrapperResp1}, want: func(resp *http.Response) bool { return resp == resp1 }},
{fns: []WrapperFunc{wrapperResp1, nil}, want: func(resp *http.Response) bool { return resp == resp1 }},
{fns: []WrapperFunc{nil, wrapperResp1, nil}, want: func(resp *http.Response) bool { return resp == resp1 }},
{fns: []WrapperFunc{nil, wrapperResp1, wrapperResp2}, want: func(resp *http.Response) bool { return resp == resp2 }},
{fns: []WrapperFunc{wrapperResp1, wrapperResp2}, want: func(resp *http.Response) bool { return resp == resp2 }},
{fns: []WrapperFunc{wrapperResp2, wrapperResp1}, want: func(resp *http.Response) bool { return resp == resp1 }},
{fns: []WrapperFunc{testChain("1")}, want: func(resp *http.Response) bool { return resp.Header.Get("Value") == "1" }},
{fns: []WrapperFunc{testChain("1"), testChain("2")}, want: func(resp *http.Response) bool { return resp.Header.Get("Value") == "12" }},
{fns: []WrapperFunc{testChain("2"), testChain("1")}, want: func(resp *http.Response) bool { return resp.Header.Get("Value") == "21" }},
{fns: []WrapperFunc{testChain("1"), testChain("2"), testChain("3")}, want: func(resp *http.Response) bool { return resp.Header.Get("Value") == "123" }},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := Wrappers(tt.fns...)
if got == nil != tt.wantNil {
t.Errorf("Wrappers() = %v", got)
return
}
if got == nil {
return
}
rt := &fakeRoundTripper{Resp: &http.Response{}}
nested := got(rt)
req := &http.Request{}
resp, _ := nested.RoundTrip(req)
if tt.want != nil && !tt.want(resp) {
t.Errorf("unexpected response: %#v", resp)
}
})
}
}
func Test_contextCanceller_RoundTrip(t *testing.T) {
tests := []struct {
name string
open bool
want bool
}{
{name: "open context should call nested round tripper", open: true, want: true},
{name: "closed context should return a known error", open: false, want: false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req := &http.Request{}
rt := &fakeRoundTripper{Resp: &http.Response{}}
ctx := context.Background()
if !tt.open {
c, fn := context.WithCancel(ctx)
fn()
ctx = c
}
errTesting := fmt.Errorf("testing")
b := &contextCanceller{
rt: rt,
ctx: ctx,
err: errTesting,
}
got, err := b.RoundTrip(req)
if tt.want {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if got != rt.Resp {
t.Errorf("wanted response")
}
if req != rt.Req {
t.Errorf("expect nested call")
}
} else {
if err != errTesting {
t.Errorf("unexpected error: %v", err)
}
if got != nil {
t.Errorf("wanted no response")
}
if rt.Req != nil {
t.Errorf("want no nested call")
}
}
})
}
}