Abstraction of endpoints in leaderelection code

pull/6/head
Timothy St. Clair 2016-09-15 14:17:18 -05:00
parent 918e99e298
commit 4a9f72b59f
6 changed files with 251 additions and 111 deletions

View File

@ -41,6 +41,7 @@ import (
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/typed/dynamic"
@ -179,14 +180,21 @@ func Run(s *options.CMServer) error {
return err
}
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
// TODO: enable other lock types
rl := resourcelock.EndpointsLock{
EndpointsMeta: api.ObjectMeta{
Namespace: "kube-system",
Name: "kube-controller-manager",
},
Client: leaderElectionClient,
Identity: id,
EventRecorder: recorder,
Client: leaderElectionClient,
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorder,
},
}
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: &rl,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,

View File

@ -49,17 +49,14 @@ limitations under the License.
package leaderelection
import (
"encoding/json"
"fmt"
"reflect"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/componentconfig"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
rl "k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
@ -68,10 +65,7 @@ import (
)
const (
JitterFactor = 1.2
LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader"
JitterFactor = 1.2
DefaultLeaseDuration = 15 * time.Second
DefaultRenewDeadline = 10 * time.Second
DefaultRetryPeriod = 2 * time.Second
@ -85,11 +79,8 @@ func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
}
if lec.Client == nil {
return nil, fmt.Errorf("EndpointsClient must not be nil.")
}
if lec.EventRecorder == nil {
return nil, fmt.Errorf("EventRecorder must not be nil.")
if lec.Lock == nil {
return nil, fmt.Errorf("Lock must not be nil.")
}
return &LeaderElector{
config: lec,
@ -97,14 +88,8 @@ func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
}
type LeaderElectionConfig struct {
// EndpointsMeta should contain a Name and a Namespace of an
// Endpoints object that the LeaderElector will attempt to lead.
EndpointsMeta api.ObjectMeta
// Identity is a unique identifier of the leader elector.
Identity string
Client clientset.Interface
EventRecorder record.EventRecorder
// Lock is the resource that will be used for locking
Lock rl.Interface
// LeaseDuration is the duration that non-leader candidates will
// wait to force acquire leadership. This is measured against time of
@ -146,7 +131,7 @@ type LeaderCallbacks struct {
type LeaderElector struct {
config LeaderElectionConfig
// internal bookkeeping
observedRecord LeaderElectionRecord
observedRecord rl.LeaderElectionRecord
observedTime time.Time
// used to implement OnNewLeader(), may lag slightly from the
// value observedRecord.HolderIdentity if the transition has
@ -154,18 +139,6 @@ type LeaderElector struct {
reportedLeader string
}
// LeaderElectionRecord is the record that is stored in the leader election annotation.
// This information should be used for observational purposes only and could be replaced
// with a random string (e.g. UUID) with only slight modification of this code.
// TODO(mikedanese): this should potentially be versioned
type LeaderElectionRecord struct {
HolderIdentity string `json:"holderIdentity"`
LeaseDurationSeconds int `json:"leaseDurationSeconds"`
AcquireTime unversioned.Time `json:"acquireTime"`
RenewTime unversioned.Time `json:"renewTime"`
LeaderTransitions int `json:"leaderTransitions"`
}
// Run starts the leader election loop
func (le *LeaderElector) Run() {
defer func() {
@ -197,7 +170,7 @@ func (le *LeaderElector) GetLeader() string {
// IsLeader returns true if the last observed leader was this client else returns false.
func (le *LeaderElector) IsLeader() bool {
return le.observedRecord.HolderIdentity == le.config.Identity
return le.observedRecord.HolderIdentity == le.config.Lock.Identity()
}
// acquire loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew succeeds.
@ -206,12 +179,13 @@ func (le *LeaderElector) acquire() {
wait.JitterUntil(func() {
succeeded := le.tryAcquireOrRenew()
le.maybeReportTransition()
desc := le.config.Lock.Describe()
if !succeeded {
glog.V(4).Infof("failed to renew lease %v/%v", le.config.EndpointsMeta.Namespace, le.config.EndpointsMeta.Name)
glog.V(4).Infof("failed to renew lease %v", desc)
return
}
le.config.EventRecorder.Eventf(&api.Endpoints{ObjectMeta: le.config.EndpointsMeta}, api.EventTypeNormal, "%v became leader", le.config.Identity)
glog.Infof("sucessfully acquired lease %v/%v", le.config.EndpointsMeta.Namespace, le.config.EndpointsMeta.Name)
le.config.Lock.RecordEvent("became leader")
glog.Infof("sucessfully acquired lease %v", desc)
close(stop)
}, le.config.RetryPeriod, JitterFactor, true, stop)
}
@ -224,12 +198,13 @@ func (le *LeaderElector) renew() {
return le.tryAcquireOrRenew(), nil
})
le.maybeReportTransition()
desc := le.config.Lock.Describe()
if err == nil {
glog.V(4).Infof("succesfully renewed lease %v/%v", le.config.EndpointsMeta.Namespace, le.config.EndpointsMeta.Name)
glog.V(4).Infof("succesfully renewed lease %v", desc)
return
}
le.config.EventRecorder.Eventf(&api.Endpoints{ObjectMeta: le.config.EndpointsMeta}, api.EventTypeNormal, "%v stopped leading", le.config.Identity)
glog.Infof("failed to renew lease %v/%v", le.config.EndpointsMeta.Namespace, le.config.EndpointsMeta.Name)
le.config.Lock.RecordEvent("stopped leading")
glog.Infof("failed to renew lease %v", desc)
close(stop)
}, 0, stop)
}
@ -239,35 +214,22 @@ func (le *LeaderElector) renew() {
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew() bool {
now := unversioned.Now()
leaderElectionRecord := LeaderElectionRecord{
HolderIdentity: le.config.Identity,
leaderElectionRecord := rl.LeaderElectionRecord{
HolderIdentity: le.config.Lock.Identity(),
LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
RenewTime: now,
AcquireTime: now,
}
e, err := le.config.Client.Core().Endpoints(le.config.EndpointsMeta.Namespace).Get(le.config.EndpointsMeta.Name)
// 1. obtain or create the ElectionRecord
oldLeaderElectionRecord, err := le.config.Lock.Get()
if err != nil {
if !errors.IsNotFound(err) {
glog.Errorf("error retrieving endpoint: %v", err)
glog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
return false
}
leaderElectionRecordBytes, err := json.Marshal(leaderElectionRecord)
if err != nil {
return false
}
_, err = le.config.Client.Core().Endpoints(le.config.EndpointsMeta.Namespace).Create(&api.Endpoints{
ObjectMeta: api.ObjectMeta{
Name: le.config.EndpointsMeta.Name,
Namespace: le.config.EndpointsMeta.Namespace,
Annotations: map[string]string{
LeaderElectionRecordAnnotationKey: string(leaderElectionRecordBytes),
},
},
})
if err != nil {
glog.Errorf("error initially creating endpoints: %v", err)
if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
glog.Errorf("error initially creating leader election record: %v", err)
return false
}
le.observedRecord = leaderElectionRecord
@ -275,46 +237,28 @@ func (le *LeaderElector) tryAcquireOrRenew() bool {
return true
}
if e.Annotations == nil {
e.Annotations = make(map[string]string)
// 2. Record obtained, check the Identity & Time
if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
le.observedRecord = *oldLeaderElectionRecord
le.observedTime = time.Now()
}
if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
oldLeaderElectionRecord.HolderIdentity != le.config.Lock.Identity() {
glog.Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
return false
}
var oldLeaderElectionRecord LeaderElectionRecord
if oldLeaderElectionRecordBytes, found := e.Annotations[LeaderElectionRecordAnnotationKey]; found {
if err := json.Unmarshal([]byte(oldLeaderElectionRecordBytes), &oldLeaderElectionRecord); err != nil {
glog.Errorf("error unmarshaling leader election record: %v", err)
return false
}
if !reflect.DeepEqual(le.observedRecord, oldLeaderElectionRecord) {
le.observedRecord = oldLeaderElectionRecord
le.observedTime = time.Now()
}
if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
oldLeaderElectionRecord.HolderIdentity != le.config.Identity {
glog.Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
return false
}
}
// We're going to try to update. The leaderElectionRecord is set to it's default
// 3. We're going to try to update. The leaderElectionRecord is set to it's default
// here. Let's correct it before updating.
if oldLeaderElectionRecord.HolderIdentity == le.config.Identity {
if oldLeaderElectionRecord.HolderIdentity == le.config.Lock.Identity() {
leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
} else {
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
}
leaderElectionRecordBytes, err := json.Marshal(leaderElectionRecord)
if err != nil {
glog.Errorf("err marshaling leader election record: %v", err)
return false
}
e.Annotations[LeaderElectionRecordAnnotationKey] = string(leaderElectionRecordBytes)
_, err = le.config.Client.Core().Endpoints(le.config.EndpointsMeta.Namespace).Update(e)
if err != nil {
glog.Errorf("err: %v", err)
// update the lock itself
if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
glog.Errorf("Failed to update lock: %v", err)
return false
}
le.observedRecord = leaderElectionRecord

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
fakeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
rl "k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/runtime"
@ -40,7 +41,7 @@ func TestTryAcquireOrRenew(t *testing.T) {
past := time.Now().Add(-1000 * time.Hour)
tests := []struct {
observedRecord LeaderElectionRecord
observedRecord rl.LeaderElectionRecord
observedTime time.Time
reactors []struct {
verb string
@ -116,7 +117,7 @@ func TestTryAcquireOrRenew(t *testing.T) {
Namespace: action.GetNamespace(),
Name: action.(core.GetAction).GetName(),
Annotations: map[string]string{
LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`,
rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`,
},
},
}, nil
@ -129,7 +130,7 @@ func TestTryAcquireOrRenew(t *testing.T) {
},
},
},
observedRecord: LeaderElectionRecord{HolderIdentity: "bing"},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
observedTime: past,
expectSuccess: true,
@ -150,7 +151,7 @@ func TestTryAcquireOrRenew(t *testing.T) {
Namespace: action.GetNamespace(),
Name: action.(core.GetAction).GetName(),
Annotations: map[string]string{
LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`,
rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`,
},
},
}, nil
@ -176,7 +177,7 @@ func TestTryAcquireOrRenew(t *testing.T) {
Namespace: action.GetNamespace(),
Name: action.(core.GetAction).GetName(),
Annotations: map[string]string{
LeaderElectionRecordAnnotationKey: `{"holderIdentity":"baz"}`,
rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":"baz"}`,
},
},
}, nil
@ -190,7 +191,7 @@ func TestTryAcquireOrRenew(t *testing.T) {
},
},
observedTime: future,
observedRecord: LeaderElectionRecord{HolderIdentity: "baz"},
observedRecord: rl.LeaderElectionRecord{HolderIdentity: "baz"},
expectSuccess: true,
outHolder: "baz",
@ -203,10 +204,16 @@ func TestTryAcquireOrRenew(t *testing.T) {
wg.Add(1)
var reportedLeader string
lec := LeaderElectionConfig{
lock := rl.EndpointsLock{
EndpointsMeta: api.ObjectMeta{Namespace: "foo", Name: "bar"},
Identity: "baz",
EventRecorder: &record.FakeRecorder{},
LockConfig: rl.ResourceLockConfig{
Identity: "baz",
EventRecorder: &record.FakeRecorder{},
},
}
lec := LeaderElectionConfig{
Lock: &lock,
LeaseDuration: 10 * time.Second,
Callbacks: LeaderCallbacks{
OnNewLeader: func(l string) {
@ -229,7 +236,7 @@ func TestTryAcquireOrRenew(t *testing.T) {
observedRecord: test.observedRecord,
observedTime: test.observedTime,
}
le.config.Client = c
lock.Client = c
if test.expectSuccess != le.tryAcquireOrRenew() {
t.Errorf("[%v]unexpected result of tryAcquireOrRenew: [succeded=%v]", i, !test.expectSuccess)

View File

@ -0,0 +1,102 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcelock
import (
"encoding/json"
"errors"
"fmt"
"k8s.io/kubernetes/pkg/api"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
)
// EndpointsLock is a resource lock backed by an Endpoints object: the
// LeaderElectionRecord is stored as JSON under the
// LeaderElectionRecordAnnotationKey annotation of that object.
type EndpointsLock struct {
// EndpointsMeta should contain a Name and a Namespace of an
// Endpoints object that the LeaderElector will attempt to lead.
EndpointsMeta api.ObjectMeta
// Client is used to read, create and update the Endpoints object.
Client clientset.Interface
// LockConfig carries the candidate identity and the event recorder.
LockConfig ResourceLockConfig
// e caches the Endpoints object last returned by Get or Create;
// Update and RecordEvent operate on this cached copy.
e *api.Endpoints
}
// Get fetches the Endpoints object backing the lock, caches it on the
// receiver for later Update/RecordEvent calls, and decodes the
// LeaderElectionRecord stored in its annotation. When the annotation is
// absent a zero-valued record is returned.
func (el *EndpointsLock) Get() (*LeaderElectionRecord, error) {
	var err error
	el.e, err = el.Client.Core().Endpoints(el.EndpointsMeta.Namespace).Get(el.EndpointsMeta.Name)
	if err != nil {
		return nil, err
	}
	// Ensure the map exists so a later Update can write into it.
	if el.e.Annotations == nil {
		el.e.Annotations = make(map[string]string)
	}
	ler := LeaderElectionRecord{}
	raw, ok := el.e.Annotations[LeaderElectionRecordAnnotationKey]
	if !ok {
		// No record stored yet; hand back the zero record.
		return &ler, nil
	}
	if err := json.Unmarshal([]byte(raw), &ler); err != nil {
		return nil, err
	}
	return &ler, nil
}
// Create writes a brand-new Endpoints object carrying ler serialized as
// JSON in the leader-election annotation, and caches the created object
// on the receiver.
func (el *EndpointsLock) Create(ler LeaderElectionRecord) error {
	payload, err := json.Marshal(ler)
	if err != nil {
		return err
	}
	ep := &api.Endpoints{
		ObjectMeta: api.ObjectMeta{
			Name:      el.EndpointsMeta.Name,
			Namespace: el.EndpointsMeta.Namespace,
			Annotations: map[string]string{
				LeaderElectionRecordAnnotationKey: string(payload),
			},
		},
	}
	el.e, err = el.Client.Core().Endpoints(el.EndpointsMeta.Namespace).Create(ep)
	return err
}
// Update serializes ler into the leader-election annotation of the cached
// Endpoints object and writes it back to the API server. Get or Create
// must have succeeded first so that el.e holds the object to update.
func (el *EndpointsLock) Update(ler LeaderElectionRecord) error {
	if el.e == nil {
		return errors.New("endpoint not initialized, call get or create first")
	}
	recordBytes, err := json.Marshal(ler)
	if err != nil {
		return err
	}
	// Defensive: Get initializes the map, but guard against an object with
	// nil annotations reaching this point — writing to a nil map panics.
	if el.e.Annotations == nil {
		el.e.Annotations = make(map[string]string)
	}
	el.e.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes)
	el.e, err = el.Client.Core().Endpoints(el.EndpointsMeta.Namespace).Update(el.e)
	return err
}
// RecordEvent emits a leader-election event against the Endpoints object,
// prefixing the message with the candidate's identity
// (e.g. "id-1 became leader").
func (el *EndpointsLock) RecordEvent(s string) {
	// Guard against use before Get/Create has populated el.e, or with a nil
	// recorder: emitting an event must never panic the election loop.
	// (NewLeaderElector no longer validates EventRecorder, so nil is reachable.)
	if el.LockConfig.EventRecorder == nil || el.e == nil {
		return
	}
	events := fmt.Sprintf("%v %v", el.LockConfig.Identity, s)
	el.LockConfig.EventRecorder.Eventf(&api.Endpoints{ObjectMeta: el.e.ObjectMeta}, api.EventTypeNormal, "LeaderElection", events)
}
// Describe renders the lock's location as "namespace/name" for use in
// log and error messages.
func (el *EndpointsLock) Describe() string {
	return el.EndpointsMeta.Namespace + "/" + el.EndpointsMeta.Name
}
// Identity returns the identity of this candidate, as configured in LockConfig.
func (el *EndpointsLock) Identity() string {
return el.LockConfig.Identity
}

View File

@ -0,0 +1,71 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcelock
import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/record"
)
const (
// LeaderElectionRecordAnnotationKey is the annotation key under which the
// JSON-serialized LeaderElectionRecord is stored on the lock object.
LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader"
)
// LeaderElectionRecord is the record that is stored in the leader election annotation.
// This information should be used for observational purposes only and could be replaced
// with a random string (e.g. UUID) with only slight modification of this code.
// TODO(mikedanese): this should potentially be versioned
type LeaderElectionRecord struct {
// HolderIdentity is the identity of the current lock holder.
HolderIdentity string `json:"holderIdentity"`
// LeaseDurationSeconds is the duration non-leader candidates wait before
// force-acquiring leadership (written from LeaderElectionConfig.LeaseDuration).
LeaseDurationSeconds int `json:"leaseDurationSeconds"`
// AcquireTime is when the current holder first acquired the lock; it is
// preserved across renewals by the same holder.
AcquireTime unversioned.Time `json:"acquireTime"`
// RenewTime is updated on every successful acquire/renew attempt.
RenewTime unversioned.Time `json:"renewTime"`
// LeaderTransitions is incremented each time the holder identity changes.
LeaderTransitions int `json:"leaderTransitions"`
}
// ResourceLockConfig common data that exists across different
// resource locks.
type ResourceLockConfig struct {
// Identity is the unique string identifying this candidate in the election.
Identity string
// EventRecorder receives leader-election lifecycle events.
EventRecorder record.EventRecorder
}
// Interface offers a common interface for locking on arbitrary
// resources used in leader election. The Interface is used
// to hide the details on specific implementations in order to allow
// them to change over time. This interface is strictly for use
// by the leaderelection code.
type Interface interface {
// Get returns the LeaderElectionRecord
Get() (*LeaderElectionRecord, error)
// Create attempts to create a LeaderElectionRecord
Create(ler LeaderElectionRecord) error
// Update will update an existing LeaderElectionRecord
Update(ler LeaderElectionRecord) error
// RecordEvent is used to record events
RecordEvent(string)
// Identity will return the locks Identity
Identity() string
// Describe is used to convert details on current resource lock
// into a string
Describe() string
}

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/api"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
@ -145,14 +146,21 @@ func Run(s *options.SchedulerServer) error {
return err
}
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
// TODO: enable other lock types
rl := resourcelock.EndpointsLock{
EndpointsMeta: api.ObjectMeta{
Namespace: "kube-system",
Name: "kube-scheduler",
},
Client: leaderElectionClient,
Identity: id,
EventRecorder: config.Recorder,
Client: leaderElectionClient,
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: config.Recorder,
},
}
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: &rl,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,