-
Notifications
You must be signed in to change notification settings - Fork 40.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This lib manages runs of a function to have min and max frequencies.
- Loading branch information
Showing
3 changed files
with
573 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,229 @@ | ||
/* | ||
Copyright 2017 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package async | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"k8s.io/client-go/util/flowcontrol" | ||
|
||
"github.com/golang/glog" | ||
) | ||
|
||
// BoundedFrequencyRunner manages runs of a user-provided function. | ||
// See NewBoundedFrequencyRunner for examples. | ||
type BoundedFrequencyRunner struct { | ||
name string // the name of this instance | ||
minInterval time.Duration // the min time between runs, modulo bursts | ||
maxInterval time.Duration // the max time between runs | ||
|
||
run chan struct{} // try an async run | ||
|
||
mu sync.Mutex // guards runs of fn and all mutations | ||
fn func() // function to run | ||
lastRun time.Time // time of last run | ||
timer timer // timer for deferred runs | ||
limiter rateLimiter // rate limiter for on-demand runs | ||
} | ||
|
||
// designed so that flowcontrol.RateLimiter satisfies | ||
type rateLimiter interface { | ||
TryAccept() bool | ||
Stop() | ||
} | ||
|
||
type nullLimiter struct{} | ||
|
||
func (nullLimiter) TryAccept() bool { | ||
return true | ||
} | ||
|
||
func (nullLimiter) Stop() {} | ||
|
||
var _ rateLimiter = nullLimiter{} | ||
|
||
// for testing | ||
type timer interface { | ||
// C returns the timer's selectable channel. | ||
C() <-chan time.Time | ||
|
||
// See time.Timer.Reset. | ||
Reset(d time.Duration) bool | ||
|
||
// See time.Timer.Stop. | ||
Stop() bool | ||
|
||
// See time.Now. | ||
Now() time.Time | ||
|
||
// See time.Since. | ||
Since(t time.Time) time.Duration | ||
|
||
// See time.Sleep. | ||
Sleep(d time.Duration) | ||
} | ||
|
||
// implement our timer in terms of std time.Timer. | ||
type realTimer struct { | ||
*time.Timer | ||
} | ||
|
||
func (rt realTimer) C() <-chan time.Time { | ||
return rt.Timer.C | ||
} | ||
|
||
func (rt realTimer) Now() time.Time { | ||
return time.Now() | ||
} | ||
|
||
func (rt realTimer) Since(t time.Time) time.Duration { | ||
return time.Since(t) | ||
} | ||
|
||
func (rt realTimer) Sleep(d time.Duration) { | ||
time.Sleep(d) | ||
} | ||
|
||
var _ timer = realTimer{} | ||
|
||
// NewBoundedFrequencyRunner creates a new BoundedFrequencyRunner instance, | ||
// which will manage runs of the specified function. | ||
// | ||
// All runs will be async to the caller of BoundedFrequencyRunner.Run, but | ||
// multiple runs are serialized. If the function needs to hold locks, it must | ||
// take them internally. | ||
// | ||
// Runs of the funtion will have at least minInterval between them (from | ||
// completion to next start), except that up to bursts may be allowed. Burst | ||
// runs are "accumulated" over time, one per minInterval up to burstRuns total. | ||
// This can be used, for example, to mitigate the impact of expensive operations | ||
// being called in response to user-initiated operations. Run requests that | ||
// would violate the minInterval are coallesced and run at the next opportunity. | ||
// | ||
// The function will be run at least once per maxInterval. For example, this can | ||
// force periodic refreshes of state in the absence of anyone calling Run. | ||
// | ||
// Examples: | ||
// | ||
// NewBoundedFrequencyRunner("name", fn, time.Second, 5*time.Second, 1) | ||
// - fn will have at least 1 second between runs | ||
// - fn will have no more than 5 seconds between runs | ||
// | ||
// NewBoundedFrequencyRunner("name", fn, 3*time.Second, 10*time.Second, 3) | ||
// - fn will have at least 3 seconds between runs, with up to 3 burst runs | ||
// - fn will have no more than 10 seconds between runs | ||
// | ||
// The maxInterval must be greater than or equal to the minInterval, If the | ||
// caller passes a maxInterval less than minInterval, this function will panic. | ||
func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner { | ||
timer := realTimer{Timer: time.NewTimer(0)} // will tick immediately | ||
<-timer.C() // consume the first tick | ||
return construct(name, fn, minInterval, maxInterval, burstRuns, timer) | ||
} | ||
|
||
// Make an instance with dependencies injected. | ||
func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner { | ||
if maxInterval < minInterval { | ||
panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, minInterval, maxInterval)) | ||
} | ||
if timer == nil { | ||
panic(fmt.Sprintf("%s: timer must be non-nil", name)) | ||
} | ||
|
||
bfr := &BoundedFrequencyRunner{ | ||
name: name, | ||
fn: fn, | ||
minInterval: minInterval, | ||
maxInterval: maxInterval, | ||
run: make(chan struct{}, 16), | ||
timer: timer, | ||
} | ||
if minInterval == 0 { | ||
bfr.limiter = nullLimiter{} | ||
} else { | ||
// allow burst updates in short succession | ||
qps := float32(time.Second) / float32(minInterval) | ||
bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer) | ||
} | ||
return bfr | ||
} | ||
|
||
// Loop handles the periodic timer and run requests. This is expected to be | ||
// called as a goroutine. | ||
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) { | ||
glog.V(3).Infof("%s Loop running", bfr.name) | ||
bfr.timer.Reset(bfr.maxInterval) | ||
for { | ||
select { | ||
case <-stop: | ||
bfr.stop() | ||
glog.V(3).Infof("%s Loop stopping", bfr.name) | ||
return | ||
case <-bfr.timer.C(): | ||
bfr.tryRun() | ||
case <-bfr.run: | ||
bfr.tryRun() | ||
} | ||
} | ||
} | ||
|
||
// Run the function as soon as possible. If this is called while Loop is not | ||
// running, the call may be deferred indefinitely. | ||
func (bfr *BoundedFrequencyRunner) Run() { | ||
bfr.run <- struct{}{} | ||
} | ||
|
||
// assumes the lock is not held | ||
func (bfr *BoundedFrequencyRunner) stop() { | ||
bfr.mu.Lock() | ||
defer bfr.mu.Unlock() | ||
bfr.limiter.Stop() | ||
bfr.timer.Stop() | ||
} | ||
|
||
// assumes the lock is not held | ||
func (bfr *BoundedFrequencyRunner) tryRun() { | ||
bfr.mu.Lock() | ||
defer bfr.mu.Unlock() | ||
|
||
if bfr.limiter.TryAccept() { | ||
// We're allowed to run the function right now. | ||
bfr.fn() | ||
bfr.lastRun = bfr.timer.Now() | ||
bfr.timer.Stop() | ||
bfr.timer.Reset(bfr.maxInterval) | ||
glog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval) | ||
return | ||
} | ||
|
||
// It can't run right now, figure out when it can run next. | ||
|
||
elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run | ||
nextPossible := bfr.minInterval - elapsed // time to next possible run | ||
nextScheduled := bfr.maxInterval - elapsed // time to next periodic run | ||
glog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled) | ||
|
||
if nextPossible < nextScheduled { | ||
// Set the timer for ASAP, but don't drain here. Assuming Loop is running, | ||
// it might get a delivery in the mean time, but that is OK. | ||
bfr.timer.Stop() | ||
bfr.timer.Reset(nextPossible) | ||
glog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible) | ||
} | ||
} |
Oops, something went wrong.