-
Notifications
You must be signed in to change notification settings - Fork 314
/
Copy pathratelimit.go
345 lines (306 loc) · 11 KB
/
ratelimit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
// Copyright 2014 Canonical Ltd.
// Licensed under the LGPLv3 with static-linking exception.
// See LICENCE file for details.
// Package ratelimit provides an efficient token bucket implementation
// that can be used to limit the rate of arbitrary things.
// See http://en.wikipedia.org/wiki/Token_bucket.
package ratelimit
import (
"math"
"strconv"
"sync"
"time"
)
// The algorithm that this implementation uses does computational work
// only when tokens are removed from the bucket, and that work completes
// in short, bounded-constant time (Bucket.Wait benchmarks at 175ns on
// my laptop).
//
// Time is measured in equal measured ticks, a given interval
// (fillInterval) apart. On each tick a number of tokens (quantum) are
// added to the bucket.
//
// When any of the methods are called the bucket updates the number of
// tokens that are in the bucket, and it records the current tick
// number too. Note that it doesn't record the current time - by
// keeping things in units of whole ticks, it's easy to dish out tokens
// at exactly the right intervals as measured from the start time.
//
// This allows us to calculate the number of tokens that will be
// available at some time in the future with a few simple arithmetic
// operations.
//
// The main reason for being able to transfer multiple tokens on each tick
// is so that we can represent rates greater than 1e9 (the resolution of the Go
// time package) tokens per second, but it's also useful because
// it means we can easily represent situations like "a person gets
// five tokens an hour, replenished on the hour".
// Bucket represents a token bucket that fills at a predetermined rate.
// Methods on Bucket may be called concurrently.
type Bucket struct {
clock Clock
// startTime holds the moment when the bucket was
// first created and ticks began.
startTime time.Time
// capacity holds the overall capacity of the bucket.
capacity int64
// quantum holds how many tokens are added on
// each tick.
quantum int64
// fillInterval holds the interval between each tick.
fillInterval time.Duration
// mu guards the fields below it.
mu sync.Mutex
// availableTokens holds the number of available
// tokens as of the associated latestTick.
// It will be negative when there are consumers
// waiting for tokens.
availableTokens int64
// latestTick holds the latest tick for which
// we know the number of tokens in the bucket.
latestTick int64
}
// NewBucket returns a new token bucket that fills at the
// rate of one token every fillInterval, up to the given
// maximum capacity. Both arguments must be
// positive. The bucket is initially full.
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
return NewBucketWithClock(fillInterval, capacity, nil)
}
// NewBucketWithClock is identical to NewBucket but injects a testable clock
// interface.
func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {
return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)
}
// rateMargin specifes the allowed variance of actual
// rate from specified rate. 1% seems reasonable.
const rateMargin = 0.01
// NewBucketWithRate returns a token bucket that fills the bucket
// at the rate of rate tokens per second up to the given
// maximum capacity. Because of limited clock resolution,
// at high rates, the actual rate may be up to 1% different from the
// specified rate.
func NewBucketWithRate(rate float64, capacity int64) *Bucket {
return NewBucketWithRateAndClock(rate, capacity, nil)
}
// NewBucketWithRateAndClock is identical to NewBucketWithRate but injects a
// testable clock interface.
func NewBucketWithRateAndClock(rate float64, capacity int64, clock Clock) *Bucket {
// Use the same bucket each time through the loop
// to save allocations.
tb := NewBucketWithQuantumAndClock(1, capacity, 1, clock)
for quantum := int64(1); quantum < 1<<50; quantum = nextQuantum(quantum) {
fillInterval := time.Duration(1e9 * float64(quantum) / rate)
if fillInterval <= 0 {
continue
}
tb.fillInterval = fillInterval
tb.quantum = quantum
if diff := math.Abs(tb.Rate() - rate); diff/rate <= rateMargin {
return tb
}
}
panic("cannot find suitable quantum for " + strconv.FormatFloat(rate, 'g', -1, 64))
}
// nextQuantum returns the next quantum to try after q.
// We grow the quantum exponentially, but slowly, so we
// get a good fit in the lower numbers.
func nextQuantum(q int64) int64 {
q1 := q * 11 / 10
if q1 == q {
q1++
}
return q1
}
// NewBucketWithQuantum is similar to NewBucket, but allows
// the specification of the quantum size - quantum tokens
// are added every fillInterval.
func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket {
return NewBucketWithQuantumAndClock(fillInterval, capacity, quantum, nil)
}
// NewBucketWithQuantumAndClock is like NewBucketWithQuantum, but
// also has a clock argument that allows clients to fake the passing
// of time. If clock is nil, the system clock will be used.
func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {
if clock == nil {
clock = realClock{}
}
if fillInterval <= 0 {
panic("token bucket fill interval is not > 0")
}
if capacity <= 0 {
panic("token bucket capacity is not > 0")
}
if quantum <= 0 {
panic("token bucket quantum is not > 0")
}
return &Bucket{
clock: clock,
startTime: clock.Now(),
latestTick: 0,
fillInterval: fillInterval,
capacity: capacity,
quantum: quantum,
availableTokens: capacity,
}
}
// Wait takes count tokens from the bucket, waiting until they are
// available.
func (tb *Bucket) Wait(count int64) {
if d := tb.Take(count); d > 0 {
tb.clock.Sleep(d)
}
}
// WaitMaxDuration is like Wait except that it will
// only take tokens from the bucket if it needs to wait
// for no greater than maxWait. It reports whether
// any tokens have been removed from the bucket
// If no tokens have been removed, it returns immediately.
func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool {
d, ok := tb.TakeMaxDuration(count, maxWait)
if d > 0 {
tb.clock.Sleep(d)
}
return ok
}
const infinityDuration time.Duration = 0x7fffffffffffffff
// Take takes count tokens from the bucket without blocking. It returns
// the time that the caller should wait until the tokens are actually
// available.
//
// Note that if the request is irrevocable - there is no way to return
// tokens to the bucket once this method commits us to taking them.
func (tb *Bucket) Take(count int64) time.Duration {
tb.mu.Lock()
defer tb.mu.Unlock()
d, _ := tb.take(tb.clock.Now(), count, infinityDuration)
return d
}
// TakeMaxDuration is like Take, except that
// it will only take tokens from the bucket if the wait
// time for the tokens is no greater than maxWait.
//
// If it would take longer than maxWait for the tokens
// to become available, it does nothing and reports false,
// otherwise it returns the time that the caller should
// wait until the tokens are actually available, and reports
// true.
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) {
tb.mu.Lock()
defer tb.mu.Unlock()
return tb.take(tb.clock.Now(), count, maxWait)
}
// TakeAvailable takes up to count immediately available tokens from the
// bucket. It returns the number of tokens removed, or zero if there are
// no available tokens. It does not block.
func (tb *Bucket) TakeAvailable(count int64) int64 {
tb.mu.Lock()
defer tb.mu.Unlock()
return tb.takeAvailable(tb.clock.Now(), count)
}
// takeAvailable is the internal version of TakeAvailable - it takes the
// current time as an argument to enable easy testing.
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
if count <= 0 {
return 0
}
tb.adjustavailableTokens(tb.currentTick(now))
if tb.availableTokens <= 0 {
return 0
}
if count > tb.availableTokens {
count = tb.availableTokens
}
tb.availableTokens -= count
return count
}
// Available returns the number of available tokens. It will be negative
// when there are consumers waiting for tokens. Note that if this
// returns greater than zero, it does not guarantee that calls that take
// tokens from the buffer will succeed, as the number of available
// tokens could have changed in the meantime. This method is intended
// primarily for metrics reporting and debugging.
func (tb *Bucket) Available() int64 {
return tb.available(tb.clock.Now())
}
// available is the internal version of available - it takes the current time as
// an argument to enable easy testing.
func (tb *Bucket) available(now time.Time) int64 {
tb.mu.Lock()
defer tb.mu.Unlock()
tb.adjustavailableTokens(tb.currentTick(now))
return tb.availableTokens
}
// Capacity returns the capacity that the bucket was created with.
func (tb *Bucket) Capacity() int64 {
return tb.capacity
}
// Rate returns the fill rate of the bucket, in tokens per second.
func (tb *Bucket) Rate() float64 {
return 1e9 * float64(tb.quantum) / float64(tb.fillInterval)
}
// take is the internal version of Take - it takes the current time as
// an argument to enable easy testing.
func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) {
if count <= 0 {
return 0, true
}
tick := tb.currentTick(now)
tb.adjustavailableTokens(tick)
avail := tb.availableTokens - count
if avail >= 0 {
tb.availableTokens = avail
return 0, true
}
// Round up the missing tokens to the nearest multiple
// of quantum - the tokens won't be available until
// that tick.
// endTick holds the tick when all the requested tokens will
// become available.
endTick := tick + (-avail+tb.quantum-1)/tb.quantum
endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval)
waitTime := endTime.Sub(now)
if waitTime > maxWait {
return 0, false
}
tb.availableTokens = avail
return waitTime, true
}
// currentTick returns the current time tick, measured
// from tb.startTime.
func (tb *Bucket) currentTick(now time.Time) int64 {
return int64(now.Sub(tb.startTime) / tb.fillInterval)
}
// adjustavailableTokens adjusts the current number of tokens
// available in the bucket at the given time, which must
// be in the future (positive) with respect to tb.latestTick.
func (tb *Bucket) adjustavailableTokens(tick int64) {
lastTick := tb.latestTick
tb.latestTick = tick
if tb.availableTokens >= tb.capacity {
return
}
tb.availableTokens += (tick - lastTick) * tb.quantum
if tb.availableTokens > tb.capacity {
tb.availableTokens = tb.capacity
}
return
}
// Clock represents the passage of time in a way that
// can be faked out for tests.
type Clock interface {
// Now returns the current time.
Now() time.Time
// Sleep sleeps for at least the given duration.
Sleep(d time.Duration)
}
// realClock implements Clock in terms of standard time functions.
type realClock struct{}
// Now implements Clock.Now by calling time.Now.
func (realClock) Now() time.Time {
return time.Now()
}
// Now implements Clock.Sleep by calling time.Sleep.
func (realClock) Sleep(d time.Duration) {
time.Sleep(d)
}