Skip to content

Commit

Permalink
Merge pull request #32664 from m1093782566/m109-certificates-hot-loop
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue

[Controller Manager] Fix certificates controller hotloop and use utilruntime.HandleError to replace glog.Errorf

<!--  Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, read our contributor guidelines https://github.com/kubernetes/kubernetes/blob/master/CONTRIBUTING.md and developer guide https://github.com/kubernetes/kubernetes/blob/master/docs/devel/development.md
2. If you want *faster* PR reviews, read how: https://github.com/kubernetes/kubernetes/blob/master/docs/devel/faster_reviews.md
3. Follow the instructions for writing a release note: https://github.com/kubernetes/kubernetes/blob/master/docs/devel/pull-requests.md#release-notes
-->

**What this PR does / why we need it**:

Fix certificates controller hotloop on unexpected API server rejections.

**Which issue this PR fixes** 

Related issue is #30629

**Special notes for your reviewer**:

@deads2k @derekwaynecarr PTAL.

I find there is no unit test for certificates controller, and I will implement unit tests for it later.
  • Loading branch information
Kubernetes Submit Queue authored Sep 18, 2016
2 parents 41fc0a4 + fea0c79 commit 920581d
Showing 1 changed file with 27 additions and 18 deletions.
45 changes: 27 additions & 18 deletions pkg/controller/certificates/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type CertificateController struct {

signer *local.Signer

queue *workqueue.Type
queue workqueue.RateLimitingInterface
}

func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Duration, caCertFile, caKeyFile string, approveAllKubeletCSRsForGroup string) (*CertificateController, error) {
Expand All @@ -78,7 +78,7 @@ func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Du

cc := &CertificateController{
kubeClient: kubeClient,
queue: workqueue.NewNamed("certificate"),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "certificate"),
signer: ca,
approveAllKubeletCSRsForGroup: approveAllKubeletCSRsForGroup,
}
Expand Down Expand Up @@ -120,37 +120,47 @@ func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Du
// Run the main goroutine responsible for watching and syncing jobs.
func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer cc.queue.ShutDown()

go cc.csrController.Run(stopCh)

glog.Infof("Starting certificate controller manager")
for i := 0; i < workers; i++ {
go wait.Until(cc.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down certificate controller")
cc.queue.ShutDown()
}

// worker runs a thread that dequeues CSRs, handles them, and marks them done.
func (cc *CertificateController) worker() {
for {
func() {
key, quit := cc.queue.Get()
if quit {
return
}
defer cc.queue.Done(key)
err := cc.syncHandler(key.(string))
if err != nil {
glog.Errorf("Error syncing CSR: %v", err)
}
}()
for cc.processNextWorkItem() {
}
}

// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (cc *CertificateController) processNextWorkItem() bool {
cKey, quit := cc.queue.Get()
if quit {
return false
}
defer cc.queue.Done(cKey)

err := cc.syncHandler(cKey.(string))
if err == nil {
cc.queue.Forget(cKey)
return true
}

cc.queue.AddRateLimited(cKey)
utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", cKey, err))
return true
}

func (cc *CertificateController) enqueueCertificateRequest(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
cc.queue.Add(key)
Expand Down Expand Up @@ -179,7 +189,6 @@ func (cc *CertificateController) maybeSignCertificate(key string) error {
}()
obj, exists, err := cc.csrStore.Store.GetByKey(key)
if err != nil {
cc.queue.Add(key)
return err
}
if !exists {
Expand Down Expand Up @@ -234,7 +243,7 @@ func (cc *CertificateController) maybeAutoApproveCSR(csr *certificates.Certifica

x509cr, err := utilcertificates.ParseCertificateRequestObject(csr)
if err != nil {
glog.Errorf("unable to parse csr %q: %v", csr.ObjectMeta.Name, err)
utilruntime.HandleError(fmt.Errorf("unable to parse csr %q: %v", csr.Name, err))
return csr, nil
}
if !reflect.DeepEqual([]string{"system:nodes"}, x509cr.Subject.Organization) {
Expand Down

0 comments on commit 920581d

Please sign in to comment.