Skip to content

Commit

Permalink
routecontroller: Add wait.NonSlidingUntil, use it
Browse files Browse the repository at this point in the history
Make sure the reconciliation loop kicks in again immediately if it
takes a loooooong time.
  • Loading branch information
zmerlynn committed May 25, 2016
1 parent 025b017 commit 3ec25c5
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/route/routecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, clusterNam
}

func (rc *RouteController) Run(syncPeriod time.Duration) {
go wait.Until(func() {
go wait.NonSlidingUntil(func() {
if err := rc.reconcileNodeRoutes(); err != nil {
glog.Errorf("Couldn't reconcile node routes: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/server/stats/volume_stat_caculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (s *volumeStatCalculator) StartOnce() *volumeStatCalculator {
s.startO.Do(func() {
go wait.JitterUntil(func() {
s.calcAndStoreStats()
}, s.jitterPeriod, 1.0, s.stopChannel)
}, s.jitterPeriod, 1.0, true, s.stopChannel)
})
return s
}
Expand Down
41 changes: 34 additions & 7 deletions pkg/util/wait/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,19 @@ func Forever(f func(), period time.Duration) {
}

// Until loops until stop channel is closed, running f every period.
// Until is syntactic sugar on top of JitterUntil with zero jitter factor
// Until is syntactic sugar on top of JitterUntil with zero jitter
// factor, with sliding = true (which means the timer for period
// starts after the f completes).
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
JitterUntil(f, period, 0.0, stopCh)
JitterUntil(f, period, 0.0, true, stopCh)
}

// NonSlidingUntil loops until stop channel is closed, running f every
// period. NonSlidingUntil is syntactic sugar on top of JitterUntil
// with zero jitter factor, with sliding = false (meaning the timer for
// period starts at the same time as the function starts).
func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
JitterUntil(f, period, 0.0, false, stopCh)
}

// JitterUntil loops until stop channel is closed, running f every period.
Expand All @@ -53,28 +63,45 @@ func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
// Catches any panics, and keeps going. f may not be invoked if
// stop channel is already closed. Pass NeverStop to Until if you
// don't want it stop.
func JitterUntil(f func(), period time.Duration, jitterFactor float64, stopCh <-chan struct{}) {
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
select {
case <-stopCh:
return
default:
}

for {
jitteredPeriod := period
if jitterFactor > 0.0 {
jitteredPeriod = Jitter(period, jitterFactor)
}

var t *time.Timer
if !sliding {
t = time.NewTimer(jitteredPeriod)
}

func() {
defer runtime.HandleCrash()
f()
}()

jitteredPeriod := period
if jitterFactor > 0.0 {
jitteredPeriod = Jitter(period, jitterFactor)
if sliding {

This comment has been minimized.

Copy link
@timothysc

timothysc Jul 27, 2016

Member

@zmerlynn I find this to be weirdly conflating the use of this function on an edge use case of the routecontroller.

This comment has been minimized.

Copy link
@zmerlynn

zmerlynn via email Jul 27, 2016

Author Member
t = time.NewTimer(jitteredPeriod)
} else {
// The timer we created could already have fired, so be
// careful and check stopCh first.
select {
case <-stopCh:
return
default:
}
}

select {
case <-stopCh:
return
case <-time.After(jitteredPeriod):
case <-t.C:
}
}
}
Expand Down
28 changes: 24 additions & 4 deletions pkg/util/wait/wait_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,26 @@ func TestUntil(t *testing.T) {
<-called
}

func TestNonSlidingUntil(t *testing.T) {
ch := make(chan struct{})
close(ch)
NonSlidingUntil(func() {
t.Fatal("should not have been invoked")
}, 0, ch)

ch = make(chan struct{})
called := make(chan struct{})
go func() {
NonSlidingUntil(func() {
called <- struct{}{}
}, 0, ch)
close(called)
}()
<-called
close(ch)
<-called
}

func TestUntilReturnsImmediately(t *testing.T) {
now := time.Now()
ch := make(chan struct{})
Expand All @@ -63,14 +83,14 @@ func TestJitterUntil(t *testing.T) {
close(ch)
JitterUntil(func() {
t.Fatal("should not have been invoked")
}, 0, 1.0, ch)
}, 0, 1.0, true, ch)

ch = make(chan struct{})
called := make(chan struct{})
go func() {
JitterUntil(func() {
called <- struct{}{}
}, 0, 1.0, ch)
}, 0, 1.0, true, ch)
close(called)
}()
<-called
Expand All @@ -83,7 +103,7 @@ func TestJitterUntilReturnsImmediately(t *testing.T) {
ch := make(chan struct{})
JitterUntil(func() {
close(ch)
}, 30*time.Second, 1.0, ch)
}, 30*time.Second, 1.0, true, ch)
if now.Add(25 * time.Second).Before(time.Now()) {
t.Errorf("JitterUntil did not return immediately when the stop chan was closed inside the func")
}
Expand All @@ -98,7 +118,7 @@ func TestJitterUntilNegativeFactor(t *testing.T) {
JitterUntil(func() {
called <- struct{}{}
<-received
}, time.Second, -30.0, ch)
}, time.Second, -30.0, true, ch)
}()
// first loop
<-called
Expand Down

0 comments on commit 3ec25c5

Please sign in to comment.