From 09890b6c48da8e85237a5674d6256900f482b0a5 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 27 Nov 2018 21:51:21 -0500 Subject: [PATCH 1/3] leaderelection: Allow leader elected code to step down on a context cancel The current code simply exits without continuing to renew the lease, which means participants using a slower lease duration might have to wait multiple minutes before a new leader is elected. Allow an optional flag to be set on LeaderElectionConfig that will release the lease when the calling context is cancelled. Callers *must* ensure their lease guarded code has completed before the context is cancelled, or other processes may acquire the lease before this lease has released. Add an example command that demonstrates how cancellation could be done. As a convenience to users, make event recorder optional - not all users of the lock code will need a recorder. --- .../client-go/tools/leaderelection/BUILD | 1 + .../tools/leaderelection/example/BUILD | 38 ++++ .../tools/leaderelection/example/main.go | 122 ++++++++++++ .../tools/leaderelection/leaderelection.go | 32 ++- .../leaderelection/leaderelection_test.go | 184 +++++++++++------- .../tools/leaderelection/resourcelock/BUILD | 2 +- .../resourcelock/configmaplock.go | 3 + .../resourcelock/endpointslock.go | 3 + .../leaderelection/resourcelock/interface.go | 19 +- 9 files changed, 325 insertions(+), 79 deletions(-) create mode 100644 staging/src/k8s.io/client-go/tools/leaderelection/example/BUILD create mode 100644 staging/src/k8s.io/client-go/tools/leaderelection/example/main.go diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/BUILD b/staging/src/k8s.io/client-go/tools/leaderelection/BUILD index bed0f89fee..5144467782 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/BUILD +++ b/staging/src/k8s.io/client-go/tools/leaderelection/BUILD @@ -56,6 +56,7 @@ filegroup( name = "all-srcs", srcs = [ ":package-srcs", + "//staging/src/k8s.io/client-go/tools/leaderelection/example:all-srcs", "//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:all-srcs", ], tags = ["automanaged"], diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/example/BUILD b/staging/src/k8s.io/client-go/tools/leaderelection/example/BUILD new file mode 100644 index 0000000000..cb1cfbca0f --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/leaderelection/example/BUILD @@ -0,0 +1,38 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library") + +go_library( + name = "go_default_library", + srcs = ["main.go"], + importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/tools/leaderelection/example", + importpath = "k8s.io/client-go/tools/leaderelection/example", + visibility = ["//visibility:private"], + deps = [ + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/rest:go_default_library", + "//staging/src/k8s.io/client-go/tools/clientcmd:go_default_library", + "//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library", + "//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library", + "//vendor/k8s.io/klog:go_default_library", + ], +) + +go_binary( + name = "example", + embed = [":go_default_library"], + visibility = ["//visibility:public"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = 
["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/example/main.go b/staging/src/k8s.io/client-go/tools/leaderelection/example/main.go new file mode 100644 index 0000000000..91511e5b16 --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/leaderelection/example/main.go @@ -0,0 +1,122 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "flag" + "log" + "os" + "os/signal" + "syscall" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/klog" +) + +// main demonstrates a leader elected process that will step down if interrupted. +func main() { + klog.InitFlags(nil) + flag.Parse() + args := flag.Args() + if len(args) != 3 { + log.Fatalf("requires three arguments: ID NAMESPACE CONFIG_MAP_NAME (%d)", len(args)) + } + + // leader election uses the Kubernetes API by writing to a ConfigMap or Endpoints + // object. Conflicting writes are detected and each client handles those actions + // independently. + var config *rest.Config + var err error + if kubeconfig := os.Getenv("KUBECONFIG"); len(kubeconfig) > 0 { + config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) + } else { + config, err = rest.InClusterConfig() + } + if err != nil { + log.Fatalf("failed to create client: %v", err) + } + + // we use the ConfigMap lock type since edits to ConfigMaps are less common + // and fewer objects in the cluster watch "all ConfigMaps" (unlike the older + // Endpoints lock type, where quite a few system agents like the kube-proxy + // and ingress controllers must watch endpoints). + id := args[0] + lock := &resourcelock.ConfigMapLock{ + ConfigMapMeta: metav1.ObjectMeta{ + Namespace: args[1], + Name: args[2], + }, + Client: kubernetes.NewForConfigOrDie(config).CoreV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: id, + }, + } + + // use a Go context so we can tell the leaderelection code when we + // want to step down + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // listen for interrupts or the Linux SIGTERM signal and cancel + // our context, which the leader election code will observe and + // step down + ch := make(chan os.Signal, 1) + signal.Notify(ch, os.Interrupt, syscall.SIGTERM) + go func() { + <-ch + log.Printf("Received termination, signaling shutdown") + cancel() + }() + + // start the leader election code loop + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: lock, + // IMPORTANT: you MUST ensure that any code you have that + // is protected by the lease must terminate **before** + // you call cancel. 
Otherwise, you could have a background + // loop still running and another process could + // get elected before your background loop finished, violating + // the stated goal of the lease. + ReleaseOnCancel: true, + LeaseDuration: 60 * time.Second, + RenewDeadline: 15 * time.Second, + RetryPeriod: 5 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + // we're notified when we start - this is where you would + // usually put your code + log.Printf("%s: leading", id) + }, + OnStoppedLeading: func() { + // we can do cleanup here, or after the RunOrDie method + // returns + log.Printf("%s: lost", id) + }, + }, + }) + + // we no longer hold the lease, so perform any cleanup and then + // exit + log.Printf("%s: done", id) +} diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go index 2096a5996b..d3eb79b8e5 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go @@ -118,6 +118,13 @@ type LeaderElectionConfig struct { // WatchDog may be null if its not needed/configured. WatchDog *HealthzAdaptor + // ReleaseOnCancel should be set true if the lock should be released + // when the run context is cancelled. If you set this to true, you must + // ensure all code guarded by this lease has successfully completed + // prior to cancelling the context, or you may have two processes + // simultaneously acting on the critical path. + ReleaseOnCancel bool + // Name is the name of the resource lock for debugging Name string } @@ -249,6 +256,28 @@ func (le *LeaderElector) renew(ctx context.Context) { klog.Infof("failed to renew lease %v: %v", desc, err) cancel() }, le.config.RetryPeriod, ctx.Done()) + + // if we hold the lease, give it up + if le.config.ReleaseOnCancel { + le.release() + } +} + +// release attempts to release the leader lease if we have acquired it. 
+func (le *LeaderElector) release() bool { + if !le.IsLeader() { + return true + } + leaderElectionRecord := rl.LeaderElectionRecord{ + LeaderTransitions: le.observedRecord.LeaderTransitions, + } + if err := le.config.Lock.Update(leaderElectionRecord); err != nil { + klog.Errorf("Failed to release lock: %v", err) + return false + } + le.observedRecord = leaderElectionRecord + le.observedTime = le.clock.Now() + return true } // tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired, @@ -284,7 +313,8 @@ func (le *LeaderElector) tryAcquireOrRenew() bool { le.observedRecord = *oldLeaderElectionRecord le.observedTime = le.clock.Now() } - if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && + if len(oldLeaderElectionRecord.HolderIdentity) > 0 && + le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && !le.IsLeader() { klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity) return false diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go index 842aebdab2..c286b25790 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go @@ -55,6 +55,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { past := time.Now().Add(-1000 * time.Hour) tests := []struct { + name string observedRecord rl.LeaderElectionRecord observedTime time.Time reactors []struct { @@ -66,8 +67,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { transitionLeader bool outHolder string }{ - // acquire from no object { + name: "acquire from no object", reactors: []struct { verb string reaction core.ReactionFunc @@ -88,8 +89,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { expectSuccess: true, outHolder: "baz", }, - // acquire from unled object { + name: "acquire from unled object", reactors: []struct { verb string reaction core.ReactionFunc @@ -116,8 +117,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { transitionLeader: true, outHolder: "baz", }, - // acquire from led, unacked object { + name: "acquire from led, unacked object", reactors: []struct { verb string reaction core.ReactionFunc @@ -149,8 +150,40 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { transitionLeader: true, outHolder: "baz", }, - // don't acquire from led, acked object { + name: "acquire from empty led, acked object", + reactors: []struct { + verb string + reaction core.ReactionFunc + }{ + { + verb: "get", + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + objectMeta := metav1.ObjectMeta{ + Namespace: action.GetNamespace(), + Name: action.(core.GetAction).GetName(), + Annotations: map[string]string{ + rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":""}`, + }, + } + return true, createLockObject(objectType, objectMeta), nil + }, + }, + { + verb: "update", + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(core.CreateAction).GetObject(), nil + }, + }, + }, + observedTime: future, + + expectSuccess: true, + transitionLeader: true, + outHolder: "baz", + }, + { + name: "don't acquire from led, acked object", reactors: []struct { verb string reaction core.ReactionFunc @@ -174,8 +207,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { expectSuccess: false, outHolder: "bing", 
}, - // renew already acquired object { + name: "renew already acquired object", reactors: []struct { verb string reaction core.ReactionFunc @@ -208,83 +241,86 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { }, } - for i, test := range tests { - // OnNewLeader is called async so we have to wait for it. - var wg sync.WaitGroup - wg.Add(1) - var reportedLeader string - var lock rl.Interface + for i := range tests { + test := &tests[i] + t.Run(test.name, func(t *testing.T) { + // OnNewLeader is called async so we have to wait for it. + var wg sync.WaitGroup + wg.Add(1) + var reportedLeader string + var lock rl.Interface - objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"} - resourceLockConfig := rl.ResourceLockConfig{ - Identity: "baz", - EventRecorder: &record.FakeRecorder{}, - } - c := &fakecorev1.FakeCoreV1{Fake: &core.Fake{}} - for _, reactor := range test.reactors { - c.AddReactor(reactor.verb, objectType, reactor.reaction) - } - c.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { - t.Errorf("[%v] unreachable action. testclient called too many times: %+v", i, action) - return true, nil, fmt.Errorf("unreachable action") - }) - - switch objectType { - case "endpoints": - lock = &rl.EndpointsLock{ - EndpointsMeta: objectMeta, - LockConfig: resourceLockConfig, - Client: c, + objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"} + resourceLockConfig := rl.ResourceLockConfig{ + Identity: "baz", + EventRecorder: &record.FakeRecorder{}, } - case "configmaps": - lock = &rl.ConfigMapLock{ - ConfigMapMeta: objectMeta, - LockConfig: resourceLockConfig, - Client: c, + c := &fakecorev1.FakeCoreV1{Fake: &core.Fake{}} + for _, reactor := range test.reactors { + c.AddReactor(reactor.verb, objectType, reactor.reaction) } - } + c.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { + t.Errorf("unreachable action. 
testclient called too many times: %+v", action) + return true, nil, fmt.Errorf("unreachable action") + }) - lec := LeaderElectionConfig{ - Lock: lock, - LeaseDuration: 10 * time.Second, - Callbacks: LeaderCallbacks{ - OnNewLeader: func(l string) { - defer wg.Done() - reportedLeader = l + switch objectType { + case "endpoints": + lock = &rl.EndpointsLock{ + EndpointsMeta: objectMeta, + LockConfig: resourceLockConfig, + Client: c, + } + case "configmaps": + lock = &rl.ConfigMapLock{ + ConfigMapMeta: objectMeta, + LockConfig: resourceLockConfig, + Client: c, + } + } + + lec := LeaderElectionConfig{ + Lock: lock, + LeaseDuration: 10 * time.Second, + Callbacks: LeaderCallbacks{ + OnNewLeader: func(l string) { + defer wg.Done() + reportedLeader = l + }, }, - }, - } - le := &LeaderElector{ - config: lec, - observedRecord: test.observedRecord, - observedTime: test.observedTime, - clock: clock.RealClock{}, - } + } + le := &LeaderElector{ + config: lec, + observedRecord: test.observedRecord, + observedTime: test.observedTime, + clock: clock.RealClock{}, + } - if test.expectSuccess != le.tryAcquireOrRenew() { - t.Errorf("[%v]unexpected result of tryAcquireOrRenew: [succeeded=%v]", i, !test.expectSuccess) - } + if test.expectSuccess != le.tryAcquireOrRenew() { + t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess) + } - le.observedRecord.AcquireTime = metav1.Time{} - le.observedRecord.RenewTime = metav1.Time{} - if le.observedRecord.HolderIdentity != test.outHolder { - t.Errorf("[%v]expected holder:\n\t%+v\ngot:\n\t%+v", i, test.outHolder, le.observedRecord.HolderIdentity) - } - if len(test.reactors) != len(c.Actions()) { - t.Errorf("[%v]wrong number of api interactions", i) - } - if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 { - t.Errorf("[%v]leader should have transitioned but did not", i) - } - if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 { - t.Errorf("[%v]leader should not have transitioned but did", i) - } + le.observedRecord.AcquireTime = metav1.Time{} + le.observedRecord.RenewTime = metav1.Time{} + if le.observedRecord.HolderIdentity != test.outHolder { + t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity) + } + if len(test.reactors) != len(c.Actions()) { + t.Errorf("wrong number of api interactions") + } + if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 { + t.Errorf("leader should have transitioned but did not") + } + if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 { + t.Errorf("leader should not have transitioned but did") + } - le.maybeReportTransition() - wg.Wait() - if reportedLeader != test.outHolder { - t.Errorf("[%v]reported leader was not the new leader. expected %q, got %q", i, test.outHolder, reportedLeader) - } + le.maybeReportTransition() + wg.Wait() + if reportedLeader != test.outHolder { + t.Errorf("reported leader was not the new leader. 
expected %q, got %q", test.outHolder, reportedLeader) + } + }) } } diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/BUILD b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/BUILD index 07c797c23f..367d454d49 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/BUILD +++ b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/BUILD @@ -17,8 +17,8 @@ go_library( deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", - "//staging/src/k8s.io/client-go/tools/record:go_default_library", ], ) diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go index c12daad022..785356894f 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go @@ -93,6 +93,9 @@ func (cml *ConfigMapLock) Update(ler LeaderElectionRecord) error { // RecordEvent in leader election while adding meta-data func (cml *ConfigMapLock) RecordEvent(s string) { + if cml.LockConfig.EventRecorder == nil { + return + } events := fmt.Sprintf("%v %v", cml.LockConfig.Identity, s) cml.LockConfig.EventRecorder.Eventf(&v1.ConfigMap{ObjectMeta: cml.cm.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events) } diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go index 6f7dcfb0cc..bfe5e8b1bb 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go @@ -88,6 +88,9 @@ func (el *EndpointsLock) Update(ler LeaderElectionRecord) error { // RecordEvent in leader election while adding meta-data func (el *EndpointsLock) RecordEvent(s string) { + if el.LockConfig.EventRecorder == nil { + return + } events := fmt.Sprintf("%v %v", el.LockConfig.Identity, s) el.LockConfig.EventRecorder.Eventf(&v1.Endpoints{ObjectMeta: el.e.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events) } diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go index 676fd1d7db..0bf8e9cd6e 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go @@ -20,8 +20,8 @@ import ( "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/record" ) const ( @@ -35,6 +35,11 @@ const ( // with a random string (e.g. UUID) with only slight modification of this code. // TODO(mikedanese): this should potentially be versioned type LeaderElectionRecord struct { + // HolderIdentity is the ID that owns the lease. If empty, no one owns this lease and + // all callers may acquire. Versions of this library prior to Kubernetes 1.14 will not + // attempt to acquire leases with empty identities and will wait for the full lease + // interval to expire before attempting to reacquire. 
This value is set to empty when + // a client voluntarily steps down. HolderIdentity string `json:"holderIdentity"` LeaseDurationSeconds int `json:"leaseDurationSeconds"` AcquireTime metav1.Time `json:"acquireTime"` @@ -42,11 +47,19 @@ type LeaderElectionRecord struct { LeaderTransitions int `json:"leaderTransitions"` } +// EventRecorder records a change in the ResourceLock. +type EventRecorder interface { + Eventf(obj runtime.Object, eventType, reason, message string, args ...interface{}) +} + // ResourceLockConfig common data that exists across different // resource locks type ResourceLockConfig struct { - Identity string - EventRecorder record.EventRecorder + // Identity is the unique string identifying a lease holder across + // all participants in an election. + Identity string + // EventRecorder is optional. + EventRecorder EventRecorder } // Interface offers a common interface for locking on arbitrary From 1f590e697ef64812620c787720b4b5942027e4a1 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Thu, 27 Dec 2018 11:47:50 -0500 Subject: [PATCH 2/3] Make wrapping a client transport more pleasant Properly wrapping a transport can be tricky. Make the normal case (adding a non-nil transport wrapper to a config) easier with a helper. Also enforce a rough ordering, which in the future we can use to simplify the WrapTransport mechanism down into an array of functions we execute in order and avoid wrapping altogether. --- .../client-go/discovery/cached_discovery.go | 11 +-- .../plugin/pkg/client/auth/exec/exec.go | 10 +-- staging/src/k8s.io/client-go/rest/config.go | 8 +- .../src/k8s.io/client-go/rest/config_test.go | 9 +- .../src/k8s.io/client-go/rest/transport.go | 17 ++-- .../src/k8s.io/client-go/transport/config.go | 13 ++- .../k8s.io/client-go/transport/transport.go | 29 +++++++ .../client-go/transport/transport_test.go | 87 +++++++++++++++++++ 8 files changed, 158 insertions(+), 26 deletions(-) diff --git a/staging/src/k8s.io/client-go/discovery/cached_discovery.go b/staging/src/k8s.io/client-go/discovery/cached_discovery.go index df69d6a193..61a758c012 100644 --- a/staging/src/k8s.io/client-go/discovery/cached_discovery.go +++ b/staging/src/k8s.io/client-go/discovery/cached_discovery.go @@ -25,7 +25,7 @@ import ( "sync" "time" - "github.com/googleapis/gnostic/OpenAPIv2" + openapi_v2 "github.com/googleapis/gnostic/OpenAPIv2" "k8s.io/klog" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -266,13 +266,10 @@ func NewCachedDiscoveryClientForConfig(config *restclient.Config, discoveryCache if len(httpCacheDir) > 0 { // update the given restconfig with a custom roundtripper that // understands how to handle cache responses. 
- wt := config.WrapTransport - config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper { - if wt != nil { - rt = wt(rt) - } + config = restclient.CopyConfig(config) + config.Wrap(func(rt http.RoundTripper) http.RoundTripper { return newCacheRoundTripper(httpCacheDir, rt) - } + }) } discoveryClient, err := NewDiscoveryClientForConfig(config) diff --git a/staging/src/k8s.io/client-go/plugin/pkg/client/auth/exec/exec.go b/staging/src/k8s.io/client-go/plugin/pkg/client/auth/exec/exec.go index 4d72526583..be4814bcc3 100644 --- a/staging/src/k8s.io/client-go/plugin/pkg/client/auth/exec/exec.go +++ b/staging/src/k8s.io/client-go/plugin/pkg/client/auth/exec/exec.go @@ -32,7 +32,7 @@ import ( "time" "golang.org/x/crypto/ssh/terminal" - "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" @@ -172,13 +172,9 @@ type credentials struct { // UpdateTransportConfig updates the transport.Config to use credentials // returned by the plugin. func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error { - wt := c.WrapTransport - c.WrapTransport = func(rt http.RoundTripper) http.RoundTripper { - if wt != nil { - rt = wt(rt) - } + c.Wrap(func(rt http.RoundTripper) http.RoundTripper { return &roundTripper{a, rt} - } + }) if c.TLS.GetCert != nil { return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set") diff --git a/staging/src/k8s.io/client-go/rest/config.go b/staging/src/k8s.io/client-go/rest/config.go index 072e7392b1..271693c2c5 100644 --- a/staging/src/k8s.io/client-go/rest/config.go +++ b/staging/src/k8s.io/client-go/rest/config.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/pkg/version" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + "k8s.io/client-go/transport" certutil "k8s.io/client-go/util/cert" "k8s.io/client-go/util/flowcontrol" "k8s.io/klog" @@ -95,13 +96,16 @@ type Config struct { // Transport may be used for custom HTTP behavior. This attribute may not // be specified with the TLS client certificate options. Use WrapTransport - // for most client level operations. + // to provide additional per-server middleware behavior. Transport http.RoundTripper // WrapTransport will be invoked for custom HTTP behavior after the underlying // transport is initialized (either the transport created from TLSClientConfig, // Transport, or http.DefaultTransport). The config may layer other RoundTrippers // on top of the returned RoundTripper. - WrapTransport func(rt http.RoundTripper) http.RoundTripper + // + // A future release will change this field to an array. Use config.Wrap() + // instead of setting this value directly. + WrapTransport transport.WrapperFunc // QPS indicates the maximum QPS to the master from this client. 
// If it's zero, the created RESTClient will use DefaultQPS: 5 diff --git a/staging/src/k8s.io/client-go/rest/config_test.go b/staging/src/k8s.io/client-go/rest/config_test.go index 22c18d77ed..8f5cce674d 100644 --- a/staging/src/k8s.io/client-go/rest/config_test.go +++ b/staging/src/k8s.io/client-go/rest/config_test.go @@ -27,12 +27,13 @@ import ( "strings" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/client-go/kubernetes/scheme" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + "k8s.io/client-go/transport" "k8s.io/client-go/util/flowcontrol" fuzz "github.com/google/gofuzz" @@ -236,6 +237,9 @@ func TestAnonymousConfig(t *testing.T) { func(fn *func(http.RoundTripper) http.RoundTripper, f fuzz.Continue) { *fn = fakeWrapperFunc }, + func(fn *transport.WrapperFunc, f fuzz.Continue) { + *fn = fakeWrapperFunc + }, func(r *runtime.NegotiatedSerializer, f fuzz.Continue) { serializer := &fakeNegotiatedSerializer{} f.Fuzz(serializer) @@ -316,6 +320,9 @@ func TestCopyConfig(t *testing.T) { func(fn *func(http.RoundTripper) http.RoundTripper, f fuzz.Continue) { *fn = fakeWrapperFunc }, + func(fn *transport.WrapperFunc, f fuzz.Continue) { + *fn = fakeWrapperFunc + }, func(r *runtime.NegotiatedSerializer, f fuzz.Continue) { serializer := &fakeNegotiatedSerializer{} f.Fuzz(serializer) diff --git a/staging/src/k8s.io/client-go/rest/transport.go b/staging/src/k8s.io/client-go/rest/transport.go index 25c1801b67..bd5749dc62 100644 --- a/staging/src/k8s.io/client-go/rest/transport.go +++ b/staging/src/k8s.io/client-go/rest/transport.go @@ -103,14 +103,15 @@ func (c *Config) TransportConfig() (*transport.Config, error) { if err != nil { return nil, err } - wt := conf.WrapTransport - if wt != nil { - conf.WrapTransport = func(rt http.RoundTripper) http.RoundTripper { - return provider.WrapTransport(wt(rt)) - } - } else { - conf.WrapTransport = provider.WrapTransport - } + conf.Wrap(provider.WrapTransport) } return conf, nil } + +// Wrap adds a transport middleware function that will give the caller +// an opportunity to wrap the underlying http.RoundTripper prior to the +// first API call being made. The provided function is invoked after any +// existing transport wrappers are invoked. +func (c *Config) Wrap(fn transport.WrapperFunc) { + c.WrapTransport = transport.Wrappers(c.WrapTransport, fn) +} diff --git a/staging/src/k8s.io/client-go/transport/config.go b/staging/src/k8s.io/client-go/transport/config.go index acb126d8b0..5de0a2cb10 100644 --- a/staging/src/k8s.io/client-go/transport/config.go +++ b/staging/src/k8s.io/client-go/transport/config.go @@ -57,7 +57,10 @@ type Config struct { // from TLSClientConfig, Transport, or http.DefaultTransport). The // config may layer other RoundTrippers on top of the returned // RoundTripper. - WrapTransport func(rt http.RoundTripper) http.RoundTripper + // + // A future release will change this field to an array. Use config.Wrap() + // instead of setting this value directly. + WrapTransport WrapperFunc // Dial specifies the dial function for creating unencrypted TCP connections. Dial func(ctx context.Context, network, address string) (net.Conn, error) @@ -98,6 +101,14 @@ func (c *Config) HasCertCallback() bool { return c.TLS.GetCert != nil } +// Wrap adds a transport middleware function that will give the caller +// an opportunity to wrap the underlying http.RoundTripper prior to the +// first API call being made. 
The provided function is invoked after any +// existing transport wrappers are invoked. +func (c *Config) Wrap(fn WrapperFunc) { + c.WrapTransport = Wrappers(c.WrapTransport, fn) +} + // TLSConfig holds the information needed to set up a TLS transport. type TLSConfig struct { CAFile string // Path of the PEM-encoded server trusted root certificates. diff --git a/staging/src/k8s.io/client-go/transport/transport.go b/staging/src/k8s.io/client-go/transport/transport.go index c19739fdfe..f62f8003d6 100644 --- a/staging/src/k8s.io/client-go/transport/transport.go +++ b/staging/src/k8s.io/client-go/transport/transport.go @@ -167,3 +167,32 @@ func rootCertPool(caData []byte) *x509.CertPool { certPool.AppendCertsFromPEM(caData) return certPool } + +// WrapperFunc wraps an http.RoundTripper when a new transport +// is created for a client, allowing per connection behavior +// to be injected. +type WrapperFunc func(rt http.RoundTripper) http.RoundTripper + +// Wrappers accepts any number of wrappers and returns a wrapper +// function that is the equivalent of calling each of them in order. Nil +// values are ignored, which makes this function convenient for incrementally +// wrapping a function. +func Wrappers(fns ...WrapperFunc) WrapperFunc { + if len(fns) == 0 { + return nil + } + // optimize the common case of wrapping a possibly nil transport wrapper + // with an additional wrapper + if len(fns) == 2 && fns[0] == nil { + return fns[1] + } + return func(rt http.RoundTripper) http.RoundTripper { + base := rt + for _, fn := range fns { + if fn != nil { + base = fn(base) + } + } + return base + } +} diff --git a/staging/src/k8s.io/client-go/transport/transport_test.go b/staging/src/k8s.io/client-go/transport/transport_test.go index eead38aacc..6685012106 100644 --- a/staging/src/k8s.io/client-go/transport/transport_test.go +++ b/staging/src/k8s.io/client-go/transport/transport_test.go @@ -310,3 +310,90 @@ func TestNew(t *testing.T) { }) } } + +type fakeRoundTripper struct { + Req *http.Request + Resp *http.Response + Err error +} + +func (rt *fakeRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + rt.Req = req + return rt.Resp, rt.Err +} + +type chainRoundTripper struct { + rt http.RoundTripper + value string +} + +func testChain(value string) WrapperFunc { + return func(rt http.RoundTripper) http.RoundTripper { + return &chainRoundTripper{rt: rt, value: value} + } +} + +func (rt *chainRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + resp, err := rt.rt.RoundTrip(req) + if resp != nil { + if resp.Header == nil { + resp.Header = make(http.Header) + } + resp.Header.Set("Value", resp.Header.Get("Value")+rt.value) + } + return resp, err +} + +func TestWrappers(t *testing.T) { + resp1 := &http.Response{} + wrapperResp1 := func(rt http.RoundTripper) http.RoundTripper { + return &fakeRoundTripper{Resp: resp1} + } + resp2 := &http.Response{} + wrapperResp2 := func(rt http.RoundTripper) http.RoundTripper { + return &fakeRoundTripper{Resp: resp2} + } + + tests := []struct { + name string + fns []WrapperFunc + wantNil bool + want func(*http.Response) bool + }{ + {fns: []WrapperFunc{}, wantNil: true}, + {fns: []WrapperFunc{nil, nil}, wantNil: true}, + {fns: []WrapperFunc{nil}, wantNil: false}, + + {fns: []WrapperFunc{nil, wrapperResp1}, want: func(resp *http.Response) bool { return resp == resp1 }}, + {fns: []WrapperFunc{wrapperResp1, nil}, want: func(resp *http.Response) bool { return resp == resp1 }}, + {fns: []WrapperFunc{nil, wrapperResp1, nil}, want: func(resp 
*http.Response) bool { return resp == resp1 }}, + {fns: []WrapperFunc{nil, wrapperResp1, wrapperResp2}, want: func(resp *http.Response) bool { return resp == resp2 }}, + {fns: []WrapperFunc{wrapperResp1, wrapperResp2}, want: func(resp *http.Response) bool { return resp == resp2 }}, + {fns: []WrapperFunc{wrapperResp2, wrapperResp1}, want: func(resp *http.Response) bool { return resp == resp1 }}, + + {fns: []WrapperFunc{testChain("1")}, want: func(resp *http.Response) bool { return resp.Header.Get("Value") == "1" }}, + {fns: []WrapperFunc{testChain("1"), testChain("2")}, want: func(resp *http.Response) bool { return resp.Header.Get("Value") == "12" }}, + {fns: []WrapperFunc{testChain("2"), testChain("1")}, want: func(resp *http.Response) bool { return resp.Header.Get("Value") == "21" }}, + {fns: []WrapperFunc{testChain("1"), testChain("2"), testChain("3")}, want: func(resp *http.Response) bool { return resp.Header.Get("Value") == "123" }}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := Wrappers(tt.fns...) + if got == nil != tt.wantNil { + t.Errorf("Wrappers() = %v", got) + return + } + if got == nil { + return + } + + rt := &fakeRoundTripper{Resp: &http.Response{}} + nested := got(rt) + req := &http.Request{} + resp, _ := nested.RoundTrip(req) + if tt.want != nil && !tt.want(resp) { + t.Errorf("unexpected response: %#v", resp) + } + }) + } +} From fe74efb1f90826b1903d2908ff9e528329bebea0 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Thu, 27 Dec 2018 12:29:34 -0500 Subject: [PATCH 3/3] Add transport wrapper that blocks api calls after context close The ContextCanceller transport wrapper blocks all API requests after the provided context is closed. Used with the leader election step down, a controller can ensure that new requests are not made after the client has stepped down. 
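As an illustration, here is a minimal sketch (not part of this patch) of how a
caller might combine ReleaseOnCancel from the first patch with the
ContextCanceller wrapper; the lock and rest.Config values are assumed to be
constructed as in the example command:

    import (
        "context"
        "fmt"
        "time"

        "k8s.io/client-go/rest"
        "k8s.io/client-go/tools/leaderelection"
        "k8s.io/client-go/tools/leaderelection/resourcelock"
        "k8s.io/client-go/transport"
    )

    // run blocks until ctx is cancelled. Cancelling ctx releases the lease
    // and causes any further API requests through config to fail with the
    // error supplied to ContextCanceller.
    func run(ctx context.Context, config *rest.Config, lock resourcelock.Interface) {
        // once ctx ends, clients built from this config refuse new requests.
        // note: the lock's own client must be built before this Wrap call
        // (as in the example command), or the final lease release would
        // itself be blocked once ctx is cancelled
        config.Wrap(transport.ContextCanceller(ctx, fmt.Errorf("the leader is shutting down")))

        leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
            Lock:            lock,
            ReleaseOnCancel: true, // give up the lease when ctx is cancelled
            LeaseDuration:   60 * time.Second,
            RenewDeadline:   15 * time.Second,
            RetryPeriod:     5 * time.Second,
            Callbacks: leaderelection.LeaderCallbacks{
                OnStartedLeading: func(ctx context.Context) {
                    // code guarded by the lease goes here; it must finish
                    // before ctx is cancelled or two processes may act at once
                },
                OnStoppedLeading: func() {
                    // cleanup after losing or releasing the lease
                },
            },
        })
    }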
--- staging/src/k8s.io/client-go/rest/BUILD | 1 + .../tools/leaderelection/example/BUILD | 1 + .../tools/leaderelection/example/main.go | 13 +++++ .../k8s.io/client-go/transport/transport.go | 29 ++++++++++ .../client-go/transport/transport_test.go | 53 +++++++++++++++++++ 5 files changed, 97 insertions(+) diff --git a/staging/src/k8s.io/client-go/rest/BUILD b/staging/src/k8s.io/client-go/rest/BUILD index 9f00aac950..4966e09a2b 100644 --- a/staging/src/k8s.io/client-go/rest/BUILD +++ b/staging/src/k8s.io/client-go/rest/BUILD @@ -36,6 +36,7 @@ go_test( "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/rest/watch:go_default_library", "//staging/src/k8s.io/client-go/tools/clientcmd/api:go_default_library", + "//staging/src/k8s.io/client-go/transport:go_default_library", "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library", "//staging/src/k8s.io/client-go/util/testing:go_default_library", "//vendor/github.com/google/gofuzz:go_default_library", diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/example/BUILD b/staging/src/k8s.io/client-go/tools/leaderelection/example/BUILD index cb1cfbca0f..13ff7abd26 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/example/BUILD +++ b/staging/src/k8s.io/client-go/tools/leaderelection/example/BUILD @@ -13,6 +13,7 @@ go_library( "//staging/src/k8s.io/client-go/tools/clientcmd:go_default_library", "//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library", "//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library", + "//staging/src/k8s.io/client-go/transport:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/example/main.go b/staging/src/k8s.io/client-go/tools/leaderelection/example/main.go index 91511e5b16..ebcc0e8dad 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/example/main.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/example/main.go @@ -19,9 +19,11 @@ package main import ( "context" "flag" + "fmt" "log" "os" "os/signal" + "strings" "syscall" "time" @@ -31,6 +33,7 @@ import ( "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/transport" "k8s.io/klog" ) @@ -78,6 +81,10 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + // use a client that will stop allowing new requests once the context ends + config.Wrap(transport.ContextCanceller(ctx, fmt.Errorf("the leader is shutting down"))) + exampleClient := kubernetes.NewForConfigOrDie(config).CoreV1() + // listen for interrupts or the Linux SIGTERM signal and cancel // our context, which the leader election code will observe and // step down @@ -116,6 +123,12 @@ func main() { }, }) + // because the context is closed, the client should report errors + _, err = exampleClient.ConfigMaps(args[1]).Get(args[2], metav1.GetOptions{}) + if err == nil || !strings.Contains(err.Error(), "the leader is shutting down") { + log.Fatalf("%s: expected to get an error when trying to make a client call: %v", id, err) + } + // we no longer hold the lease, so perform any cleanup and then // exit log.Printf("%s: done", id) diff --git a/staging/src/k8s.io/client-go/transport/transport.go b/staging/src/k8s.io/client-go/transport/transport.go index f62f8003d6..2a145c971a 100644 --- a/staging/src/k8s.io/client-go/transport/transport.go +++ 
b/staging/src/k8s.io/client-go/transport/transport.go @@ -17,6 +17,7 @@ limitations under the License. package transport import ( + "context" "crypto/tls" "crypto/x509" "fmt" @@ -196,3 +197,31 @@ func Wrappers(fns ...WrapperFunc) WrapperFunc { return base } } + +// ContextCanceller prevents new requests after the provided context is finished. +// err is returned when the context is closed, allowing the caller to provide a context +// appropriate error. +func ContextCanceller(ctx context.Context, err error) WrapperFunc { + return func(rt http.RoundTripper) http.RoundTripper { + return &contextCanceller{ + ctx: ctx, + rt: rt, + err: err, + } + } +} + +type contextCanceller struct { + ctx context.Context + rt http.RoundTripper + err error +} + +func (b *contextCanceller) RoundTrip(req *http.Request) (*http.Response, error) { + select { + case <-b.ctx.Done(): + return nil, b.err + default: + return b.rt.RoundTrip(req) + } +} diff --git a/staging/src/k8s.io/client-go/transport/transport_test.go b/staging/src/k8s.io/client-go/transport/transport_test.go index 6685012106..d8e7544321 100644 --- a/staging/src/k8s.io/client-go/transport/transport_test.go +++ b/staging/src/k8s.io/client-go/transport/transport_test.go @@ -17,8 +17,10 @@ limitations under the License. package transport import ( + "context" "crypto/tls" "errors" + "fmt" "net/http" "testing" ) @@ -397,3 +399,54 @@ func TestWrappers(t *testing.T) { }) } } + +func Test_contextCanceller_RoundTrip(t *testing.T) { + tests := []struct { + name string + open bool + want bool + }{ + {name: "open context should call nested round tripper", open: true, want: true}, + {name: "closed context should return a known error", open: false, want: false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := &http.Request{} + rt := &fakeRoundTripper{Resp: &http.Response{}} + ctx := context.Background() + if !tt.open { + c, fn := context.WithCancel(ctx) + fn() + ctx = c + } + errTesting := fmt.Errorf("testing") + b := &contextCanceller{ + rt: rt, + ctx: ctx, + err: errTesting, + } + got, err := b.RoundTrip(req) + if tt.want { + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if got != rt.Resp { + t.Errorf("wanted response") + } + if req != rt.Req { + t.Errorf("expect nested call") + } + } else { + if err != errTesting { + t.Errorf("unexpected error: %v", err) + } + if got != nil { + t.Errorf("wanted no response") + } + if rt.Req != nil { + t.Errorf("want no nested call") + } + } + }) + } +}
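
For reference, a minimal usage sketch of the Wrappers helper from the second
patch; loggingRoundTripper is a hypothetical middleware and not part of this
series:

    package main

    import (
        "log"
        "net/http"

        "k8s.io/client-go/transport"
    )

    // loggingRoundTripper is a hypothetical middleware that logs each request
    // before delegating to the wrapped RoundTripper.
    type loggingRoundTripper struct{ rt http.RoundTripper }

    func (l *loggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
        log.Printf("%s %s", req.Method, req.URL)
        return l.rt.RoundTrip(req)
    }

    func main() {
        // nil entries are ignored, so a possibly-unset WrapTransport can be
        // passed directly; functions are applied in order, so the last one
        // returns the outermost RoundTripper.
        wrap := transport.Wrappers(nil, func(rt http.RoundTripper) http.RoundTripper {
            return &loggingRoundTripper{rt: rt}
        })
        client := &http.Client{Transport: wrap(http.DefaultTransport)}
        resp, err := client.Get("https://example.com")
        if err != nil {
            log.Fatal(err)
        }
        resp.Body.Close()
        log.Printf("status: %s", resp.Status)
    }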