Rewrite util.* -> wait.* wherever reasonable
ingvagabund committed Feb 7, 2016
1 parent 43a47a8 commit 4389b3f
Showing 71 changed files with 231 additions and 225 deletions.
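
The change is mechanical throughout: call sites drop the catch-all "k8s.io/kubernetes/pkg/util" import in favor of "k8s.io/kubernetes/pkg/util/wait" for Until, NeverStop, and ForeverTestTimeout. Below is a minimal sketch of the pattern the controller call sites end up with, assuming a Kubernetes tree of this era on the GOPATH; the loop body and printed message are invented for illustration.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util/wait"
)

func main() {
	// wait.Until invokes the function, sleeps for the given period, and
	// repeats until the stop channel is closed. wait.NeverStop is a channel
	// that is never closed, so the loop runs for the life of the process.
	go wait.Until(func() {
		fmt.Println("periodic sync pass")
	}, time.Second, wait.NeverStop)

	// Let a few iterations run before the program exits.
	time.Sleep(3 * time.Second)
}
```

Controller Run methods in the diff take the same stop channel directly (for example Run(3, wait.NeverStop)) instead of wrapping the call in wait.Until themselves.
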
6 changes: 3 additions & 3 deletions cluster/addons/dns/kube2sky/kube2sky.go
@@ -515,7 +515,7 @@ func watchForServices(kubeClient *kclient.Client, ks *kube2sky) kcache.Store {
UpdateFunc: ks.updateService,
},
)
go serviceController.Run(util.NeverStop)
go serviceController.Run(wait.NeverStop)
return serviceStore
}

@@ -533,7 +533,7 @@ func watchEndpoints(kubeClient *kclient.Client, ks *kube2sky) kcache.Store {
},
)

go eController.Run(util.NeverStop)
go eController.Run(wait.NeverStop)
return eStore
}

@@ -551,7 +551,7 @@ func watchPods(kubeClient *kclient.Client, ks *kube2sky) kcache.Store {
},
)

go eController.Run(util.NeverStop)
go eController.Run(wait.NeverStop)
return eStore
}

4 changes: 2 additions & 2 deletions cmd/integration/integration.go
@@ -191,11 +191,11 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string

// ensure the service endpoints are sync'd several times within the window that the integration tests wait
go endpointcontroller.NewEndpointController(clientset, controller.NoResyncPeriodFunc).
Run(3, util.NeverStop)
Run(3, wait.NeverStop)

// TODO: Write an integration test for the replication controllers watch.
go replicationcontroller.NewReplicationManager(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas).
Run(3, util.NeverStop)
Run(3, wait.NeverStop)

nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, util.NewFakeRateLimiter(), util.NewFakeRateLimiter(),
40*time.Second, 60*time.Second, 5*time.Second, nil, false)
16 changes: 8 additions & 8 deletions cmd/kube-controller-manager/app/controllermanager.go
@@ -175,17 +175,17 @@ func Run(s *options.CMServer) error {

func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *client.Config, stop <-chan struct{}) error {
go endpointcontroller.NewEndpointController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "endpoint-controller")), ResyncPeriod(s)).
Run(s.ConcurrentEndpointSyncs, util.NeverStop)
Run(s.ConcurrentEndpointSyncs, wait.NeverStop)

go replicationcontroller.NewReplicationManager(
clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "replication-controller")),
ResyncPeriod(s),
replicationcontroller.BurstReplicas,
).Run(s.ConcurrentRCSyncs, util.NeverStop)
).Run(s.ConcurrentRCSyncs, wait.NeverStop)

if s.TerminatedPodGCThreshold > 0 {
go gc.New(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "garbage-collector")), ResyncPeriod(s), s.TerminatedPodGCThreshold).
Run(util.NeverStop)
Run(wait.NeverStop)
}

cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
@@ -219,7 +219,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig

go resourcequotacontroller.NewResourceQuotaController(
clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "resourcequota-controller")),
controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, util.NeverStop)
controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, wait.NeverStop)

// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.
@@ -265,25 +265,25 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
if containsResource(resources, "daemonsets") {
glog.Infof("Starting daemon set controller")
go daemon.NewDaemonSetsController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s)).
Run(s.ConcurrentDSCSyncs, util.NeverStop)
Run(s.ConcurrentDSCSyncs, wait.NeverStop)
}

if containsResource(resources, "jobs") {
glog.Infof("Starting job controller")
go job.NewJobController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "job-controller")), ResyncPeriod(s)).
Run(s.ConcurrentJobSyncs, util.NeverStop)
Run(s.ConcurrentJobSyncs, wait.NeverStop)
}

if containsResource(resources, "deployments") {
glog.Infof("Starting deployment controller")
go deployment.NewDeploymentController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "deployment-controller")), ResyncPeriod(s)).
Run(s.ConcurrentDeploymentSyncs, util.NeverStop)
Run(s.ConcurrentDeploymentSyncs, wait.NeverStop)
}

if containsResource(resources, "replicasets") {
glog.Infof("Starting ReplicaSet controller")
go replicaset.NewReplicaSetController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas).
Run(s.ConcurrentRSSyncs, util.NeverStop)
Run(s.ConcurrentRSSyncs, wait.NeverStop)
}
}

5 changes: 3 additions & 2 deletions cmd/kube-proxy/app/server.go
@@ -45,6 +45,7 @@ import (
utilnet "k8s.io/kubernetes/pkg/util/net"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/wait"

"github.com/golang/glog"
"github.com/spf13/cobra"
@@ -274,12 +275,12 @@ func (s *ProxyServer) Run() error {
http.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "%s", s.ProxyMode)
})
go util.Until(func() {
go wait.Until(func() {
err := http.ListenAndServe(s.Config.HealthzBindAddress+":"+strconv.Itoa(s.Config.HealthzPort), nil)
if err != nil {
glog.Errorf("Starting health server failed: %v", err)
}
}, 5*time.Second, util.NeverStop)
}, 5*time.Second, wait.NeverStop)
}

// Tune conntrack, if requested
15 changes: 8 additions & 7 deletions cmd/kubelet/app/server.go
@@ -63,6 +63,7 @@ import (
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume"
)

@@ -322,12 +323,12 @@ func Run(s *options.KubeletServer, kcfg *KubeletConfig) error {

if s.HealthzPort > 0 {
healthz.DefaultHealthz()
go util.Until(func() {
go wait.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(s.HealthzPort)), nil)
if err != nil {
glog.Errorf("Starting health server failed: %v", err)
}
}, 5*time.Second, util.NeverStop)
}, 5*time.Second, wait.NeverStop)
}

if s.RunOnce {
@@ -611,18 +612,18 @@ func RunKubelet(kcfg *KubeletConfig) error {

func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig) {
// start the kubelet
go util.Until(func() { k.Run(podCfg.Updates()) }, 0, util.NeverStop)
go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)

// start the kubelet server
if kc.EnableServer {
go util.Until(func() {
go wait.Until(func() {
k.ListenAndServe(kc.Address, kc.Port, kc.TLSOptions, kc.Auth, kc.EnableDebuggingHandlers)
}, 0, util.NeverStop)
}, 0, wait.NeverStop)
}
if kc.ReadOnlyPort > 0 {
go util.Until(func() {
go wait.Until(func() {
k.ListenAndServeReadOnly(kc.Address, kc.ReadOnlyPort)
}, 0, util.NeverStop)
}, 0, wait.NeverStop)
}
}

16 changes: 8 additions & 8 deletions contrib/mesos/pkg/controllermanager/controllermanager.go
@@ -128,14 +128,14 @@ func (s *CMServer) Run(_ []string) error {
}()

endpoints := s.createEndpointController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "endpoint-controller")))
go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop)
go endpoints.Run(s.ConcurrentEndpointSyncs, wait.NeverStop)

go replicationcontroller.NewReplicationManager(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "replication-controller")), s.resyncPeriod, replicationcontroller.BurstReplicas).
Run(s.ConcurrentRCSyncs, util.NeverStop)
Run(s.ConcurrentRCSyncs, wait.NeverStop)

if s.TerminatedPodGCThreshold > 0 {
go gc.New(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "garbage-collector")), s.resyncPeriod, s.TerminatedPodGCThreshold).
Run(util.NeverStop)
Run(wait.NeverStop)
}

//TODO(jdef) should eventually support more cloud providers here
@@ -154,7 +154,7 @@ func (s *CMServer) Run(_ []string) error {
nodeController.Run(s.NodeSyncPeriod)

nodeStatusUpdaterController := node.NewStatusUpdater(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "node-status-controller")), s.NodeMonitorPeriod, time.Now)
if err := nodeStatusUpdaterController.Run(util.NeverStop); err != nil {
if err := nodeStatusUpdaterController.Run(wait.NeverStop); err != nil {
glog.Fatalf("Failed to start node status update controller: %v", err)
}

@@ -173,7 +173,7 @@ func (s *CMServer) Run(_ []string) error {
}

go resourcequotacontroller.NewResourceQuotaController(
clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "resource-quota-controller")), controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, util.NeverStop)
clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "resource-quota-controller")), controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, wait.NeverStop)

// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.
@@ -220,19 +220,19 @@ func (s *CMServer) Run(_ []string) error {
if containsResource(resources, "daemonsets") {
glog.Infof("Starting daemon set controller")
go daemon.NewDaemonSetsController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "daemon-set-controller")), s.resyncPeriod).
Run(s.ConcurrentDSCSyncs, util.NeverStop)
Run(s.ConcurrentDSCSyncs, wait.NeverStop)
}

if containsResource(resources, "jobs") {
glog.Infof("Starting job controller")
go job.NewJobController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "job-controller")), s.resyncPeriod).
Run(s.ConcurrentJobSyncs, util.NeverStop)
Run(s.ConcurrentJobSyncs, wait.NeverStop)
}

if containsResource(resources, "deployments") {
glog.Infof("Starting deployment controller")
go deployment.NewDeploymentController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "deployment-controller")), s.resyncPeriod).
Run(s.ConcurrentDeploymentSyncs, util.NeverStop)
Run(s.ConcurrentDeploymentSyncs, wait.NeverStop)
}
}

4 changes: 2 additions & 2 deletions contrib/mesos/pkg/election/etcd_master.go
@@ -26,7 +26,7 @@ import (

"k8s.io/kubernetes/pkg/api/unversioned"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)

@@ -57,7 +57,7 @@ type etcdMasterElector struct {
func (e *etcdMasterElector) Elect(path, id string) watch.Interface {
e.done = make(chan empty)
e.events = make(chan watch.Event)
go util.Until(func() { e.run(path, id) }, time.Second*5, util.NeverStop)
go wait.Until(func() { e.run(path, id) }, time.Second*5, wait.NeverStop)
return e
}

20 changes: 10 additions & 10 deletions contrib/mesos/pkg/executor/executor_test.go
@@ -39,8 +39,8 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"

"github.com/mesos/mesos-go/mesosproto"
@@ -233,7 +233,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {

executor.LaunchTask(mockDriver, taskInfo)

assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
assertext.EventuallyTrue(t, wait.ForeverTestTimeout, func() bool {
executor.lock.Lock()
defer executor.lock.Unlock()
return !registry.empty()
@@ -251,7 +251,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
finished := kmruntime.After(statusUpdateCalls.Wait)
select {
case <-finished:
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timed out waiting for status update calls to finish")
}

@@ -267,7 +267,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
})

executor.KillTask(mockDriver, taskInfo.TaskId)
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
assertext.EventuallyTrue(t, wait.ForeverTestTimeout, func() bool {
executor.lock.Lock()
defer executor.lock.Unlock()
return registry.empty()
@@ -277,7 +277,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
finished = kmruntime.After(statusUpdateCalls.Wait)
select {
case <-finished:
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timed out waiting for status update calls to finish")
}

@@ -428,7 +428,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
executor.LaunchTask(mockDriver, taskInfo)

// must wait for this otherwise phase changes may not apply
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
assertext.EventuallyTrue(t, wait.ForeverTestTimeout, func() bool {
executor.lock.Lock()
defer executor.lock.Unlock()
return !registry.empty()
@@ -444,7 +444,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
// from k.tasks through the "task-lost:foo" message below.
select {
case <-called:
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timed out waiting for SendStatusUpdate for the running task")
}

@@ -462,15 +462,15 @@ func TestExecutorFrameworkMessage(t *testing.T) {

executor.FrameworkMessage(mockDriver, "task-lost:foo")

assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
assertext.EventuallyTrue(t, wait.ForeverTestTimeout, func() bool {
executor.lock.Lock()
defer executor.lock.Unlock()
return registry.empty()
}, "executor must be able to kill a created task and pod")

select {
case <-called:
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timed out waiting for SendStatusUpdate")
}

@@ -614,7 +614,7 @@ func TestExecutorsendFrameworkMessage(t *testing.T) {
// guard against data race in mock driver between AssertExpectations and Called
select {
case <-called: // expected
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("expected call to SendFrameworkMessage")
}
mockDriver.AssertExpectations(t)
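The executor tests above lean on wait.ForeverTestTimeout, a generous per-test timeout constant from the same wait package, to guard selects on asynchronous signals. A minimal sketch of that pattern, with the test and channel names invented for illustration:

```go
package executor_test

import (
	"testing"
	"time"

	"k8s.io/kubernetes/pkg/util/wait"
)

// TestAsyncSignal mirrors the guard used above: fail rather than hang
// if the asynchronous status update never arrives.
func TestAsyncSignal(t *testing.T) {
	called := make(chan struct{}, 1)

	go func() {
		// Stand-in for the asynchronous work under test.
		called <- struct{}{}
	}()

	select {
	case <-called: // expected
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("timed out waiting for the asynchronous call")
	}
}
```
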
4 changes: 2 additions & 2 deletions contrib/mesos/pkg/executor/service/kubelet.go
@@ -20,8 +20,8 @@ import (
log "github.com/golang/glog"
"k8s.io/kubernetes/pkg/kubelet"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)

// executorKubelet decorates the kubelet with a Run function that notifies the
@@ -76,7 +76,7 @@ func (kl *executorKubelet) Run(mergedUpdates <-chan kubetypes.PodUpdate) {
// will not take very long. Peeking into the future (current k8s master) it
// seems that the backlog has grown from 1 to 50 -- this may negatively impact
// us going forward, time will tell.
util.Until(func() { kl.Kubelet.Run(closableUpdates) }, 0, kl.executorDone)
wait.Until(func() { kl.Kubelet.Run(closableUpdates) }, 0, kl.executorDone)

//TODO(jdef) revisit this if/when executor failover lands
// Force kubelet to delete all pods.
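Unlike the NeverStop call sites, the executor kubelet above passes wait.Until its own stop channel (kl.executorDone) and a zero period, so the wrapped Run is restarted as soon as it returns and the loop exits once the executor shuts down. A self-contained sketch of that variant, with names invented for illustration:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util/wait"
)

func main() {
	done := make(chan struct{})

	// With a zero period the function is restarted as soon as it returns;
	// the loop stops only when the done channel is closed.
	go wait.Until(func() {
		fmt.Println("worker pass")
		time.Sleep(200 * time.Millisecond)
	}, 0, done)

	time.Sleep(time.Second)
	close(done) // shut the loop down
	time.Sleep(100 * time.Millisecond)
}
```
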