diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 77c07c658a..dec118036a 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -138,6 +138,7 @@ func Run(s *options.CMServer) error { if err != nil { glog.Fatalf("Invalid API configuration: %v", err) } + leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "leader-election")) go func() { mux := http.NewServeMux() @@ -183,12 +184,12 @@ func Run(s *options.CMServer) error { Namespace: "kube-system", Name: "kube-controller-manager", }, - EndpointsClient: kubeClient, - Identity: id, - EventRecorder: recorder, - LeaseDuration: s.LeaderElection.LeaseDuration.Duration, - RenewDeadline: s.LeaderElection.RenewDeadline.Duration, - RetryPeriod: s.LeaderElection.RetryPeriod.Duration, + Client: leaderElectionClient, + Identity: id, + EventRecorder: recorder, + LeaseDuration: s.LeaderElection.LeaseDuration.Duration, + RenewDeadline: s.LeaderElection.RenewDeadline.Duration, + RetryPeriod: s.LeaderElection.RetryPeriod.Duration, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: run, OnStoppedLeading: func() { diff --git a/pkg/client/leaderelection/leaderelection.go b/pkg/client/leaderelection/leaderelection.go index ce9821c395..fbab5126c8 100644 --- a/pkg/client/leaderelection/leaderelection.go +++ b/pkg/client/leaderelection/leaderelection.go @@ -58,7 +58,7 @@ import ( "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/componentconfig" - coreclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" @@ -85,7 +85,7 @@ func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) { if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) { return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor") } - if lec.EndpointsClient == nil { + if lec.Client == nil { return nil, fmt.Errorf("EndpointsClient must not be nil.") } if lec.EventRecorder == nil { @@ -103,8 +103,8 @@ type LeaderElectionConfig struct { // Identity is a unique identifier of the leader elector. Identity string - EndpointsClient coreclientset.EndpointsGetter - EventRecorder record.EventRecorder + Client clientset.Interface + EventRecorder record.EventRecorder // LeaseDuration is the duration that non-leader candidates will // wait to force acquire leadership. This is measured against time of @@ -246,7 +246,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool { AcquireTime: now, } - e, err := le.config.EndpointsClient.Endpoints(le.config.EndpointsMeta.Namespace).Get(le.config.EndpointsMeta.Name) + e, err := le.config.Client.Core().Endpoints(le.config.EndpointsMeta.Namespace).Get(le.config.EndpointsMeta.Name) if err != nil { if !errors.IsNotFound(err) { glog.Errorf("error retrieving endpoint: %v", err) @@ -257,7 +257,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool { if err != nil { return false } - _, err = le.config.EndpointsClient.Endpoints(le.config.EndpointsMeta.Namespace).Create(&api.Endpoints{ + _, err = le.config.Client.Core().Endpoints(le.config.EndpointsMeta.Namespace).Create(&api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: le.config.EndpointsMeta.Name, Namespace: le.config.EndpointsMeta.Namespace, @@ -312,7 +312,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool { } e.Annotations[LeaderElectionRecordAnnotationKey] = string(leaderElectionRecordBytes) - _, err = le.config.EndpointsClient.Endpoints(le.config.EndpointsMeta.Namespace).Update(e) + _, err = le.config.Client.Core().Endpoints(le.config.EndpointsMeta.Namespace).Update(e) if err != nil { glog.Errorf("err: %v", err) return false diff --git a/pkg/client/leaderelection/leaderelection_test.go b/pkg/client/leaderelection/leaderelection_test.go index 5d90cca551..5df9a89dbe 100644 --- a/pkg/client/leaderelection/leaderelection_test.go +++ b/pkg/client/leaderelection/leaderelection_test.go @@ -29,9 +29,9 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + fakeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/record" - testcore "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/runtime" ) @@ -44,7 +44,7 @@ func TestTryAcquireOrRenew(t *testing.T) { observedTime time.Time reactors []struct { verb string - reaction testcore.ReactionFunc + reaction core.ReactionFunc } expectSuccess bool @@ -55,18 +55,18 @@ func TestTryAcquireOrRenew(t *testing.T) { { reactors: []struct { verb string - reaction testcore.ReactionFunc + reaction core.ReactionFunc }{ { verb: "get", - reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { - return true, nil, errors.NewNotFound(action.(testcore.GetAction).GetResource().GroupResource(), action.(testcore.GetAction).GetName()) + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.NewNotFound(action.(core.GetAction).GetResource().GroupResource(), action.(core.GetAction).GetName()) }, }, { verb: "create", - reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { - return true, action.(testcore.CreateAction).GetObject().(*api.Endpoints), nil + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(core.CreateAction).GetObject().(*api.Endpoints), nil }, }, }, @@ -77,23 +77,23 @@ func TestTryAcquireOrRenew(t *testing.T) { { reactors: []struct { verb string - reaction testcore.ReactionFunc + reaction core.ReactionFunc }{ { verb: "get", - reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { return true, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Namespace: action.GetNamespace(), - Name: action.(testcore.GetAction).GetName(), + Name: action.(core.GetAction).GetName(), }, }, nil }, }, { verb: "update", - reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { - return true, action.(testcore.CreateAction).GetObject().(*api.Endpoints), nil + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(core.CreateAction).GetObject().(*api.Endpoints), nil }, }, }, @@ -106,15 +106,15 @@ func TestTryAcquireOrRenew(t *testing.T) { { reactors: []struct { verb string - reaction testcore.ReactionFunc + reaction core.ReactionFunc }{ { verb: "get", - reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { return true, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Namespace: action.GetNamespace(), - Name: action.(testcore.GetAction).GetName(), + Name: action.(core.GetAction).GetName(), Annotations: map[string]string{ LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`, }, @@ -124,8 +124,8 @@ func TestTryAcquireOrRenew(t *testing.T) { }, { verb: "update", - reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { - return true, action.(testcore.CreateAction).GetObject().(*api.Endpoints), nil + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(core.CreateAction).GetObject().(*api.Endpoints), nil }, }, }, @@ -140,15 +140,15 @@ func TestTryAcquireOrRenew(t *testing.T) { { reactors: []struct { verb string - reaction testcore.ReactionFunc + reaction core.ReactionFunc }{ { verb: "get", - reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { return true, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Namespace: action.GetNamespace(), - Name: action.(testcore.GetAction).GetName(), + Name: action.(core.GetAction).GetName(), Annotations: map[string]string{ LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`, }, @@ -166,15 +166,15 @@ func TestTryAcquireOrRenew(t *testing.T) { { reactors: []struct { verb string - reaction testcore.ReactionFunc + reaction core.ReactionFunc }{ { verb: "get", - reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { return true, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Namespace: action.GetNamespace(), - Name: action.(testcore.GetAction).GetName(), + Name: action.(core.GetAction).GetName(), Annotations: map[string]string{ LeaderElectionRecordAnnotationKey: `{"holderIdentity":"baz"}`, }, @@ -184,8 +184,8 @@ func TestTryAcquireOrRenew(t *testing.T) { }, { verb: "update", - reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { - return true, action.(testcore.CreateAction).GetObject().(*api.Endpoints), nil + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(core.CreateAction).GetObject().(*api.Endpoints), nil }, }, }, @@ -215,11 +215,11 @@ func TestTryAcquireOrRenew(t *testing.T) { }, }, } - c := &fake.Clientset{} + c := &fakeclientset.Clientset{Fake: core.Fake{}} for _, reactor := range test.reactors { c.AddReactor(reactor.verb, "endpoints", reactor.reaction) } - c.AddReactor("*", "*", func(action testcore.Action) (bool, runtime.Object, error) { + c.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { t.Errorf("[%v] unreachable action. testclient called too many times: %+v", i, action) return true, nil, fmt.Errorf("uncreachable action") }) @@ -229,7 +229,7 @@ func TestTryAcquireOrRenew(t *testing.T) { observedRecord: test.observedRecord, observedTime: test.observedTime, } - le.config.EndpointsClient = c.Core() + le.config.Client = c if test.expectSuccess != le.tryAcquireOrRenew() { t.Errorf("[%v]unexpected result of tryAcquireOrRenew: [succeded=%v]", i, !test.expectSuccess) diff --git a/plugin/cmd/kube-scheduler/app/server.go b/plugin/cmd/kube-scheduler/app/server.go index 5977fa84e7..c8a933a201 100644 --- a/plugin/cmd/kube-scheduler/app/server.go +++ b/plugin/cmd/kube-scheduler/app/server.go @@ -30,6 +30,7 @@ import ( clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/leaderelection" "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/client/restclient" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/healthz" @@ -90,7 +91,7 @@ func Run(s *options.SchedulerServer) error { if err != nil { glog.Fatalf("Invalid API configuration: %v", err) } - kubeClientset, err := clientset.NewForConfig(kubeconfig) + leaderElectionClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "leader-election")) if err != nil { glog.Fatalf("Invalid API configuration: %v", err) } @@ -149,12 +150,12 @@ func Run(s *options.SchedulerServer) error { Namespace: "kube-system", Name: "kube-scheduler", }, - EndpointsClient: kubeClientset.Core(), - Identity: id, - EventRecorder: config.Recorder, - LeaseDuration: s.LeaderElection.LeaseDuration.Duration, - RenewDeadline: s.LeaderElection.RenewDeadline.Duration, - RetryPeriod: s.LeaderElection.RetryPeriod.Duration, + Client: leaderElectionClient, + Identity: id, + EventRecorder: config.Recorder, + LeaseDuration: s.LeaderElection.LeaseDuration.Duration, + RenewDeadline: s.LeaderElection.RenewDeadline.Duration, + RetryPeriod: s.LeaderElection.RetryPeriod.Duration, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: run, OnStoppedLeading: func() {