diff --git a/staging/src/k8s.io/client-go/discovery/cached_discovery.go b/staging/src/k8s.io/client-go/discovery/cached_discovery.go index df69d6a193..61a758c012 100644 --- a/staging/src/k8s.io/client-go/discovery/cached_discovery.go +++ b/staging/src/k8s.io/client-go/discovery/cached_discovery.go @@ -25,7 +25,7 @@ import ( "sync" "time" - "github.com/googleapis/gnostic/OpenAPIv2" + openapi_v2 "github.com/googleapis/gnostic/OpenAPIv2" "k8s.io/klog" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -266,13 +266,10 @@ func NewCachedDiscoveryClientForConfig(config *restclient.Config, discoveryCache if len(httpCacheDir) > 0 { // update the given restconfig with a custom roundtripper that // understands how to handle cache responses. - wt := config.WrapTransport - config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper { - if wt != nil { - rt = wt(rt) - } + config = restclient.CopyConfig(config) + config.Wrap(func(rt http.RoundTripper) http.RoundTripper { return newCacheRoundTripper(httpCacheDir, rt) - } + }) } discoveryClient, err := NewDiscoveryClientForConfig(config) diff --git a/staging/src/k8s.io/client-go/plugin/pkg/client/auth/exec/exec.go b/staging/src/k8s.io/client-go/plugin/pkg/client/auth/exec/exec.go index 4d72526583..be4814bcc3 100644 --- a/staging/src/k8s.io/client-go/plugin/pkg/client/auth/exec/exec.go +++ b/staging/src/k8s.io/client-go/plugin/pkg/client/auth/exec/exec.go @@ -32,7 +32,7 @@ import ( "time" "golang.org/x/crypto/ssh/terminal" - "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" @@ -172,13 +172,9 @@ type credentials struct { // UpdateTransportConfig updates the transport.Config to use credentials // returned by the plugin. 
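+// The wrapper is registered with c.Wrap, so it composes with any
+// WrapTransport already present on the config. A sketch of the resulting
+// layering (w1 stands for a hypothetical pre-existing wrapper):
+//
+//	c.Wrap(w1)                 // transport becomes w1(base)
+//	a.UpdateTransportConfig(c) // transport becomes roundTripper{a, w1(base)}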
func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error { - wt := c.WrapTransport - c.WrapTransport = func(rt http.RoundTripper) http.RoundTripper { - if wt != nil { - rt = wt(rt) - } + c.Wrap(func(rt http.RoundTripper) http.RoundTripper { return &roundTripper{a, rt} - } + }) if c.TLS.GetCert != nil { return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set") diff --git a/staging/src/k8s.io/client-go/rest/BUILD b/staging/src/k8s.io/client-go/rest/BUILD index 9f00aac950..4966e09a2b 100644 --- a/staging/src/k8s.io/client-go/rest/BUILD +++ b/staging/src/k8s.io/client-go/rest/BUILD @@ -36,6 +36,7 @@ go_test( "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/rest/watch:go_default_library", "//staging/src/k8s.io/client-go/tools/clientcmd/api:go_default_library", + "//staging/src/k8s.io/client-go/transport:go_default_library", "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library", "//staging/src/k8s.io/client-go/util/testing:go_default_library", "//vendor/github.com/google/gofuzz:go_default_library", diff --git a/staging/src/k8s.io/client-go/rest/config.go b/staging/src/k8s.io/client-go/rest/config.go index 072e7392b1..271693c2c5 100644 --- a/staging/src/k8s.io/client-go/rest/config.go +++ b/staging/src/k8s.io/client-go/rest/config.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/pkg/version" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + "k8s.io/client-go/transport" certutil "k8s.io/client-go/util/cert" "k8s.io/client-go/util/flowcontrol" "k8s.io/klog" @@ -95,13 +96,16 @@ type Config struct { // Transport may be used for custom HTTP behavior. This attribute may not // be specified with the TLS client certificate options. Use WrapTransport - // for most client level operations. + // to provide additional per-server middleware behavior. Transport http.RoundTripper // WrapTransport will be invoked for custom HTTP behavior after the underlying // transport is initialized (either the transport created from TLSClientConfig, // Transport, or http.DefaultTransport). The config may layer other RoundTrippers // on top of the returned RoundTripper. - WrapTransport func(rt http.RoundTripper) http.RoundTripper + // + // A future release will change this field to an array. Use config.Wrap() + // instead of setting this value directly. + WrapTransport transport.WrapperFunc // QPS indicates the maximum QPS to the master from this client. 
// If it's zero, the created RESTClient will use DefaultQPS: 5 diff --git a/staging/src/k8s.io/client-go/rest/config_test.go b/staging/src/k8s.io/client-go/rest/config_test.go index 22c18d77ed..8f5cce674d 100644 --- a/staging/src/k8s.io/client-go/rest/config_test.go +++ b/staging/src/k8s.io/client-go/rest/config_test.go @@ -27,12 +27,13 @@ import ( "strings" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/client-go/kubernetes/scheme" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + "k8s.io/client-go/transport" "k8s.io/client-go/util/flowcontrol" fuzz "github.com/google/gofuzz" @@ -236,6 +237,9 @@ func TestAnonymousConfig(t *testing.T) { func(fn *func(http.RoundTripper) http.RoundTripper, f fuzz.Continue) { *fn = fakeWrapperFunc }, + func(fn *transport.WrapperFunc, f fuzz.Continue) { + *fn = fakeWrapperFunc + }, func(r *runtime.NegotiatedSerializer, f fuzz.Continue) { serializer := &fakeNegotiatedSerializer{} f.Fuzz(serializer) @@ -316,6 +320,9 @@ func TestCopyConfig(t *testing.T) { func(fn *func(http.RoundTripper) http.RoundTripper, f fuzz.Continue) { *fn = fakeWrapperFunc }, + func(fn *transport.WrapperFunc, f fuzz.Continue) { + *fn = fakeWrapperFunc + }, func(r *runtime.NegotiatedSerializer, f fuzz.Continue) { serializer := &fakeNegotiatedSerializer{} f.Fuzz(serializer) diff --git a/staging/src/k8s.io/client-go/rest/transport.go b/staging/src/k8s.io/client-go/rest/transport.go index 25c1801b67..bd5749dc62 100644 --- a/staging/src/k8s.io/client-go/rest/transport.go +++ b/staging/src/k8s.io/client-go/rest/transport.go @@ -103,14 +103,15 @@ func (c *Config) TransportConfig() (*transport.Config, error) { if err != nil { return nil, err } - wt := conf.WrapTransport - if wt != nil { - conf.WrapTransport = func(rt http.RoundTripper) http.RoundTripper { - return provider.WrapTransport(wt(rt)) - } - } else { - conf.WrapTransport = provider.WrapTransport - } + conf.Wrap(provider.WrapTransport) } return conf, nil } + +// Wrap adds a transport middleware function that will give the caller +// an opportunity to wrap the underlying http.RoundTripper prior to the +// first API call being made. The provided function is invoked after any +// existing transport wrappers are invoked. 
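+//
+// A minimal usage sketch (loggingRoundTripper is a hypothetical
+// caller-defined type, not part of this package):
+//
+//	config.Wrap(func(rt http.RoundTripper) http.RoundTripper {
+//		return &loggingRoundTripper{next: rt}
+//	})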
+func (c *Config) Wrap(fn transport.WrapperFunc) { + c.WrapTransport = transport.Wrappers(c.WrapTransport, fn) +} diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/BUILD b/staging/src/k8s.io/client-go/tools/leaderelection/BUILD index d91d57d055..e185e3f67e 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/BUILD +++ b/staging/src/k8s.io/client-go/tools/leaderelection/BUILD @@ -57,6 +57,7 @@ filegroup( name = "all-srcs", srcs = [ ":package-srcs", + "//staging/src/k8s.io/client-go/tools/leaderelection/example:all-srcs", "//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:all-srcs", ], tags = ["automanaged"], diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/example/BUILD b/staging/src/k8s.io/client-go/tools/leaderelection/example/BUILD new file mode 100644 index 0000000000..13ff7abd26 --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/leaderelection/example/BUILD @@ -0,0 +1,39 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library") + +go_library( + name = "go_default_library", + srcs = ["main.go"], + importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/tools/leaderelection/example", + importpath = "k8s.io/client-go/tools/leaderelection/example", + visibility = ["//visibility:private"], + deps = [ + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/rest:go_default_library", + "//staging/src/k8s.io/client-go/tools/clientcmd:go_default_library", + "//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library", + "//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library", + "//staging/src/k8s.io/client-go/transport:go_default_library", + "//vendor/k8s.io/klog:go_default_library", + ], +) + +go_binary( + name = "example", + embed = [":go_default_library"], + visibility = ["//visibility:public"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/example/main.go b/staging/src/k8s.io/client-go/tools/leaderelection/example/main.go new file mode 100644 index 0000000000..ebcc0e8dad --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/leaderelection/example/main.go @@ -0,0 +1,135 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"os/signal"
+	"strings"
+	"syscall"
+	"time"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/client-go/tools/leaderelection"
+	"k8s.io/client-go/tools/leaderelection/resourcelock"
+	"k8s.io/client-go/transport"
+	"k8s.io/klog"
+)
+
+// main demonstrates a leader-elected process that will step down if interrupted.
+func main() {
+	klog.InitFlags(nil)
+	flag.Parse()
+	args := flag.Args()
+	if len(args) != 3 {
+		log.Fatalf("requires three arguments: ID NAMESPACE CONFIG_MAP_NAME (%d)", len(args))
+	}
+
+	// leader election uses the Kubernetes API by writing to a ConfigMap or Endpoints
+	// object. Conflicting writes are detected and each client handles those actions
+	// independently.
+	var config *rest.Config
+	var err error
+	if kubeconfig := os.Getenv("KUBECONFIG"); len(kubeconfig) > 0 {
+		config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
+	} else {
+		config, err = rest.InClusterConfig()
+	}
+	if err != nil {
+		log.Fatalf("failed to create client config: %v", err)
+	}
+
+	// we use the ConfigMap lock type since edits to ConfigMaps are less common
+	// and fewer objects in the cluster watch "all ConfigMaps" (unlike the older
+	// Endpoints lock type, where quite a few system agents like the kube-proxy
+	// and ingress controllers must watch endpoints).
+	id := args[0]
+	lock := &resourcelock.ConfigMapLock{
+		ConfigMapMeta: metav1.ObjectMeta{
+			Namespace: args[1],
+			Name:      args[2],
+		},
+		Client: kubernetes.NewForConfigOrDie(config).CoreV1(),
+		LockConfig: resourcelock.ResourceLockConfig{
+			Identity: id,
+		},
+	}
+
+	// use a Go context so we can tell the leaderelection code when we
+	// want to step down
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// use a client that will stop allowing new requests once the context ends
+	config.Wrap(transport.ContextCanceller(ctx, fmt.Errorf("the leader is shutting down")))
+	exampleClient := kubernetes.NewForConfigOrDie(config).CoreV1()
+
+	// listen for interrupts or the Linux SIGTERM signal and cancel
+	// our context, which the leader election code will observe,
+	// causing it to step down
+	ch := make(chan os.Signal, 1)
+	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
+	go func() {
+		<-ch
+		log.Printf("Received termination, signaling shutdown")
+		cancel()
+	}()
+
+	// start the leader election code loop
+	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
+		Lock: lock,
+		// IMPORTANT: you MUST ensure that any code protected by
+		// the lease terminates **before** you call cancel.
+		// Otherwise, a background loop could still be running when
+		// another process gets elected, violating the stated goal
+		// of the lease.
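+		//
+		// In this example the client config was wrapped with
+		// transport.ContextCanceller above, so new API requests
+		// fail fast once cancel() is called.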
+ ReleaseOnCancel: true, + LeaseDuration: 60 * time.Second, + RenewDeadline: 15 * time.Second, + RetryPeriod: 5 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + // we're notified when we start - this is where you would + // usually put your code + log.Printf("%s: leading", id) + }, + OnStoppedLeading: func() { + // we can do cleanup here, or after the RunOrDie method + // returns + log.Printf("%s: lost", id) + }, + }, + }) + + // because the context is closed, the client should report errors + _, err = exampleClient.ConfigMaps(args[1]).Get(args[2], metav1.GetOptions{}) + if err == nil || !strings.Contains(err.Error(), "the leader is shutting down") { + log.Fatalf("%s: expected to get an error when trying to make a client call: %v", id, err) + } + + // we no longer hold the lease, so perform any cleanup and then + // exit + log.Printf("%s: done", id) +} diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go index 19c57f316e..02bdebd1d3 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go @@ -121,6 +121,13 @@ type LeaderElectionConfig struct { // WatchDog may be null if its not needed/configured. WatchDog *HealthzAdaptor + // ReleaseOnCancel should be set true if the lock should be released + // when the run context is cancelled. If you set this to true, you must + // ensure all code guarded by this lease has successfully completed + // prior to cancelling the context, or you may have two processes + // simultaneously acting on the critical path. + ReleaseOnCancel bool + // Name is the name of the resource lock for debugging Name string } @@ -256,6 +263,28 @@ func (le *LeaderElector) renew(ctx context.Context) { klog.Infof("failed to renew lease %v: %v", desc, err) cancel() }, le.config.RetryPeriod, ctx.Done()) + + // if we hold the lease, give it up + if le.config.ReleaseOnCancel { + le.release() + } +} + +// release attempts to release the leader lease if we have acquired it. 
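+// It returns true if the lease was released or is not held, and false if
+// updating the lock fails. Releasing clears the holder identity so other
+// candidates may acquire the lease immediately.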
+func (le *LeaderElector) release() bool { + if !le.IsLeader() { + return true + } + leaderElectionRecord := rl.LeaderElectionRecord{ + LeaderTransitions: le.observedRecord.LeaderTransitions, + } + if err := le.config.Lock.Update(leaderElectionRecord); err != nil { + klog.Errorf("Failed to release lock: %v", err) + return false + } + le.observedRecord = leaderElectionRecord + le.observedTime = le.clock.Now() + return true } // tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired, @@ -291,7 +320,8 @@ func (le *LeaderElector) tryAcquireOrRenew() bool { le.observedRecord = *oldLeaderElectionRecord le.observedTime = le.clock.Now() } - if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && + if len(oldLeaderElectionRecord.HolderIdentity) > 0 && + le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && !le.IsLeader() { klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity) return false diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go index 842aebdab2..c286b25790 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go @@ -55,6 +55,7 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { past := time.Now().Add(-1000 * time.Hour) tests := []struct { + name string observedRecord rl.LeaderElectionRecord observedTime time.Time reactors []struct { @@ -66,8 +67,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { transitionLeader bool outHolder string }{ - // acquire from no object { + name: "acquire from no object", reactors: []struct { verb string reaction core.ReactionFunc @@ -88,8 +89,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { expectSuccess: true, outHolder: "baz", }, - // acquire from unled object { + name: "acquire from unled object", reactors: []struct { verb string reaction core.ReactionFunc @@ -116,8 +117,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { transitionLeader: true, outHolder: "baz", }, - // acquire from led, unacked object { + name: "acquire from led, unacked object", reactors: []struct { verb string reaction core.ReactionFunc @@ -149,8 +150,40 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { transitionLeader: true, outHolder: "baz", }, - // don't acquire from led, acked object { + name: "acquire from empty led, acked object", + reactors: []struct { + verb string + reaction core.ReactionFunc + }{ + { + verb: "get", + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + objectMeta := metav1.ObjectMeta{ + Namespace: action.GetNamespace(), + Name: action.(core.GetAction).GetName(), + Annotations: map[string]string{ + rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":""}`, + }, + } + return true, createLockObject(objectType, objectMeta), nil + }, + }, + { + verb: "update", + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(core.CreateAction).GetObject(), nil + }, + }, + }, + observedTime: future, + + expectSuccess: true, + transitionLeader: true, + outHolder: "baz", + }, + { + name: "don't acquire from led, acked object", reactors: []struct { verb string reaction core.ReactionFunc @@ -174,8 +207,8 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { expectSuccess: false, outHolder: "bing", 
}, - // renew already acquired object { + name: "renew already acquired object", reactors: []struct { verb string reaction core.ReactionFunc @@ -208,83 +241,86 @@ func testTryAcquireOrRenew(t *testing.T, objectType string) { }, } - for i, test := range tests { - // OnNewLeader is called async so we have to wait for it. - var wg sync.WaitGroup - wg.Add(1) - var reportedLeader string - var lock rl.Interface + for i := range tests { + test := &tests[i] + t.Run(test.name, func(t *testing.T) { + // OnNewLeader is called async so we have to wait for it. + var wg sync.WaitGroup + wg.Add(1) + var reportedLeader string + var lock rl.Interface - objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"} - resourceLockConfig := rl.ResourceLockConfig{ - Identity: "baz", - EventRecorder: &record.FakeRecorder{}, - } - c := &fakecorev1.FakeCoreV1{Fake: &core.Fake{}} - for _, reactor := range test.reactors { - c.AddReactor(reactor.verb, objectType, reactor.reaction) - } - c.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { - t.Errorf("[%v] unreachable action. testclient called too many times: %+v", i, action) - return true, nil, fmt.Errorf("unreachable action") - }) - - switch objectType { - case "endpoints": - lock = &rl.EndpointsLock{ - EndpointsMeta: objectMeta, - LockConfig: resourceLockConfig, - Client: c, + objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"} + resourceLockConfig := rl.ResourceLockConfig{ + Identity: "baz", + EventRecorder: &record.FakeRecorder{}, } - case "configmaps": - lock = &rl.ConfigMapLock{ - ConfigMapMeta: objectMeta, - LockConfig: resourceLockConfig, - Client: c, + c := &fakecorev1.FakeCoreV1{Fake: &core.Fake{}} + for _, reactor := range test.reactors { + c.AddReactor(reactor.verb, objectType, reactor.reaction) } - } + c.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { + t.Errorf("unreachable action. 
testclient called too many times: %+v", action) + return true, nil, fmt.Errorf("unreachable action") + }) - lec := LeaderElectionConfig{ - Lock: lock, - LeaseDuration: 10 * time.Second, - Callbacks: LeaderCallbacks{ - OnNewLeader: func(l string) { - defer wg.Done() - reportedLeader = l + switch objectType { + case "endpoints": + lock = &rl.EndpointsLock{ + EndpointsMeta: objectMeta, + LockConfig: resourceLockConfig, + Client: c, + } + case "configmaps": + lock = &rl.ConfigMapLock{ + ConfigMapMeta: objectMeta, + LockConfig: resourceLockConfig, + Client: c, + } + } + + lec := LeaderElectionConfig{ + Lock: lock, + LeaseDuration: 10 * time.Second, + Callbacks: LeaderCallbacks{ + OnNewLeader: func(l string) { + defer wg.Done() + reportedLeader = l + }, }, - }, - } - le := &LeaderElector{ - config: lec, - observedRecord: test.observedRecord, - observedTime: test.observedTime, - clock: clock.RealClock{}, - } + } + le := &LeaderElector{ + config: lec, + observedRecord: test.observedRecord, + observedTime: test.observedTime, + clock: clock.RealClock{}, + } - if test.expectSuccess != le.tryAcquireOrRenew() { - t.Errorf("[%v]unexpected result of tryAcquireOrRenew: [succeeded=%v]", i, !test.expectSuccess) - } + if test.expectSuccess != le.tryAcquireOrRenew() { + t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess) + } - le.observedRecord.AcquireTime = metav1.Time{} - le.observedRecord.RenewTime = metav1.Time{} - if le.observedRecord.HolderIdentity != test.outHolder { - t.Errorf("[%v]expected holder:\n\t%+v\ngot:\n\t%+v", i, test.outHolder, le.observedRecord.HolderIdentity) - } - if len(test.reactors) != len(c.Actions()) { - t.Errorf("[%v]wrong number of api interactions", i) - } - if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 { - t.Errorf("[%v]leader should have transitioned but did not", i) - } - if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 { - t.Errorf("[%v]leader should not have transitioned but did", i) - } + le.observedRecord.AcquireTime = metav1.Time{} + le.observedRecord.RenewTime = metav1.Time{} + if le.observedRecord.HolderIdentity != test.outHolder { + t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity) + } + if len(test.reactors) != len(c.Actions()) { + t.Errorf("wrong number of api interactions") + } + if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 { + t.Errorf("leader should have transitioned but did not") + } + if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 { + t.Errorf("leader should not have transitioned but did") + } - le.maybeReportTransition() - wg.Wait() - if reportedLeader != test.outHolder { - t.Errorf("[%v]reported leader was not the new leader. expected %q, got %q", i, test.outHolder, reportedLeader) - } + le.maybeReportTransition() + wg.Wait() + if reportedLeader != test.outHolder { + t.Errorf("reported leader was not the new leader. 
expected %q, got %q", test.outHolder, reportedLeader) + } + }) } } diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/BUILD b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/BUILD index 07c797c23f..367d454d49 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/BUILD +++ b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/BUILD @@ -17,8 +17,8 @@ go_library( deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", - "//staging/src/k8s.io/client-go/tools/record:go_default_library", ], ) diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go index c12daad022..785356894f 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go @@ -93,6 +93,9 @@ func (cml *ConfigMapLock) Update(ler LeaderElectionRecord) error { // RecordEvent in leader election while adding meta-data func (cml *ConfigMapLock) RecordEvent(s string) { + if cml.LockConfig.EventRecorder == nil { + return + } events := fmt.Sprintf("%v %v", cml.LockConfig.Identity, s) cml.LockConfig.EventRecorder.Eventf(&v1.ConfigMap{ObjectMeta: cml.cm.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events) } diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go index 6f7dcfb0cc..bfe5e8b1bb 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go @@ -88,6 +88,9 @@ func (el *EndpointsLock) Update(ler LeaderElectionRecord) error { // RecordEvent in leader election while adding meta-data func (el *EndpointsLock) RecordEvent(s string) { + if el.LockConfig.EventRecorder == nil { + return + } events := fmt.Sprintf("%v %v", el.LockConfig.Identity, s) el.LockConfig.EventRecorder.Eventf(&v1.Endpoints{ObjectMeta: el.e.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events) } diff --git a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go index 676fd1d7db..0bf8e9cd6e 100644 --- a/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go +++ b/staging/src/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go @@ -20,8 +20,8 @@ import ( "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/record" ) const ( @@ -35,6 +35,11 @@ const ( // with a random string (e.g. UUID) with only slight modification of this code. // TODO(mikedanese): this should potentially be versioned type LeaderElectionRecord struct { + // HolderIdentity is the ID that owns the lease. If empty, no one owns this lease and + // all callers may acquire. Versions of this library prior to Kubernetes 1.14 will not + // attempt to acquire leases with empty identities and will wait for the full lease + // interval to expire before attempting to reacquire. 
This value is set to empty when + // a client voluntarily steps down. HolderIdentity string `json:"holderIdentity"` LeaseDurationSeconds int `json:"leaseDurationSeconds"` AcquireTime metav1.Time `json:"acquireTime"` @@ -42,11 +47,19 @@ type LeaderElectionRecord struct { LeaderTransitions int `json:"leaderTransitions"` } +// EventRecorder records a change in the ResourceLock. +type EventRecorder interface { + Eventf(obj runtime.Object, eventType, reason, message string, args ...interface{}) +} + // ResourceLockConfig common data that exists across different // resource locks type ResourceLockConfig struct { - Identity string - EventRecorder record.EventRecorder + // Identity is the unique string identifying a lease holder across + // all participants in an election. + Identity string + // EventRecorder is optional. + EventRecorder EventRecorder } // Interface offers a common interface for locking on arbitrary diff --git a/staging/src/k8s.io/client-go/transport/config.go b/staging/src/k8s.io/client-go/transport/config.go index acb126d8b0..5de0a2cb10 100644 --- a/staging/src/k8s.io/client-go/transport/config.go +++ b/staging/src/k8s.io/client-go/transport/config.go @@ -57,7 +57,10 @@ type Config struct { // from TLSClientConfig, Transport, or http.DefaultTransport). The // config may layer other RoundTrippers on top of the returned // RoundTripper. - WrapTransport func(rt http.RoundTripper) http.RoundTripper + // + // A future release will change this field to an array. Use config.Wrap() + // instead of setting this value directly. + WrapTransport WrapperFunc // Dial specifies the dial function for creating unencrypted TCP connections. Dial func(ctx context.Context, network, address string) (net.Conn, error) @@ -98,6 +101,14 @@ func (c *Config) HasCertCallback() bool { return c.TLS.GetCert != nil } +// Wrap adds a transport middleware function that will give the caller +// an opportunity to wrap the underlying http.RoundTripper prior to the +// first API call being made. The provided function is invoked after any +// existing transport wrappers are invoked. +func (c *Config) Wrap(fn WrapperFunc) { + c.WrapTransport = Wrappers(c.WrapTransport, fn) +} + // TLSConfig holds the information needed to set up a TLS transport. type TLSConfig struct { CAFile string // Path of the PEM-encoded server trusted root certificates. diff --git a/staging/src/k8s.io/client-go/transport/transport.go b/staging/src/k8s.io/client-go/transport/transport.go index c19739fdfe..2a145c971a 100644 --- a/staging/src/k8s.io/client-go/transport/transport.go +++ b/staging/src/k8s.io/client-go/transport/transport.go @@ -17,6 +17,7 @@ limitations under the License. package transport import ( + "context" "crypto/tls" "crypto/x509" "fmt" @@ -167,3 +168,60 @@ func rootCertPool(caData []byte) *x509.CertPool { certPool.AppendCertsFromPEM(caData) return certPool } + +// WrapperFunc wraps an http.RoundTripper when a new transport +// is created for a client, allowing per connection behavior +// to be injected. +type WrapperFunc func(rt http.RoundTripper) http.RoundTripper + +// Wrappers accepts any number of wrappers and returns a wrapper +// function that is the equivalent of calling each of them in order. Nil +// values are ignored, which makes this function convenient for incrementally +// wrapping a function. 
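+//
+// A short sketch of the composition order (w1 and w2 stand for arbitrary
+// WrapperFunc values):
+//
+//	fn := Wrappers(w1, nil, w2)
+//	rt := fn(base) // equivalent to w2(w1(base)); the nil entry is skipped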
+func Wrappers(fns ...WrapperFunc) WrapperFunc { + if len(fns) == 0 { + return nil + } + // optimize the common case of wrapping a possibly nil transport wrapper + // with an additional wrapper + if len(fns) == 2 && fns[0] == nil { + return fns[1] + } + return func(rt http.RoundTripper) http.RoundTripper { + base := rt + for _, fn := range fns { + if fn != nil { + base = fn(base) + } + } + return base + } +} + +// ContextCanceller prevents new requests after the provided context is finished. +// err is returned when the context is closed, allowing the caller to provide a context +// appropriate error. +func ContextCanceller(ctx context.Context, err error) WrapperFunc { + return func(rt http.RoundTripper) http.RoundTripper { + return &contextCanceller{ + ctx: ctx, + rt: rt, + err: err, + } + } +} + +type contextCanceller struct { + ctx context.Context + rt http.RoundTripper + err error +} + +func (b *contextCanceller) RoundTrip(req *http.Request) (*http.Response, error) { + select { + case <-b.ctx.Done(): + return nil, b.err + default: + return b.rt.RoundTrip(req) + } +} diff --git a/staging/src/k8s.io/client-go/transport/transport_test.go b/staging/src/k8s.io/client-go/transport/transport_test.go index eead38aacc..d8e7544321 100644 --- a/staging/src/k8s.io/client-go/transport/transport_test.go +++ b/staging/src/k8s.io/client-go/transport/transport_test.go @@ -17,8 +17,10 @@ limitations under the License. package transport import ( + "context" "crypto/tls" "errors" + "fmt" "net/http" "testing" ) @@ -310,3 +312,141 @@ func TestNew(t *testing.T) { }) } } + +type fakeRoundTripper struct { + Req *http.Request + Resp *http.Response + Err error +} + +func (rt *fakeRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + rt.Req = req + return rt.Resp, rt.Err +} + +type chainRoundTripper struct { + rt http.RoundTripper + value string +} + +func testChain(value string) WrapperFunc { + return func(rt http.RoundTripper) http.RoundTripper { + return &chainRoundTripper{rt: rt, value: value} + } +} + +func (rt *chainRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + resp, err := rt.rt.RoundTrip(req) + if resp != nil { + if resp.Header == nil { + resp.Header = make(http.Header) + } + resp.Header.Set("Value", resp.Header.Get("Value")+rt.value) + } + return resp, err +} + +func TestWrappers(t *testing.T) { + resp1 := &http.Response{} + wrapperResp1 := func(rt http.RoundTripper) http.RoundTripper { + return &fakeRoundTripper{Resp: resp1} + } + resp2 := &http.Response{} + wrapperResp2 := func(rt http.RoundTripper) http.RoundTripper { + return &fakeRoundTripper{Resp: resp2} + } + + tests := []struct { + name string + fns []WrapperFunc + wantNil bool + want func(*http.Response) bool + }{ + {fns: []WrapperFunc{}, wantNil: true}, + {fns: []WrapperFunc{nil, nil}, wantNil: true}, + {fns: []WrapperFunc{nil}, wantNil: false}, + + {fns: []WrapperFunc{nil, wrapperResp1}, want: func(resp *http.Response) bool { return resp == resp1 }}, + {fns: []WrapperFunc{wrapperResp1, nil}, want: func(resp *http.Response) bool { return resp == resp1 }}, + {fns: []WrapperFunc{nil, wrapperResp1, nil}, want: func(resp *http.Response) bool { return resp == resp1 }}, + {fns: []WrapperFunc{nil, wrapperResp1, wrapperResp2}, want: func(resp *http.Response) bool { return resp == resp2 }}, + {fns: []WrapperFunc{wrapperResp1, wrapperResp2}, want: func(resp *http.Response) bool { return resp == resp2 }}, + {fns: []WrapperFunc{wrapperResp2, wrapperResp1}, want: func(resp *http.Response) bool { return resp == resp1 
}}, + + {fns: []WrapperFunc{testChain("1")}, want: func(resp *http.Response) bool { return resp.Header.Get("Value") == "1" }}, + {fns: []WrapperFunc{testChain("1"), testChain("2")}, want: func(resp *http.Response) bool { return resp.Header.Get("Value") == "12" }}, + {fns: []WrapperFunc{testChain("2"), testChain("1")}, want: func(resp *http.Response) bool { return resp.Header.Get("Value") == "21" }}, + {fns: []WrapperFunc{testChain("1"), testChain("2"), testChain("3")}, want: func(resp *http.Response) bool { return resp.Header.Get("Value") == "123" }}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := Wrappers(tt.fns...) + if got == nil != tt.wantNil { + t.Errorf("Wrappers() = %v", got) + return + } + if got == nil { + return + } + + rt := &fakeRoundTripper{Resp: &http.Response{}} + nested := got(rt) + req := &http.Request{} + resp, _ := nested.RoundTrip(req) + if tt.want != nil && !tt.want(resp) { + t.Errorf("unexpected response: %#v", resp) + } + }) + } +} + +func Test_contextCanceller_RoundTrip(t *testing.T) { + tests := []struct { + name string + open bool + want bool + }{ + {name: "open context should call nested round tripper", open: true, want: true}, + {name: "closed context should return a known error", open: false, want: false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := &http.Request{} + rt := &fakeRoundTripper{Resp: &http.Response{}} + ctx := context.Background() + if !tt.open { + c, fn := context.WithCancel(ctx) + fn() + ctx = c + } + errTesting := fmt.Errorf("testing") + b := &contextCanceller{ + rt: rt, + ctx: ctx, + err: errTesting, + } + got, err := b.RoundTrip(req) + if tt.want { + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if got != rt.Resp { + t.Errorf("wanted response") + } + if req != rt.Req { + t.Errorf("expect nested call") + } + } else { + if err != errTesting { + t.Errorf("unexpected error: %v", err) + } + if got != nil { + t.Errorf("wanted no response") + } + if rt.Req != nil { + t.Errorf("want no nested call") + } + } + }) + } +}