fix certificates controller hotloop on unexpected API server rejections

Change-Id: Ib7d2e18bcaa498bddfc785f3ff12958dfaaecbc3
pull/6/head
d00369826 2016-09-14 20:19:19 +08:00
parent 6b1565d275
commit fea0c79054
1 changed files with 27 additions and 18 deletions

View File

@ -58,7 +58,7 @@ type CertificateController struct {
signer *local.Signer
queue *workqueue.Type
queue workqueue.RateLimitingInterface
}
func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Duration, caCertFile, caKeyFile string, approveAllKubeletCSRsForGroup string) (*CertificateController, error) {
@ -79,7 +79,7 @@ func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Du
cc := &CertificateController{
kubeClient: kubeClient,
queue: workqueue.NewNamed("certificate"),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "certificate"),
signer: ca,
approveAllKubeletCSRsForGroup: approveAllKubeletCSRsForGroup,
}
@ -121,37 +121,47 @@ func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Du
// Run the main goroutine responsible for watching and syncing jobs.
func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer cc.queue.ShutDown()
go cc.csrController.Run(stopCh)
glog.Infof("Starting certificate controller manager")
for i := 0; i < workers; i++ {
go wait.Until(cc.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down certificate controller")
cc.queue.ShutDown()
}
// worker runs a thread that dequeues CSRs, handles them, and marks them done.
func (cc *CertificateController) worker() {
for {
func() {
key, quit := cc.queue.Get()
if quit {
return
}
defer cc.queue.Done(key)
err := cc.syncHandler(key.(string))
if err != nil {
glog.Errorf("Error syncing CSR: %v", err)
}
}()
for cc.processNextWorkItem() {
}
}
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (cc *CertificateController) processNextWorkItem() bool {
cKey, quit := cc.queue.Get()
if quit {
return false
}
defer cc.queue.Done(cKey)
err := cc.syncHandler(cKey.(string))
if err == nil {
cc.queue.Forget(cKey)
return true
}
cc.queue.AddRateLimited(cKey)
utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", cKey, err))
return true
}
func (cc *CertificateController) enqueueCertificateRequest(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
cc.queue.Add(key)
@ -180,7 +190,6 @@ func (cc *CertificateController) maybeSignCertificate(key string) error {
}()
obj, exists, err := cc.csrStore.Store.GetByKey(key)
if err != nil {
cc.queue.Add(key)
return err
}
if !exists {
@ -235,7 +244,7 @@ func (cc *CertificateController) maybeAutoApproveCSR(csr *certificates.Certifica
x509cr, err := utilcertificates.ParseCertificateRequestObject(csr)
if err != nil {
glog.Errorf("unable to parse csr %q: %v", csr.ObjectMeta.Name, err)
utilruntime.HandleError(fmt.Errorf("unable to parse csr %q: %v", csr.Name, err))
return csr, nil
}
if !reflect.DeepEqual([]string{"system:nodes"}, x509cr.Subject.Organization) {