Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

routecontroller: Add wait.NonSlidingUntil, use it #26301

Merged
merged 1 commit into from
May 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/controller/route/routecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, clusterNam
}

func (rc *RouteController) Run(syncPeriod time.Duration) {
go wait.Until(func() {
go wait.NonSlidingUntil(func() {
if err := rc.reconcileNodeRoutes(); err != nil {
glog.Errorf("Couldn't reconcile node routes: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/server/stats/volume_stat_caculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (s *volumeStatCalculator) StartOnce() *volumeStatCalculator {
s.startO.Do(func() {
go wait.JitterUntil(func() {
s.calcAndStoreStats()
}, s.jitterPeriod, 1.0, s.stopChannel)
}, s.jitterPeriod, 1.0, true, s.stopChannel)
})
return s
}
Expand Down
41 changes: 34 additions & 7 deletions pkg/util/wait/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,19 @@ func Forever(f func(), period time.Duration) {
}

// Until loops until stop channel is closed, running f every period.
// Until is syntactic sugar on top of JitterUntil with zero jitter factor
// Until is syntactic sugar on top of JitterUntil with zero jitter
// factor, with sliding = true (which means the timer for period
// starts after the f completes).
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
JitterUntil(f, period, 0.0, stopCh)
JitterUntil(f, period, 0.0, true, stopCh)
}

// NonSlidingUntil loops until stop channel is closed, running f every
// period. NonSlidingUntil is syntactic sugar on top of JitterUntil
// with zero jitter factor, with sliding = false (meaning the timer for
// period starts at the same time as the function starts).
func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
JitterUntil(f, period, 0.0, false, stopCh)
}

// JitterUntil loops until stop channel is closed, running f every period.
Expand All @@ -53,28 +63,45 @@ func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
// Catches any panics, and keeps going. f may not be invoked if
// stop channel is already closed. Pass NeverStop to Until if you
// don't want it stop.
func JitterUntil(f func(), period time.Duration, jitterFactor float64, stopCh <-chan struct{}) {
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
select {
case <-stopCh:
return
default:
}

for {
jitteredPeriod := period
if jitterFactor > 0.0 {
jitteredPeriod = Jitter(period, jitterFactor)
}

var t *time.Timer
if !sliding {
t = time.NewTimer(jitteredPeriod)
}

func() {
defer runtime.HandleCrash()
f()
}()

jitteredPeriod := period
if jitterFactor > 0.0 {
jitteredPeriod = Jitter(period, jitterFactor)
if sliding {
t = time.NewTimer(jitteredPeriod)
} else {
// The timer we created could already have fired, so be
// careful and check stopCh first.
select {
case <-stopCh:
return
default:
}
}

select {
case <-stopCh:
return
case <-time.After(jitteredPeriod):
case <-t.C:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a race that can occur here when sliding is true, and jitterPeriod is 0, and a stopCh + t.C are triggered in close succession. #26782 (comment)

/cc @wojtek-t @mikedanese

}
}
}
Expand Down
28 changes: 24 additions & 4 deletions pkg/util/wait/wait_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,26 @@ func TestUntil(t *testing.T) {
<-called
}

func TestNonSlidingUntil(t *testing.T) {
ch := make(chan struct{})
close(ch)
NonSlidingUntil(func() {
t.Fatal("should not have been invoked")
}, 0, ch)

ch = make(chan struct{})
called := make(chan struct{})
go func() {
NonSlidingUntil(func() {
called <- struct{}{}
}, 0, ch)
close(called)
}()
<-called
close(ch)
<-called
}

func TestUntilReturnsImmediately(t *testing.T) {
now := time.Now()
ch := make(chan struct{})
Expand All @@ -63,14 +83,14 @@ func TestJitterUntil(t *testing.T) {
close(ch)
JitterUntil(func() {
t.Fatal("should not have been invoked")
}, 0, 1.0, ch)
}, 0, 1.0, true, ch)

ch = make(chan struct{})
called := make(chan struct{})
go func() {
JitterUntil(func() {
called <- struct{}{}
}, 0, 1.0, ch)
}, 0, 1.0, true, ch)
close(called)
}()
<-called
Expand All @@ -83,7 +103,7 @@ func TestJitterUntilReturnsImmediately(t *testing.T) {
ch := make(chan struct{})
JitterUntil(func() {
close(ch)
}, 30*time.Second, 1.0, ch)
}, 30*time.Second, 1.0, true, ch)
if now.Add(25 * time.Second).Before(time.Now()) {
t.Errorf("JitterUntil did not return immediately when the stop chan was closed inside the func")
}
Expand All @@ -98,7 +118,7 @@ func TestJitterUntilNegativeFactor(t *testing.T) {
JitterUntil(func() {
called <- struct{}{}
<-received
}, time.Second, -30.0, ch)
}, time.Second, -30.0, true, ch)
}()
// first loop
<-called
Expand Down