Make controller Run methods consistent
- startup/shutdown logging
- wait for cache sync logging
- defer utilruntime.HandleCrash()
- wait for stop channel before exiting
ncdc committed Apr 14, 2017
1 parent 03e555f commit e63fcf7
Showing 29 changed files with 200 additions and 112 deletions.
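
Taken together, the bullet points above converge on one Run shape for every controller touched by this commit: log startup and defer the shutdown log, defer crash handling and work-queue shutdown, wait for informer caches to sync, start the workers, and block on the stop channel until the caller shuts things down. The following is a minimal, dependency-free sketch of that shape; fooController and its fields are hypothetical stand-ins, not code from the Kubernetes tree.

package main

import (
	"log"
	"time"
)

// fooController is a hypothetical controller used only to illustrate the Run
// shape this commit standardizes on.
type fooController struct {
	hasSynced func() bool
}

func (c *fooController) Run(workers int, stopCh <-chan struct{}) {
	// The real controllers also `defer utilruntime.HandleCrash()` and
	// `defer c.queue.ShutDown()` here.
	log.Println("Starting foo controller")
	defer log.Println("Shutting down foo controller")

	// Stand-in for controller.WaitForCacheSync("foo", stopCh, c.hasSynced),
	// the helper added in pkg/controller/controller_utils.go below.
	if !c.hasSynced() {
		log.Println("caches failed to sync for foo controller")
		return
	}

	for i := 0; i < workers; i++ {
		// The real controllers use wait.Until(c.worker, time.Second, stopCh).
		go c.worker(stopCh)
	}

	// Block until the caller signals shutdown; the deferred shutdown log runs
	// only after this returns.
	<-stopCh
}

func (c *fooController) worker(stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			return
		default:
			time.Sleep(time.Second) // a real worker would process one queue item here
		}
	}
}

func main() {
	stop := make(chan struct{})
	c := &fooController{hasSynced: func() bool { return true }}

	go c.Run(2, stop) // Run now blocks, so callers start it in a goroutine
	time.Sleep(2 * time.Second)
	close(stop) // closing the stop channel triggers an orderly shutdown
	time.Sleep(100 * time.Millisecond)
}

Blocking on the stop channel is also why the node controller below stops wrapping its body in a fire-and-forget goroutine and becomes a plain Run(stopCh), with the caller in controllermanager.go switching to go nodeController.Run(stop).
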
2 changes: 1 addition & 1 deletion cmd/kube-controller-manager/app/controllermanager.go
@@ -484,7 +484,7 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
if err != nil {
return fmt.Errorf("failed to initialize nodecontroller: %v", err)
}
nodeController.Run()
go nodeController.Run(stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} else {
glog.Warningf("%q is disabled", nodeControllerName)
1 change: 1 addition & 0 deletions hack/.linted_packages
@@ -393,6 +393,7 @@ staging/src/k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregis
staging/src/k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion
staging/src/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion
staging/src/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1alpha1
staging/src/k8s.io/kube-aggregator/pkg/controllers
staging/src/k8s.io/sample-apiserver
staging/src/k8s.io/sample-apiserver/pkg/apis/wardle/install
test/e2e/perftype
8 changes: 4 additions & 4 deletions pkg/controller/certificates/certificate_controller.go
@@ -117,18 +117,18 @@ func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer cc.queue.ShutDown()

glog.Infof("Starting certificate controller manager")
glog.Infof("Starting certificate controller")
defer glog.Infof("Shutting down certificate controller")

if !cache.WaitForCacheSync(stopCh, cc.csrsSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controller.WaitForCacheSync("certificate", stopCh, cc.csrsSynced) {
return
}

for i := 0; i < workers; i++ {
go wait.Until(cc.worker, time.Second, stopCh)
}

<-stopCh
glog.Infof("Shutting down certificate controller")
}

// worker runs a thread that dequeues CSRs, handles them, and marks them done.
16 changes: 16 additions & 0 deletions pkg/controller/controller_utils.go
@@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
@@ -961,3 +962,18 @@ func PatchNodeTaints(c clientset.Interface, nodeName string, oldNode *v1.Node, n
_, err = c.Core().Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes)
return err
}

// WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages
// indicating that the controller identified by controllerName is waiting for syncs, followed by
// either a successful or failed sync.
func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
glog.Infof("Waiting for caches to sync for %s controller", controllerName)

if !cache.WaitForCacheSync(stopCh, cacheSyncs...) {
utilruntime.HandleError(fmt.Errorf("Unable to sync caches for %s controller", controllerName))
return false
}

glog.Infof("Caches are synced for %s controller", controllerName)
return true
}
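
To make the behavior of this helper concrete, here is a hedged, self-contained usage sketch. The lower-case waitForCacheSync simply reproduces the function above so the example compiles on its own, and the two synced closures are stand-ins for real informer HasSynced methods; real callers import k8s.io/kubernetes/pkg/controller and call controller.WaitForCacheSync as the controllers below do.

package main

import (
	"flag"
	"fmt"

	"github.com/golang/glog"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/tools/cache"
)

// Local copy of the WaitForCacheSync helper above, kept only so this sketch
// is self-contained.
func waitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
	glog.Infof("Waiting for caches to sync for %s controller", controllerName)

	if !cache.WaitForCacheSync(stopCh, cacheSyncs...) {
		utilruntime.HandleError(fmt.Errorf("Unable to sync caches for %s controller", controllerName))
		return false
	}

	glog.Infof("Caches are synced for %s controller", controllerName)
	return true
}

func main() {
	_ = flag.Set("logtostderr", "true") // send glog output to stderr for this demo
	flag.Parse()

	// Stand-ins for informer HasSynced methods; a real controller passes e.g.
	// podInformer.Informer().HasSynced.
	podsSynced := cache.InformerSynced(func() bool { return true })
	servicesSynced := cache.InformerSynced(func() bool { return true })

	stopCh := make(chan struct{})
	defer close(stopCh)

	if !waitForCacheSync("example", stopCh, podsSynced, servicesSynced) {
		return // a controller's Run returns here instead of starting its workers
	}
	glog.Info("example controller would start its workers here")
	glog.Flush()
}

The controllers below (certificate, daemon sets, deployment, disruption, endpoint, and so on) follow exactly this call shape, which is what lets each of them drop its own "timed out waiting for caches to sync" error handling.
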
7 changes: 3 additions & 4 deletions pkg/controller/daemon/daemoncontroller.go
@@ -196,10 +196,10 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer dsc.queue.ShutDown()

glog.Infof("Starting Daemon Sets controller manager")
glog.Infof("Starting daemon sets controller")
defer glog.Infof("Shutting down daemon sets controller")

if !cache.WaitForCacheSync(stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.dsStoreSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controller.WaitForCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.dsStoreSynced) {
return
}

@@ -208,7 +208,6 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
}

<-stopCh
glog.Infof("Shutting down Daemon Set Controller")
}

func (dsc *DaemonSetsController) runWorker() {
5 changes: 2 additions & 3 deletions pkg/controller/deployment/deployment_controller.go
@@ -150,9 +150,9 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
defer dc.queue.ShutDown()

glog.Infof("Starting deployment controller")
defer glog.Infof("Shutting down deployment controller")

if !cache.WaitForCacheSync(stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
return
}

@@ -161,7 +161,6 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
}

<-stopCh
glog.Infof("Shutting down deployment controller")
}

func (dc *DeploymentController) addDeployment(obj interface{}) {
11 changes: 5 additions & 6 deletions pkg/controller/disruption/disruption.go
@@ -266,24 +266,23 @@ func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer dc.queue.ShutDown()

glog.V(0).Infof("Starting disruption controller")
glog.Infof("Starting disruption controller")
defer glog.Infof("Shutting down disruption controller")

if !cache.WaitForCacheSync(stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controller.WaitForCacheSync("disruption", stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
return
}

if dc.kubeClient != nil {
glog.V(0).Infof("Sending events to api server.")
glog.Infof("Sending events to api server.")
dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(dc.kubeClient.Core().RESTClient()).Events("")})
} else {
glog.V(0).Infof("No api server defined - no events will be sent to API server.")
glog.Infof("No api server defined - no events will be sent to API server.")
}
go wait.Until(dc.worker, time.Second, stopCh)
go wait.Until(dc.recheckWorker, time.Second, stopCh)

<-stopCh
glog.V(0).Infof("Shutting down disruption controller")
}

func (dc *DisruptionController) addDb(obj interface{}) {
8 changes: 6 additions & 2 deletions pkg/controller/endpoint/endpoints_controller.go
@@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"

"github.com/golang/glog"
@@ -131,14 +132,17 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer e.queue.ShutDown()

if !cache.WaitForCacheSync(stopCh, e.podsSynced, e.servicesSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
glog.Infof("Starting endpoint controller")
defer glog.Infof("Shutting down endpoint controller")

if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced) {
return
}

for i := 0; i < workers; i++ {
go wait.Until(e.worker, time.Second, stopCh)
}

go func() {
defer utilruntime.HandleCrash()
time.Sleep(5 * time.Minute) // give time for our cache to fill
16 changes: 11 additions & 5 deletions pkg/controller/garbagecollector/garbagecollector.go
@@ -32,8 +32,8 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
// install the prometheus plugin
_ "k8s.io/kubernetes/pkg/util/workqueue/prometheus"
@@ -104,25 +104,31 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam
}

func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer gc.attemptToDelete.ShutDown()
defer gc.attemptToOrphan.ShutDown()
defer gc.dependencyGraphBuilder.graphChanges.ShutDown()

glog.Infof("Garbage Collector: Initializing")
glog.Infof("Starting garbage collector controller")
defer glog.Infof("Shutting down garbage collector controller")

gc.dependencyGraphBuilder.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, gc.dependencyGraphBuilder.HasSynced) {

if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.HasSynced) {
return
}
glog.Infof("Garbage Collector: All resource monitors have synced. Proceeding to collect garbage")

glog.Infof("Garbage collector: all resource monitors have synced. Proceeding to collect garbage")

// gc workers
for i := 0; i < workers; i++ {
go wait.Until(gc.runAttemptToDeleteWorker, 1*time.Second, stopCh)
go wait.Until(gc.runAttemptToOrphanWorker, 1*time.Second, stopCh)
}

Register()

<-stopCh
glog.Infof("Garbage Collector: Shutting down")
}

func (gc *GarbageCollector) HasSynced() bool {
7 changes: 4 additions & 3 deletions pkg/controller/job/jobcontroller.go
@@ -126,8 +126,10 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer jm.queue.ShutDown()

if !cache.WaitForCacheSync(stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
glog.Infof("Starting job controller")
defer glog.Infof("Shutting down job controller")

if !controller.WaitForCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
return
}

@@ -136,7 +138,6 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
}

<-stopCh
glog.Infof("Shutting down Job Manager")
}

// getPodJob returns the job managing the given pod.
8 changes: 3 additions & 5 deletions pkg/controller/namespace/namespace_controller.go
@@ -186,10 +186,10 @@ func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer nm.queue.ShutDown()

glog.Info("Starting the NamespaceController")
glog.Info("Starting namespace controller")
defer glog.Infof("Shutting down namespace controller")

if !cache.WaitForCacheSync(stopCh, nm.listerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controller.WaitForCacheSync("namespace", stopCh, nm.listerSynced) {
return
}

@@ -198,6 +198,4 @@ func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) {
}

<-stopCh

glog.Info("Shutting down NamespaceController")
}
54 changes: 28 additions & 26 deletions pkg/controller/node/nodecontroller.go
@@ -518,37 +518,39 @@ func (nc *NodeController) onNodeDelete(originalObj interface{}) {
}

// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run() {
go func() {
defer utilruntime.HandleCrash()
func (nc *NodeController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()

if !cache.WaitForCacheSync(wait.NeverStop, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
glog.Infof("Starting node controller")
defer glog.Infof("Shutting down node controller")

// Incorporate the results of node status pushed from kubelet to master.
go wait.Until(func() {
if err := nc.monitorNodeStatus(); err != nil {
glog.Errorf("Error monitoring node status: %v", err)
}
}, nc.nodeMonitorPeriod, wait.NeverStop)
if !controller.WaitForCacheSync("node", stopCh, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
return
}

if nc.runTaintManager {
go nc.taintManager.Run(wait.NeverStop)
// Incorporate the results of node status pushed from kubelet to master.
go wait.Until(func() {
if err := nc.monitorNodeStatus(); err != nil {
glog.Errorf("Error monitoring node status: %v", err)
}
}, nc.nodeMonitorPeriod, wait.NeverStop)

if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
}
}()
if nc.runTaintManager {
go nc.taintManager.Run(wait.NeverStop)
}

if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
}

<-stopCh
}

// monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
7 changes: 3 additions & 4 deletions pkg/controller/podautoscaler/horizontal.go
@@ -136,18 +136,17 @@ func (a *HorizontalController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer a.queue.ShutDown()

glog.Infof("Starting HPA Controller")
glog.Infof("Starting HPA controller")
defer glog.Infof("Shutting down HPA controller")

if !cache.WaitForCacheSync(stopCh, a.hpaListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
if !controller.WaitForCacheSync("HPA", stopCh, a.hpaListerSynced) {
return
}

// start a single worker (we may wish to start more in the future)
go wait.Until(a.worker, time.Second, stopCh)

<-stopCh
glog.Infof("Shutting down HPA Controller")
}

// obj could be an *v1.HorizontalPodAutoscaler, or a DeletionFinalStateUnknown marker item.
11 changes: 8 additions & 3 deletions pkg/controller/podgc/gc_controller.go
@@ -17,7 +17,6 @@ limitations under the License.
package podgc

import (
"fmt"
"sort"
"sync"
"time"
@@ -32,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"

"github.com/golang/glog"
@@ -71,12 +71,17 @@ func NewPodGC(kubeClient clientset.Interface, podInformer coreinformers.PodInfor
}

func (gcc *PodGCController) Run(stop <-chan struct{}) {
if !cache.WaitForCacheSync(stop, gcc.podListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
defer utilruntime.HandleCrash()

glog.Infof("Starting GC controller")
defer glog.Infof("Shutting down GC controller")

if !controller.WaitForCacheSync("GC", stop, gcc.podListerSynced) {
return
}

go wait.Until(gcc.gc, gcCheckPeriod, stop)

<-stop
}

(The remaining changed files in this commit are not shown.)