Set UserAgent in LeaderElection clients.

pull/6/head
Wojciech Tyczynski 2016-09-26 09:46:59 +02:00
parent c19e08ebbc
commit cbcce2c90f
4 changed files with 51 additions and 49 deletions

View File

@ -138,6 +138,7 @@ func Run(s *options.CMServer) error {
if err != nil { if err != nil {
glog.Fatalf("Invalid API configuration: %v", err) glog.Fatalf("Invalid API configuration: %v", err)
} }
leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "leader-election"))
go func() { go func() {
mux := http.NewServeMux() mux := http.NewServeMux()
@ -183,12 +184,12 @@ func Run(s *options.CMServer) error {
Namespace: "kube-system", Namespace: "kube-system",
Name: "kube-controller-manager", Name: "kube-controller-manager",
}, },
EndpointsClient: kubeClient, Client: leaderElectionClient,
Identity: id, Identity: id,
EventRecorder: recorder, EventRecorder: recorder,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration, LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
RenewDeadline: s.LeaderElection.RenewDeadline.Duration, RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
RetryPeriod: s.LeaderElection.RetryPeriod.Duration, RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{ Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run, OnStartedLeading: run,
OnStoppedLeading: func() { OnStoppedLeading: func() {

View File

@ -58,7 +58,7 @@ import (
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/componentconfig" "k8s.io/kubernetes/pkg/apis/componentconfig"
coreclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
@ -85,7 +85,7 @@ func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) { if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor") return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
} }
if lec.EndpointsClient == nil { if lec.Client == nil {
return nil, fmt.Errorf("EndpointsClient must not be nil.") return nil, fmt.Errorf("EndpointsClient must not be nil.")
} }
if lec.EventRecorder == nil { if lec.EventRecorder == nil {
@ -103,8 +103,8 @@ type LeaderElectionConfig struct {
// Identity is a unique identifier of the leader elector. // Identity is a unique identifier of the leader elector.
Identity string Identity string
EndpointsClient coreclientset.EndpointsGetter Client clientset.Interface
EventRecorder record.EventRecorder EventRecorder record.EventRecorder
// LeaseDuration is the duration that non-leader candidates will // LeaseDuration is the duration that non-leader candidates will
// wait to force acquire leadership. This is measured against time of // wait to force acquire leadership. This is measured against time of
@ -246,7 +246,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
AcquireTime: now, AcquireTime: now,
} }
e, err := le.config.EndpointsClient.Endpoints(le.config.EndpointsMeta.Namespace).Get(le.config.EndpointsMeta.Name) e, err := le.config.Client.Core().Endpoints(le.config.EndpointsMeta.Namespace).Get(le.config.EndpointsMeta.Name)
if err != nil { if err != nil {
if !errors.IsNotFound(err) { if !errors.IsNotFound(err) {
glog.Errorf("error retrieving endpoint: %v", err) glog.Errorf("error retrieving endpoint: %v", err)
@ -257,7 +257,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
if err != nil { if err != nil {
return false return false
} }
_, err = le.config.EndpointsClient.Endpoints(le.config.EndpointsMeta.Namespace).Create(&api.Endpoints{ _, err = le.config.Client.Core().Endpoints(le.config.EndpointsMeta.Namespace).Create(&api.Endpoints{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: le.config.EndpointsMeta.Name, Name: le.config.EndpointsMeta.Name,
Namespace: le.config.EndpointsMeta.Namespace, Namespace: le.config.EndpointsMeta.Namespace,
@ -312,7 +312,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
} }
e.Annotations[LeaderElectionRecordAnnotationKey] = string(leaderElectionRecordBytes) e.Annotations[LeaderElectionRecordAnnotationKey] = string(leaderElectionRecordBytes)
_, err = le.config.EndpointsClient.Endpoints(le.config.EndpointsMeta.Namespace).Update(e) _, err = le.config.Client.Core().Endpoints(le.config.EndpointsMeta.Namespace).Update(e)
if err != nil { if err != nil {
glog.Errorf("err: %v", err) glog.Errorf("err: %v", err)
return false return false

View File

@ -29,9 +29,9 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" fakeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
testcore "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
) )
@ -44,7 +44,7 @@ func TestTryAcquireOrRenew(t *testing.T) {
observedTime time.Time observedTime time.Time
reactors []struct { reactors []struct {
verb string verb string
reaction testcore.ReactionFunc reaction core.ReactionFunc
} }
expectSuccess bool expectSuccess bool
@ -55,18 +55,18 @@ func TestTryAcquireOrRenew(t *testing.T) {
{ {
reactors: []struct { reactors: []struct {
verb string verb string
reaction testcore.ReactionFunc reaction core.ReactionFunc
}{ }{
{ {
verb: "get", verb: "get",
reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(testcore.GetAction).GetResource().GroupResource(), action.(testcore.GetAction).GetName()) return true, nil, errors.NewNotFound(action.(core.GetAction).GetResource().GroupResource(), action.(core.GetAction).GetName())
}, },
}, },
{ {
verb: "create", verb: "create",
reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(testcore.CreateAction).GetObject().(*api.Endpoints), nil return true, action.(core.CreateAction).GetObject().(*api.Endpoints), nil
}, },
}, },
}, },
@ -77,23 +77,23 @@ func TestTryAcquireOrRenew(t *testing.T) {
{ {
reactors: []struct { reactors: []struct {
verb string verb string
reaction testcore.ReactionFunc reaction core.ReactionFunc
}{ }{
{ {
verb: "get", verb: "get",
reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, &api.Endpoints{ return true, &api.Endpoints{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Namespace: action.GetNamespace(), Namespace: action.GetNamespace(),
Name: action.(testcore.GetAction).GetName(), Name: action.(core.GetAction).GetName(),
}, },
}, nil }, nil
}, },
}, },
{ {
verb: "update", verb: "update",
reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(testcore.CreateAction).GetObject().(*api.Endpoints), nil return true, action.(core.CreateAction).GetObject().(*api.Endpoints), nil
}, },
}, },
}, },
@ -106,15 +106,15 @@ func TestTryAcquireOrRenew(t *testing.T) {
{ {
reactors: []struct { reactors: []struct {
verb string verb string
reaction testcore.ReactionFunc reaction core.ReactionFunc
}{ }{
{ {
verb: "get", verb: "get",
reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, &api.Endpoints{ return true, &api.Endpoints{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Namespace: action.GetNamespace(), Namespace: action.GetNamespace(),
Name: action.(testcore.GetAction).GetName(), Name: action.(core.GetAction).GetName(),
Annotations: map[string]string{ Annotations: map[string]string{
LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`, LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`,
}, },
@ -124,8 +124,8 @@ func TestTryAcquireOrRenew(t *testing.T) {
}, },
{ {
verb: "update", verb: "update",
reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(testcore.CreateAction).GetObject().(*api.Endpoints), nil return true, action.(core.CreateAction).GetObject().(*api.Endpoints), nil
}, },
}, },
}, },
@ -140,15 +140,15 @@ func TestTryAcquireOrRenew(t *testing.T) {
{ {
reactors: []struct { reactors: []struct {
verb string verb string
reaction testcore.ReactionFunc reaction core.ReactionFunc
}{ }{
{ {
verb: "get", verb: "get",
reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, &api.Endpoints{ return true, &api.Endpoints{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Namespace: action.GetNamespace(), Namespace: action.GetNamespace(),
Name: action.(testcore.GetAction).GetName(), Name: action.(core.GetAction).GetName(),
Annotations: map[string]string{ Annotations: map[string]string{
LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`, LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`,
}, },
@ -166,15 +166,15 @@ func TestTryAcquireOrRenew(t *testing.T) {
{ {
reactors: []struct { reactors: []struct {
verb string verb string
reaction testcore.ReactionFunc reaction core.ReactionFunc
}{ }{
{ {
verb: "get", verb: "get",
reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, &api.Endpoints{ return true, &api.Endpoints{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Namespace: action.GetNamespace(), Namespace: action.GetNamespace(),
Name: action.(testcore.GetAction).GetName(), Name: action.(core.GetAction).GetName(),
Annotations: map[string]string{ Annotations: map[string]string{
LeaderElectionRecordAnnotationKey: `{"holderIdentity":"baz"}`, LeaderElectionRecordAnnotationKey: `{"holderIdentity":"baz"}`,
}, },
@ -184,8 +184,8 @@ func TestTryAcquireOrRenew(t *testing.T) {
}, },
{ {
verb: "update", verb: "update",
reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) { reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(testcore.CreateAction).GetObject().(*api.Endpoints), nil return true, action.(core.CreateAction).GetObject().(*api.Endpoints), nil
}, },
}, },
}, },
@ -215,11 +215,11 @@ func TestTryAcquireOrRenew(t *testing.T) {
}, },
}, },
} }
c := &fake.Clientset{} c := &fakeclientset.Clientset{Fake: core.Fake{}}
for _, reactor := range test.reactors { for _, reactor := range test.reactors {
c.AddReactor(reactor.verb, "endpoints", reactor.reaction) c.AddReactor(reactor.verb, "endpoints", reactor.reaction)
} }
c.AddReactor("*", "*", func(action testcore.Action) (bool, runtime.Object, error) { c.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
t.Errorf("[%v] unreachable action. testclient called too many times: %+v", i, action) t.Errorf("[%v] unreachable action. testclient called too many times: %+v", i, action)
return true, nil, fmt.Errorf("uncreachable action") return true, nil, fmt.Errorf("uncreachable action")
}) })
@ -229,7 +229,7 @@ func TestTryAcquireOrRenew(t *testing.T) {
observedRecord: test.observedRecord, observedRecord: test.observedRecord,
observedTime: test.observedTime, observedTime: test.observedTime,
} }
le.config.EndpointsClient = c.Core() le.config.Client = c
if test.expectSuccess != le.tryAcquireOrRenew() { if test.expectSuccess != le.tryAcquireOrRenew() {
t.Errorf("[%v]unexpected result of tryAcquireOrRenew: [succeded=%v]", i, !test.expectSuccess) t.Errorf("[%v]unexpected result of tryAcquireOrRenew: [succeded=%v]", i, !test.expectSuccess)

View File

@ -30,6 +30,7 @@ import (
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/leaderelection" "k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/healthz"
@ -90,7 +91,7 @@ func Run(s *options.SchedulerServer) error {
if err != nil { if err != nil {
glog.Fatalf("Invalid API configuration: %v", err) glog.Fatalf("Invalid API configuration: %v", err)
} }
kubeClientset, err := clientset.NewForConfig(kubeconfig) leaderElectionClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "leader-election"))
if err != nil { if err != nil {
glog.Fatalf("Invalid API configuration: %v", err) glog.Fatalf("Invalid API configuration: %v", err)
} }
@ -149,12 +150,12 @@ func Run(s *options.SchedulerServer) error {
Namespace: "kube-system", Namespace: "kube-system",
Name: "kube-scheduler", Name: "kube-scheduler",
}, },
EndpointsClient: kubeClientset.Core(), Client: leaderElectionClient,
Identity: id, Identity: id,
EventRecorder: config.Recorder, EventRecorder: config.Recorder,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration, LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
RenewDeadline: s.LeaderElection.RenewDeadline.Duration, RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
RetryPeriod: s.LeaderElection.RetryPeriod.Duration, RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{ Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run, OnStartedLeading: run,
OnStoppedLeading: func() { OnStoppedLeading: func() {