Merge pull request #33474 from wojtek-t/user_agent_for_master_election

Automatic merge from submit-queue

Set UserAgent in LeaderElection clients.

Ref https://github.com/kubernetes/kubernetes/issues/33259

@timothysc - FYI
pull/6/head
Kubernetes Submit Queue 2016-09-27 22:58:08 -07:00 committed by GitHub
commit b2c175c758
4 changed files with 51 additions and 49 deletions

View File

@ -138,6 +138,7 @@ func Run(s *options.CMServer) error {
if err != nil {
glog.Fatalf("Invalid API configuration: %v", err)
}
leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "leader-election"))
go func() {
mux := http.NewServeMux()
@ -183,12 +184,12 @@ func Run(s *options.CMServer) error {
Namespace: "kube-system",
Name: "kube-controller-manager",
},
EndpointsClient: kubeClient,
Identity: id,
EventRecorder: recorder,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
Client: leaderElectionClient,
Identity: id,
EventRecorder: recorder,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {

View File

@ -58,7 +58,7 @@ import (
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/componentconfig"
coreclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
@ -85,7 +85,7 @@ func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
}
if lec.EndpointsClient == nil {
if lec.Client == nil {
return nil, fmt.Errorf("EndpointsClient must not be nil.")
}
if lec.EventRecorder == nil {
@ -103,8 +103,8 @@ type LeaderElectionConfig struct {
// Identity is a unique identifier of the leader elector.
Identity string
EndpointsClient coreclientset.EndpointsGetter
EventRecorder record.EventRecorder
Client clientset.Interface
EventRecorder record.EventRecorder
// LeaseDuration is the duration that non-leader candidates will
// wait to force acquire leadership. This is measured against time of
@ -246,7 +246,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
AcquireTime: now,
}
e, err := le.config.EndpointsClient.Endpoints(le.config.EndpointsMeta.Namespace).Get(le.config.EndpointsMeta.Name)
e, err := le.config.Client.Core().Endpoints(le.config.EndpointsMeta.Namespace).Get(le.config.EndpointsMeta.Name)
if err != nil {
if !errors.IsNotFound(err) {
glog.Errorf("error retrieving endpoint: %v", err)
@ -257,7 +257,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
if err != nil {
return false
}
_, err = le.config.EndpointsClient.Endpoints(le.config.EndpointsMeta.Namespace).Create(&api.Endpoints{
_, err = le.config.Client.Core().Endpoints(le.config.EndpointsMeta.Namespace).Create(&api.Endpoints{
ObjectMeta: api.ObjectMeta{
Name: le.config.EndpointsMeta.Name,
Namespace: le.config.EndpointsMeta.Namespace,
@ -312,7 +312,7 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
}
e.Annotations[LeaderElectionRecordAnnotationKey] = string(leaderElectionRecordBytes)
_, err = le.config.EndpointsClient.Endpoints(le.config.EndpointsMeta.Namespace).Update(e)
_, err = le.config.Client.Core().Endpoints(le.config.EndpointsMeta.Namespace).Update(e)
if err != nil {
glog.Errorf("err: %v", err)
return false

View File

@ -29,9 +29,9 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
fakeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/record"
testcore "k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/runtime"
)
@ -44,7 +44,7 @@ func TestTryAcquireOrRenew(t *testing.T) {
observedTime time.Time
reactors []struct {
verb string
reaction testcore.ReactionFunc
reaction core.ReactionFunc
}
expectSuccess bool
@ -55,18 +55,18 @@ func TestTryAcquireOrRenew(t *testing.T) {
{
reactors: []struct {
verb string
reaction testcore.ReactionFunc
reaction core.ReactionFunc
}{
{
verb: "get",
reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(testcore.GetAction).GetResource().GroupResource(), action.(testcore.GetAction).GetName())
reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(action.(core.GetAction).GetResource().GroupResource(), action.(core.GetAction).GetName())
},
},
{
verb: "create",
reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(testcore.CreateAction).GetObject().(*api.Endpoints), nil
reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(core.CreateAction).GetObject().(*api.Endpoints), nil
},
},
},
@ -77,23 +77,23 @@ func TestTryAcquireOrRenew(t *testing.T) {
{
reactors: []struct {
verb string
reaction testcore.ReactionFunc
reaction core.ReactionFunc
}{
{
verb: "get",
reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, &api.Endpoints{
ObjectMeta: api.ObjectMeta{
Namespace: action.GetNamespace(),
Name: action.(testcore.GetAction).GetName(),
Name: action.(core.GetAction).GetName(),
},
}, nil
},
},
{
verb: "update",
reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(testcore.CreateAction).GetObject().(*api.Endpoints), nil
reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(core.CreateAction).GetObject().(*api.Endpoints), nil
},
},
},
@ -106,15 +106,15 @@ func TestTryAcquireOrRenew(t *testing.T) {
{
reactors: []struct {
verb string
reaction testcore.ReactionFunc
reaction core.ReactionFunc
}{
{
verb: "get",
reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, &api.Endpoints{
ObjectMeta: api.ObjectMeta{
Namespace: action.GetNamespace(),
Name: action.(testcore.GetAction).GetName(),
Name: action.(core.GetAction).GetName(),
Annotations: map[string]string{
LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`,
},
@ -124,8 +124,8 @@ func TestTryAcquireOrRenew(t *testing.T) {
},
{
verb: "update",
reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(testcore.CreateAction).GetObject().(*api.Endpoints), nil
reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(core.CreateAction).GetObject().(*api.Endpoints), nil
},
},
},
@ -140,15 +140,15 @@ func TestTryAcquireOrRenew(t *testing.T) {
{
reactors: []struct {
verb string
reaction testcore.ReactionFunc
reaction core.ReactionFunc
}{
{
verb: "get",
reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, &api.Endpoints{
ObjectMeta: api.ObjectMeta{
Namespace: action.GetNamespace(),
Name: action.(testcore.GetAction).GetName(),
Name: action.(core.GetAction).GetName(),
Annotations: map[string]string{
LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`,
},
@ -166,15 +166,15 @@ func TestTryAcquireOrRenew(t *testing.T) {
{
reactors: []struct {
verb string
reaction testcore.ReactionFunc
reaction core.ReactionFunc
}{
{
verb: "get",
reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, &api.Endpoints{
ObjectMeta: api.ObjectMeta{
Namespace: action.GetNamespace(),
Name: action.(testcore.GetAction).GetName(),
Name: action.(core.GetAction).GetName(),
Annotations: map[string]string{
LeaderElectionRecordAnnotationKey: `{"holderIdentity":"baz"}`,
},
@ -184,8 +184,8 @@ func TestTryAcquireOrRenew(t *testing.T) {
},
{
verb: "update",
reaction: func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(testcore.CreateAction).GetObject().(*api.Endpoints), nil
reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(core.CreateAction).GetObject().(*api.Endpoints), nil
},
},
},
@ -215,11 +215,11 @@ func TestTryAcquireOrRenew(t *testing.T) {
},
},
}
c := &fake.Clientset{}
c := &fakeclientset.Clientset{Fake: core.Fake{}}
for _, reactor := range test.reactors {
c.AddReactor(reactor.verb, "endpoints", reactor.reaction)
}
c.AddReactor("*", "*", func(action testcore.Action) (bool, runtime.Object, error) {
c.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
t.Errorf("[%v] unreachable action. testclient called too many times: %+v", i, action)
return true, nil, fmt.Errorf("uncreachable action")
})
@ -229,7 +229,7 @@ func TestTryAcquireOrRenew(t *testing.T) {
observedRecord: test.observedRecord,
observedTime: test.observedTime,
}
le.config.EndpointsClient = c.Core()
le.config.Client = c
if test.expectSuccess != le.tryAcquireOrRenew() {
t.Errorf("[%v]unexpected result of tryAcquireOrRenew: [succeded=%v]", i, !test.expectSuccess)

View File

@ -30,6 +30,7 @@ import (
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/healthz"
@ -90,7 +91,7 @@ func Run(s *options.SchedulerServer) error {
if err != nil {
glog.Fatalf("Invalid API configuration: %v", err)
}
kubeClientset, err := clientset.NewForConfig(kubeconfig)
leaderElectionClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "leader-election"))
if err != nil {
glog.Fatalf("Invalid API configuration: %v", err)
}
@ -149,12 +150,12 @@ func Run(s *options.SchedulerServer) error {
Namespace: "kube-system",
Name: "kube-scheduler",
},
EndpointsClient: kubeClientset.Core(),
Identity: id,
EventRecorder: config.Recorder,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
Client: leaderElectionClient,
Identity: id,
EventRecorder: config.Recorder,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {