From e63fcf708d104bf6c69033ca0e2321bd826c2b40 Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Wed, 12 Apr 2017 15:49:17 -0400 Subject: [PATCH] Make controller Run methods consistent - startup/shutdown logging - wait for cache sync logging - defer utilruntime.HandleCrash() - wait for stop channel before exiting --- .../app/controllermanager.go | 2 +- hack/.linted_packages | 1 + .../certificates/certificate_controller.go | 8 +-- pkg/controller/controller_utils.go | 16 ++++++ pkg/controller/daemon/daemoncontroller.go | 7 ++- .../deployment/deployment_controller.go | 5 +- pkg/controller/disruption/disruption.go | 11 ++-- .../endpoint/endpoints_controller.go | 8 ++- .../garbagecollector/garbagecollector.go | 16 ++++-- pkg/controller/job/jobcontroller.go | 7 +-- .../namespace/namespace_controller.go | 8 ++- pkg/controller/node/nodecontroller.go | 54 ++++++++++--------- pkg/controller/podautoscaler/horizontal.go | 7 ++- pkg/controller/podgc/gc_controller.go | 11 ++-- pkg/controller/replicaset/replica_set.go | 7 ++- .../replication/replication_controller.go | 7 ++- .../resource_quota_controller.go | 8 +-- pkg/controller/route/routecontroller.go | 9 ++-- pkg/controller/service/servicecontroller.go | 5 +- .../serviceaccounts_controller.go | 8 +-- pkg/controller/statefulset/stateful_set.go | 7 ++- pkg/controller/ttl/ttlcontroller.go | 3 +- .../attachdetach/attach_detach_controller.go | 8 +-- .../persistentvolume/pv_controller_base.go | 16 +++--- .../thirdparty/tprregistration_controller.go | 3 +- pkg/proxy/config/config.go | 20 +++++-- .../pkg/apiserver/apiservice_controller.go | 6 +-- .../autoregister/autoregister_controller.go | 3 +- .../kube-aggregator/pkg/controllers/cache.go | 41 ++++++++++++++ 29 files changed, 200 insertions(+), 112 deletions(-) create mode 100644 staging/src/k8s.io/kube-aggregator/pkg/controllers/cache.go diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 34e0575ec1cdd..5bfe8620dfd98 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -484,7 +484,7 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root if err != nil { return fmt.Errorf("failed to initialize nodecontroller: %v", err) } - nodeController.Run() + go nodeController.Run(stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } else { glog.Warningf("%q is disabled", nodeControllerName) diff --git a/hack/.linted_packages b/hack/.linted_packages index 3bd1d3fb267de..bd85c8e10f840 100644 --- a/hack/.linted_packages +++ b/hack/.linted_packages @@ -393,6 +393,7 @@ staging/src/k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregis staging/src/k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion staging/src/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion staging/src/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1alpha1 +staging/src/k8s.io/kube-aggregator/pkg/controllers staging/src/k8s.io/sample-apiserver staging/src/k8s.io/sample-apiserver/pkg/apis/wardle/install test/e2e/perftype diff --git a/pkg/controller/certificates/certificate_controller.go b/pkg/controller/certificates/certificate_controller.go index 04d406ab38e48..a6c81f86286ba 100644 --- a/pkg/controller/certificates/certificate_controller.go +++ b/pkg/controller/certificates/certificate_controller.go @@ -117,18 +117,18 @@ func (cc *CertificateController) 
Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer cc.queue.ShutDown() - glog.Infof("Starting certificate controller manager") + glog.Infof("Starting certificate controller") + defer glog.Infof("Shutting down certificate controller") - if !cache.WaitForCacheSync(stopCh, cc.csrsSynced) { - utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + if !controller.WaitForCacheSync("certificate", stopCh, cc.csrsSynced) { return } for i := 0; i < workers; i++ { go wait.Until(cc.worker, time.Second, stopCh) } + <-stopCh - glog.Infof("Shutting down certificate controller") } // worker runs a thread that dequeues CSRs, handles them, and marks them done. diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index 3316cd0f20f5c..9b50586160639 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" @@ -961,3 +962,18 @@ func PatchNodeTaints(c clientset.Interface, nodeName string, oldNode *v1.Node, n _, err = c.Core().Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes) return err } + +// WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages +// indicating that the controller identified by controllerName is waiting for syncs, followed by +// either a successful or failed sync. +func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool { + glog.Infof("Waiting for caches to sync for %s controller", controllerName) + + if !cache.WaitForCacheSync(stopCh, cacheSyncs...) 
{ + utilruntime.HandleError(fmt.Errorf("Unable to sync caches for %s controller", controllerName)) + return false + } + + glog.Infof("Caches are synced for %s controller", controllerName) + return true +} diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go index 1e597f6c55140..6a4785e4ac9ee 100644 --- a/pkg/controller/daemon/daemoncontroller.go +++ b/pkg/controller/daemon/daemoncontroller.go @@ -196,10 +196,10 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer dsc.queue.ShutDown() - glog.Infof("Starting Daemon Sets controller manager") + glog.Infof("Starting daemon sets controller") + defer glog.Infof("Shutting down daemon sets controller") - if !cache.WaitForCacheSync(stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.dsStoreSynced) { - utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + if !controller.WaitForCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.dsStoreSynced) { return } @@ -208,7 +208,6 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) { } <-stopCh - glog.Infof("Shutting down Daemon Set Controller") } func (dsc *DaemonSetsController) runWorker() { diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 4cd66c74a4275..0064b286fe924 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -150,9 +150,9 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) { defer dc.queue.ShutDown() glog.Infof("Starting deployment controller") + defer glog.Infof("Shutting down deployment controller") - if !cache.WaitForCacheSync(stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) { - utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) { return } @@ -161,7 +161,6 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) { } <-stopCh - glog.Infof("Shutting down deployment controller") } func (dc *DeploymentController) addDeployment(obj interface{}) { diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index 1277645fd0c13..3cebdf0223ed7 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -266,24 +266,23 @@ func (dc *DisruptionController) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer dc.queue.ShutDown() - glog.V(0).Infof("Starting disruption controller") + glog.Infof("Starting disruption controller") + defer glog.Infof("Shutting down disruption controller") - if !cache.WaitForCacheSync(stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) { - utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + if !controller.WaitForCacheSync("disruption", stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) { return } if dc.kubeClient != nil { - glog.V(0).Infof("Sending events to api server.") + glog.Infof("Sending events to api server.") dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(dc.kubeClient.Core().RESTClient()).Events("")}) } else { - glog.V(0).Infof("No api server defined - no events will 
be sent to API server.") + glog.Infof("No api server defined - no events will be sent to API server.") } go wait.Until(dc.worker, time.Second, stopCh) go wait.Until(dc.recheckWorker, time.Second, stopCh) <-stopCh - glog.V(0).Infof("Shutting down disruption controller") } func (dc *DisruptionController) addDb(obj interface{}) { diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 86fb09762a676..1f243565ded0a 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1" corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/util/metrics" "github.com/golang/glog" @@ -131,14 +132,17 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer e.queue.ShutDown() - if !cache.WaitForCacheSync(stopCh, e.podsSynced, e.servicesSynced) { - utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + glog.Infof("Starting endpoint controller") + defer glog.Infof("Shutting down endpoint controller") + + if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced) { return } for i := 0; i < workers; i++ { go wait.Until(e.worker, time.Second, stopCh) } + go func() { defer utilruntime.HandleCrash() time.Sleep(5 * time.Minute) // give time for our cache to fill diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index 2b675d47749ba..6e80dba753bd2 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -32,8 +32,8 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly" // install the prometheus plugin _ "k8s.io/kubernetes/pkg/util/workqueue/prometheus" @@ -104,25 +104,31 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam } func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() defer gc.attemptToDelete.ShutDown() defer gc.attemptToOrphan.ShutDown() defer gc.dependencyGraphBuilder.graphChanges.ShutDown() - glog.Infof("Garbage Collector: Initializing") + glog.Infof("Starting garbage collector controller") + defer glog.Infof("Shutting down garbage collector controller") + gc.dependencyGraphBuilder.Run(stopCh) - if !cache.WaitForCacheSync(stopCh, gc.dependencyGraphBuilder.HasSynced) { + + if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.HasSynced) { return } - glog.Infof("Garbage Collector: All resource monitors have synced. Proceeding to collect garbage") + + glog.Infof("Garbage collector: all resource monitors have synced. 
Proceeding to collect garbage") // gc workers for i := 0; i < workers; i++ { go wait.Until(gc.runAttemptToDeleteWorker, 1*time.Second, stopCh) go wait.Until(gc.runAttemptToOrphanWorker, 1*time.Second, stopCh) } + Register() + <-stopCh - glog.Infof("Garbage Collector: Shutting down") } func (gc *GarbageCollector) HasSynced() bool { diff --git a/pkg/controller/job/jobcontroller.go b/pkg/controller/job/jobcontroller.go index ed3eb7fa90a4f..ade0e584088ef 100644 --- a/pkg/controller/job/jobcontroller.go +++ b/pkg/controller/job/jobcontroller.go @@ -126,8 +126,10 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer jm.queue.ShutDown() - if !cache.WaitForCacheSync(stopCh, jm.podStoreSynced, jm.jobStoreSynced) { - utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + glog.Infof("Starting job controller") + defer glog.Infof("Shutting down job controller") + + if !controller.WaitForCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) { return } @@ -136,7 +138,6 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) { } <-stopCh - glog.Infof("Shutting down Job Manager") } // getPodJob returns the job managing the given pod. diff --git a/pkg/controller/namespace/namespace_controller.go b/pkg/controller/namespace/namespace_controller.go index 734ddb2fd2df4..df1d535dc4023 100644 --- a/pkg/controller/namespace/namespace_controller.go +++ b/pkg/controller/namespace/namespace_controller.go @@ -186,10 +186,10 @@ func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer nm.queue.ShutDown() - glog.Info("Starting the NamespaceController") + glog.Info("Starting namespace controller") + defer glog.Infof("Shutting down namespace controller") - if !cache.WaitForCacheSync(stopCh, nm.listerSynced) { - utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + if !controller.WaitForCacheSync("namespace", stopCh, nm.listerSynced) { return } @@ -198,6 +198,4 @@ func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) { } <-stopCh - - glog.Info("Shutting down NamespaceController") } diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index b3b4f713ff9e6..938e19750574e 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -518,37 +518,39 @@ func (nc *NodeController) onNodeDelete(originalObj interface{}) { } // Run starts an asynchronous loop that monitors the status of cluster nodes. -func (nc *NodeController) Run() { - go func() { - defer utilruntime.HandleCrash() +func (nc *NodeController) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() - if !cache.WaitForCacheSync(wait.NeverStop, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) { - utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) - return - } + glog.Infof("Starting node controller") + defer glog.Infof("Shutting down node controller") - // Incorporate the results of node status pushed from kubelet to master. 
- go wait.Until(func() { - if err := nc.monitorNodeStatus(); err != nil { - glog.Errorf("Error monitoring node status: %v", err) - } - }, nc.nodeMonitorPeriod, wait.NeverStop) + if !controller.WaitForCacheSync("node", stopCh, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) { + return + } - if nc.runTaintManager { - go nc.taintManager.Run(wait.NeverStop) + // Incorporate the results of node status pushed from kubelet to master. + go wait.Until(func() { + if err := nc.monitorNodeStatus(); err != nil { + glog.Errorf("Error monitoring node status: %v", err) } + }, nc.nodeMonitorPeriod, wait.NeverStop) - if nc.useTaintBasedEvictions { - // Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated - // taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints. - go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop) - } else { - // Managing eviction of nodes: - // When we delete pods off a node, if the node was not empty at the time we then - // queue an eviction watcher. If we hit an error, retry deletion. - go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop) - } - }() + if nc.runTaintManager { + go nc.taintManager.Run(wait.NeverStop) + } + + if nc.useTaintBasedEvictions { + // Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated + // taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints. + go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop) + } else { + // Managing eviction of nodes: + // When we delete pods off a node, if the node was not empty at the time we then + // queue an eviction watcher. If we hit an error, retry deletion. + go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop) + } + + <-stopCh } // monitorNodeStatus verifies node status are constantly updated by kubelet, and if not, diff --git a/pkg/controller/podautoscaler/horizontal.go b/pkg/controller/podautoscaler/horizontal.go index 9fe2ba469e55d..14ac9d0d4035d 100644 --- a/pkg/controller/podautoscaler/horizontal.go +++ b/pkg/controller/podautoscaler/horizontal.go @@ -136,10 +136,10 @@ func (a *HorizontalController) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer a.queue.ShutDown() - glog.Infof("Starting HPA Controller") + glog.Infof("Starting HPA controller") + defer glog.Infof("Shutting down HPA controller") - if !cache.WaitForCacheSync(stopCh, a.hpaListerSynced) { - utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + if !controller.WaitForCacheSync("HPA", stopCh, a.hpaListerSynced) { return } @@ -147,7 +147,6 @@ func (a *HorizontalController) Run(stopCh <-chan struct{}) { go wait.Until(a.worker, time.Second, stopCh) <-stopCh - glog.Infof("Shutting down HPA Controller") } // obj could be an *v1.HorizontalPodAutoscaler, or a DeletionFinalStateUnknown marker item. diff --git a/pkg/controller/podgc/gc_controller.go b/pkg/controller/podgc/gc_controller.go index 381e4fbcaf22f..e64d617ab6993 100644 --- a/pkg/controller/podgc/gc_controller.go +++ b/pkg/controller/podgc/gc_controller.go @@ -17,7 +17,6 @@ limitations under the License. 
 package podgc
 
 import (
-	"fmt"
 	"sort"
 	"sync"
 	"time"
@@ -32,6 +31,7 @@ import (
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
 	coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
 	corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
+	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/util/metrics"
 
 	"github.com/golang/glog"
@@ -71,12 +71,17 @@ func NewPodGC(kubeClient clientset.Interface, podInformer coreinformers.PodInfor
 }
 
 func (gcc *PodGCController) Run(stop <-chan struct{}) {
-	if !cache.WaitForCacheSync(stop, gcc.podListerSynced) {
-		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
+	defer utilruntime.HandleCrash()
+
+	glog.Infof("Starting GC controller")
+	defer glog.Infof("Shutting down GC controller")
+
+	if !controller.WaitForCacheSync("GC", stop, gcc.podListerSynced) {
 		return
 	}
 
 	go wait.Until(gcc.gc, gcCheckPeriod, stop)
+
 	<-stop
 }
 
diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go
index 256b3ad81ca23..7dc02c2371411 100644
--- a/pkg/controller/replicaset/replica_set.go
+++ b/pkg/controller/replicaset/replica_set.go
@@ -152,10 +152,10 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 	defer rsc.queue.ShutDown()
 
-	glog.Infof("Starting ReplicaSet controller")
+	glog.Infof("Starting replica set controller")
+	defer glog.Infof("Shutting down replica set controller")
 
-	if !cache.WaitForCacheSync(stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
-		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
+	if !controller.WaitForCacheSync("replica set", stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
 		return
 	}
 
@@ -164,7 +164,6 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
 	}
 
 	<-stopCh
-	glog.Infof("Shutting down ReplicaSet Controller")
 }
 
 // getPodReplicaSets returns a list of ReplicaSets matching the given pod.
diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go
index 6c4b89a427bc0..eebf799547c4e 100644
--- a/pkg/controller/replication/replication_controller.go
+++ b/pkg/controller/replication/replication_controller.go
@@ -147,10 +147,10 @@ func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 	defer rm.queue.ShutDown()
 
-	glog.Infof("Starting RC Manager")
+	glog.Infof("Starting RC controller")
+	defer glog.Infof("Shutting down RC controller")
 
-	if !cache.WaitForCacheSync(stopCh, rm.podListerSynced, rm.rcListerSynced) {
-		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
+	if !controller.WaitForCacheSync("RC", stopCh, rm.podListerSynced, rm.rcListerSynced) {
 		return
 	}
 
@@ -159,7 +159,6 @@ func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
 	}
 
 	<-stopCh
-	glog.Infof("Shutting down RC Manager")
 }
 
 // getPodControllers returns a list of ReplicationControllers matching the given pod.
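The shape every Run method above and below converges on is easier to see in one piece than spread across two dozen hunks. The sketch below is illustrative only and is not part of the patch: FooController, its queue and fooSynced fields, and the worker method are hypothetical stand-ins, while controller.WaitForCacheSync is the real helper added in pkg/controller/controller_utils.go above.

package foo

import (
	"time"

	"github.com/golang/glog"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/kubernetes/pkg/controller"
)

// FooController is a hypothetical controller, shown only to illustrate the pattern.
type FooController struct {
	queue     workqueue.RateLimitingInterface
	fooSynced cache.InformerSynced
}

// Run has the standardized shape: crash handling and queue shutdown are deferred
// first, the startup/shutdown logs are symmetric, the cache sync is guarded, the
// workers run until stopCh closes, and Run itself blocks on the stop channel.
func (c *FooController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash() // route panics through the shared crash handlers
	defer c.queue.ShutDown()        // unblock queue consumers so workers can exit

	glog.Infof("Starting foo controller")
	defer glog.Infof("Shutting down foo controller")

	// Logs the wait, then returns false if stopCh closes before the caches sync.
	if !controller.WaitForCacheSync("foo", stopCh, c.fooSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(c.worker, time.Second, stopCh)
	}

	// Block until shutdown is requested; deferred cleanup then runs in LIFO order.
	<-stopCh
}

func (c *FooController) worker() {
	// dequeue and process items from c.queue until it is shut down (elided)
}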
diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go
index f3fd8668094ea..a1e704270a89b 100644
--- a/pkg/controller/resourcequota/resource_quota_controller.go
+++ b/pkg/controller/resourcequota/resource_quota_controller.go
@@ -236,16 +236,17 @@ func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface)
 // Run begins quota controller using the specified number of workers
 func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
+	defer rq.queue.ShutDown()
 
 	glog.Infof("Starting resource quota controller")
+	defer glog.Infof("Shutting down resource quota controller")
 
 	// the controllers that replenish other resources to respond rapidly to state changes
 	for _, replenishmentController := range rq.replenishmentControllers {
 		go replenishmentController.Run(stopCh)
 	}
 
-	if !cache.WaitForCacheSync(stopCh, rq.informerSyncedFuncs...) {
-		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
+	if !controller.WaitForCacheSync("resource quota", stopCh, rq.informerSyncedFuncs...) {
 		return
 	}
 
@@ -254,9 +255,8 @@ func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
 		go wait.Until(rq.worker(rq.queue), time.Second, stopCh)
 		go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh)
 	}
+
 	<-stopCh
-	glog.Infof("Shutting down ResourceQuotaController")
-	rq.queue.ShutDown()
 }
 
 // syncResourceQuotaFromKey syncs a quota key
diff --git a/pkg/controller/route/routecontroller.go b/pkg/controller/route/routecontroller.go
index 206eb291327b7..57fbdf570c7e6 100644
--- a/pkg/controller/route/routecontroller.go
+++ b/pkg/controller/route/routecontroller.go
@@ -35,6 +35,7 @@ import (
 	coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
 	corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
 	"k8s.io/kubernetes/pkg/cloudprovider"
+	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/util/metrics"
 	nodeutil "k8s.io/kubernetes/pkg/util/node"
 )
@@ -77,10 +78,10 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInform
 func (rc *RouteController) Run(stopCh <-chan struct{}, syncPeriod time.Duration) {
 	defer utilruntime.HandleCrash()
 
-	glog.Info("Starting the route controller")
+	glog.Info("Starting route controller")
+	defer glog.Info("Shutting down route controller")
 
-	if !cache.WaitForCacheSync(stopCh, rc.nodeListerSynced) {
-		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
+	if !controller.WaitForCacheSync("route", stopCh, rc.nodeListerSynced) {
 		return
 	}
 
@@ -94,6 +95,8 @@ func (rc *RouteController) Run(stopCh <-chan struct{}, syncPeriod time.Duration)
 			glog.Errorf("Couldn't reconcile node routes: %v", err)
 		}
 	}, syncPeriod, wait.NeverStop)
+
+	<-stopCh
 }
 
 func (rc *RouteController) reconcileNodeRoutes() error {
diff --git a/pkg/controller/service/servicecontroller.go b/pkg/controller/service/servicecontroller.go
index 6a9066b257cce..c3c5b85efabfb 100644
--- a/pkg/controller/service/servicecontroller.go
+++ b/pkg/controller/service/servicecontroller.go
@@ -174,10 +174,10 @@ func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) {
 	defer s.workingQueue.ShutDown()
 
 	glog.Info("Starting service controller")
+	defer glog.Info("Shutting down service controller")
 
-	if !cache.WaitForCacheSync(stopCh, s.serviceListerSynced, s.nodeListerSynced) {
-		runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
+	if !controller.WaitForCacheSync("service", stopCh, s.serviceListerSynced, s.nodeListerSynced) {
 		return
 	}
 
 	for i := 0; i < workers; i++ {
@@ -186,7 +186,6 @@ func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) {
 	go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, stopCh)
 
 	<-stopCh
-	glog.Info("Stopping service controller")
 }
 
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
diff --git a/pkg/controller/serviceaccount/serviceaccounts_controller.go b/pkg/controller/serviceaccount/serviceaccounts_controller.go
index 9ffaab487c9e6..c6a2260d08aea 100644
--- a/pkg/controller/serviceaccount/serviceaccounts_controller.go
+++ b/pkg/controller/serviceaccount/serviceaccounts_controller.go
@@ -32,6 +32,7 @@ import (
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
 	coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
 	corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
+	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/util/metrics"
 )
 
@@ -109,10 +110,10 @@ func (c *ServiceAccountsController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 	defer c.queue.ShutDown()
 
-	glog.Infof("Starting ServiceAccount controller")
+	glog.Infof("Starting service account controller")
+	defer glog.Infof("Shutting down service account controller")
 
-	if !cache.WaitForCacheSync(stopCh, c.saListerSynced, c.nsListerSynced) {
-		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
+	if !controller.WaitForCacheSync("service account", stopCh, c.saListerSynced, c.nsListerSynced) {
 		return
 	}
 
@@ -121,7 +122,6 @@ func (c *ServiceAccountsController) Run(workers int, stopCh <-chan struct{}) {
 	}
 
 	<-stopCh
-	glog.Infof("Shutting down ServiceAccount controller")
 }
 
 // serviceAccountDeleted reacts to a ServiceAccount deletion by recreating a default ServiceAccount in the namespace if needed
diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go
index 67c189ee1cd51..aad505126ace5 100644
--- a/pkg/controller/statefulset/stateful_set.go
+++ b/pkg/controller/statefulset/stateful_set.go
@@ -141,10 +141,10 @@ func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 	defer ssc.queue.ShutDown()
 
-	glog.Infof("Starting statefulset controller")
+	glog.Infof("Starting stateful set controller")
+	defer glog.Infof("Shutting down stateful set controller")
 
-	if !cache.WaitForCacheSync(stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced) {
-		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
+	if !controller.WaitForCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced) {
 		return
 	}
 
@@ -153,7 +153,6 @@ func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
 	}
 
 	<-stopCh
-	glog.Infof("Shutting down statefulset controller")
 }
 
 // addPod adds the statefulset for the pod to the sync queue
diff --git a/pkg/controller/ttl/ttlcontroller.go b/pkg/controller/ttl/ttlcontroller.go
index 8ed9c9daeb595..b5293677d7b05 100644
--- a/pkg/controller/ttl/ttlcontroller.go
+++ b/pkg/controller/ttl/ttlcontroller.go
@@ -116,7 +116,8 @@ func (ttlc *TTLController) Run(workers int, stopCh <-chan struct{}) {
 
 	glog.Infof("Starting TTL controller")
 	defer glog.Infof("Shutting down TTL controller")
-	if !cache.WaitForCacheSync(stopCh, ttlc.hasSynced) {
+
+	if !controller.WaitForCacheSync("TTL", stopCh, ttlc.hasSynced) {
 		return
 	}
 
diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go
index 5266019ad7825..dbb47c3ad5cd5 100644
--- a/pkg/controller/volume/attachdetach/attach_detach_controller.go
+++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go
@@ -220,13 +220,14 @@ type attachDetachController struct {
 
 func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
 	defer runtime.HandleCrash()
-	glog.Infof("Starting Attach Detach Controller")
+
+	glog.Infof("Starting attach detach controller")
+	defer glog.Infof("Shutting down attach detach controller")
 
 	// TODO uncomment once we agree this is ok and we fix the attach/detach integration test that
 	// currently fails because it doesn't set pvcsSynced and pvsSynced to alwaysReady, so this
 	// controller never runs.
-	// if !kcache.WaitForCacheSync(stopCh, adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced) {
-	// 	runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
+	// if !controller.WaitForCacheSync("attach detach", stopCh, adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced) {
 	// 	return
 	// }
 
@@ -234,7 +235,6 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
 	go adc.desiredStateOfWorldPopulator.Run(stopCh)
 
 	<-stopCh
-	glog.Infof("Shutting down Attach Detach Controller")
 }
 
 func (adc *attachDetachController) podAdd(obj interface{}) {
diff --git a/pkg/controller/volume/persistentvolume/pv_controller_base.go b/pkg/controller/volume/persistentvolume/pv_controller_base.go
index 0296798deb76f..e4454e1ff9287 100644
--- a/pkg/controller/volume/persistentvolume/pv_controller_base.go
+++ b/pkg/controller/volume/persistentvolume/pv_controller_base.go
@@ -273,19 +273,23 @@ func (ctrl *PersistentVolumeController) deleteClaim(claim *v1.PersistentVolumeCl
 
 // Run starts all of this controller's control loops
 func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
-	glog.V(1).Infof("starting PersistentVolumeController")
-	if !cache.WaitForCacheSync(stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced) {
-		utilruntime.HandleError(fmt.Errorf("timed out waiting for volume caches to sync"))
+	defer utilruntime.HandleCrash()
+	defer ctrl.claimQueue.ShutDown()
+	defer ctrl.volumeQueue.ShutDown()
+
+	glog.Infof("Starting persistent volume controller")
+	defer glog.Infof("Shutting down persistent volume controller")
+
+	if !controller.WaitForCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced) {
 		return
 	}
+
 	ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)
+
 	go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
 	go wait.Until(ctrl.claimWorker, time.Second, stopCh)
 
 	<-stopCh
-
-	ctrl.claimQueue.ShutDown()
-	ctrl.volumeQueue.ShutDown()
 }
 
 // volumeWorker processes items from volumeQueue. It must run only once,
diff --git a/pkg/master/thirdparty/tprregistration_controller.go b/pkg/master/thirdparty/tprregistration_controller.go
index f607bcbb0290c..84ea6920990e6 100644
--- a/pkg/master/thirdparty/tprregistration_controller.go
+++ b/pkg/master/thirdparty/tprregistration_controller.go
@@ -33,6 +33,7 @@ import (
 	"k8s.io/kubernetes/pkg/apis/extensions"
 	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/extensions/internalversion"
 	listers "k8s.io/kubernetes/pkg/client/listers/extensions/internalversion"
+	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata"
 )
 
@@ -108,7 +109,7 @@ func (c *tprRegistrationController) Run(threadiness int, stopCh <-chan struct{})
 	defer glog.Infof("Shutting down tpr-autoregister controller")
 
 	// wait for your secondary caches to fill before starting your work
-	if !cache.WaitForCacheSync(stopCh, c.tprSynced) {
+	if !controller.WaitForCacheSync("tpr-autoregister", stopCh, c.tprSynced) {
 		return
 	}
 
diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go
index 979733af99322..89cc3c1b63997 100644
--- a/pkg/proxy/config/config.go
+++ b/pkg/proxy/config/config.go
@@ -27,6 +27,7 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/core/internalversion"
 	listers "k8s.io/kubernetes/pkg/client/listers/core/internalversion"
+	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/util/config"
 )
 
@@ -96,8 +97,12 @@ func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) {
 
 // Run starts the goroutine responsible for calling registered handlers.
 func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
-	if !cache.WaitForCacheSync(stopCh, c.listerSynced) {
-		utilruntime.HandleError(fmt.Errorf("endpoint controller not synced"))
+	defer utilruntime.HandleCrash()
+
+	glog.Info("Starting endpoints config controller")
+	defer glog.Info("Shutting down endpoints config controller")
+
+	if !controller.WaitForCacheSync("endpoints config", stopCh, c.listerSynced) {
 		return
 	}
 
@@ -201,14 +206,19 @@ func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
 // Run starts the goroutine responsible for calling
 // registered handlers.
 func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
-	if !cache.WaitForCacheSync(stopCh, c.listerSynced) {
-		utilruntime.HandleError(fmt.Errorf("service controller not synced"))
+	defer utilruntime.HandleCrash()
+
+	glog.Info("Starting service config controller")
+	defer glog.Info("Shutting down service config controller")
+
+	if !controller.WaitForCacheSync("service config", stopCh, c.listerSynced) {
 		return
 	}
 
 	// We have synced informers. Now we can start delivering updates
 	// to the registered handler.
go func() { + defer utilruntime.HandleCrash() for { select { case <-c.updates: @@ -236,6 +246,8 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) { <-stopCh close(c.stop) }() + + <-stopCh } func (c *ServiceConfig) handleAddService(_ interface{}) { diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go index b826a01178ff6..d593276cebf71 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go @@ -35,6 +35,7 @@ import ( "k8s.io/kube-aggregator/pkg/apis/apiregistration" informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion" listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion" + "k8s.io/kube-aggregator/pkg/controllers" ) type APIHandlerManager interface { @@ -124,12 +125,11 @@ func (c *APIServiceRegistrationController) getDestinationHost(apiService *apireg func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() - defer glog.Infof("Shutting down APIServiceRegistrationController") glog.Infof("Starting APIServiceRegistrationController") + defer glog.Infof("Shutting down APIServiceRegistrationController") - if !cache.WaitForCacheSync(stopCh, c.apiServiceSynced, c.servicesSynced) { - utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + if !controllers.WaitForCacheSync("APIServiceRegistrationController", stopCh, c.apiServiceSynced, c.servicesSynced) { return } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller.go index ec14692fdd850..f606d51c24523 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller.go @@ -35,6 +35,7 @@ import ( apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion" informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion" listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion" + "k8s.io/kube-aggregator/pkg/controllers" ) const ( @@ -120,7 +121,7 @@ func (c *autoRegisterController) Run(threadiness int, stopCh <-chan struct{}) { defer glog.Infof("Shutting down autoregister controller") // wait for your secondary caches to fill before starting your work - if !cache.WaitForCacheSync(stopCh, c.apiServiceSynced) { + if !controllers.WaitForCacheSync("autoregister", stopCh, c.apiServiceSynced) { return } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/cache.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/cache.go new file mode 100644 index 0000000000000..bd366ec5b871b --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/cache.go @@ -0,0 +1,41 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "fmt" + + "github.com/golang/glog" + + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/cache" +) + +// WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages +// indicating that the controller identified by controllerName is waiting for syncs, followed by +// either a successful or failed sync. +func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool { + glog.Infof("Waiting for caches to sync for %s controller", controllerName) + + if !cache.WaitForCacheSync(stopCh, cacheSyncs...) { + utilruntime.HandleError(fmt.Errorf("Unable to sync caches for %s controller", controllerName)) + return false + } + + glog.Infof("Caches are synced for %s controller", controllerName) + return true +}
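The staging copy above intentionally mirrors WaitForCacheSync in pkg/controller/controller_utils.go: kube-aggregator is published out of staging and cannot import k8s.io/kubernetes, so it carries its own copy of the helper rather than sharing one. A minimal, hypothetical caller looks like the following sketch; the hand-rolled InformerSynced stands in for a real shared informer's HasSynced method, and the "demo" controller name is made up for illustration.

package main

import (
	"time"

	"k8s.io/client-go/tools/cache"
	"k8s.io/kube-aggregator/pkg/controllers"
)

func main() {
	stop := make(chan struct{})
	defer close(stop)

	// A fake InformerSynced that reports synced after a short delay.
	// cache.InformerSynced is just func() bool, so any closure works.
	start := time.Now()
	synced := cache.InformerSynced(func() bool {
		return time.Since(start) > 100*time.Millisecond
	})

	// Logs "Waiting for caches to sync for demo controller", polls the sync
	// funcs, and logs "Caches are synced for demo controller" on success; if
	// stop closed first, it would report an error and return false instead.
	if !controllers.WaitForCacheSync("demo", stop, synced) {
		return
	}

	// worker goroutines would be started here
}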