Limit events accepted by API Server #50925
Conversation
Hi @staebler. Thanks for your PR. I'm waiting for a kubernetes member to verify that this patch is reasonable to test. If it is, they should reply with the appropriate command. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
Force-pushed from c2d0db3 to 2e94750
// newEventRateLimitWithClock configures an admission controller that can enforce event rate limits.
// It uses a clock for testing purposes.
func newEventRateLimitWithClock(config *eventratelimitapi.Configuration, clock flowcontrol.Clock) (admission.Interface, error) {
Update the method to always use a clock and just call it with a real clock from the register. This mass of new* constructors is hard to follow.
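A rough sketch of what this suggestion could look like (the constructor name and the registration call are assumptions for illustration, not the merged code):

// Single constructor that always takes a clock; the plugin registration
// passes a real clock, while tests pass a fake one.
func newEventRateLimit(config *eventratelimitapi.Configuration, clock flowcontrol.Clock) (admission.Interface, error) {
    // build the limit enforcers with rate limiters driven by the supplied clock
    // (body elided in this sketch)
    return nil, nil
}

The register function would then call newEventRateLimit(configuration, realClock) with whatever real-clock implementation the flowcontrol package expects.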
// newEventRateLimit configures an admission controller than can enforce event rate limits
func newEventRateLimitUsingRLF(config *eventratelimitapi.Configuration, rlf rateLimiterFactory) (admission.Interface, error) {
If you eliminate the clock variant above and always require it, you can eliminate this too.
if err != nil {
    return nil, err
}
eventRateLimitAdmission.limitEnforcers = append(eventRateLimitAdmission.limitEnforcers, enforcer)
nit: construct this array above and initialize the eventRateLimitAdmission in one init block instead of mutating it here.
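For illustration, the suggested single init block might look roughly like this (names are taken from the surrounding diff; the admission.NewHandler arguments are an assumption):

// build the enforcers first, then initialize the admission struct once
limitEnforcers := make([]*limitEnforcer, 0, len(config.Limits))
for _, limitConfig := range config.Limits {
    enforcer, err := newLimitEnforcer(limitConfig, rlf)
    if err != nil {
        return nil, err
    }
    limitEnforcers = append(limitEnforcers, enforcer)
}
eventRateLimitAdmission := &eventRateLimitAdmission{
    Handler:        admission.NewHandler(admission.Create, admission.Update),
    limitEnforcers: limitEnforcers,
}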
rateLimiter flowcontrol.RateLimiter
}

func newSingleCache(rateLimiter flowcontrol.RateLimiter) *singleCache {
The style in the project doesn't use this for simple construction of private (or even public) structs. Imagine all the anti-patterns from every other language and that's the go-style.
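One reading of this comment, as a sketch: drop the trivial constructor and build the value inline where it is needed (the field name comes from the struct shown above).

// instead of calling newSingleCache(rateLimiter)
cache := &singleCache{rateLimiter: rateLimiter}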
return nil
}

// cache is an interface for caching the limits of a particular type
move this and both implementations to another file in this package.
type eventRateLimitAdmission struct {
    sync.RWMutex
    *admission.Handler
    limitEnforcers []*limitEnforcer
It would be good to indicate how many of these you expect. Less than 10 by the looks of it.
What type of indication are you looking for here? Are you suggesting a comment? Or are you suggesting something like [4]*limitEnforcer?
comment
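In other words, something like the following field comment (wording is illustrative, based on the estimate above):

// limitEnforcers holds at most one enforcer per configured limit type
// (server, namespace, user, source+object), so fewer than ten entries.
limitEnforcers []*limitEnforcer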
// eventRateLimitAdmission implements an admission controller that can enforce event rate limits
type eventRateLimitAdmission struct {
    sync.RWMutex
Don't anonymously include the mutex
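A sketch of the named-mutex alternative being asked for (the field name and comment wording are illustrative):

type eventRateLimitAdmission struct {
    *admission.Handler
    // lock guards access to the per-limit caches
    lock sync.RWMutex
    limitEnforcers []*limitEnforcer
}

Callers would then lock explicitly via a.lock.Lock() and a.lock.Unlock() rather than a.Lock() and a.Unlock().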
type eventRateLimitAdmission struct {
    sync.RWMutex
    *admission.Handler
    limitEnforcers []*limitEnforcer
read-only after construction?
Would you mind elaborating on this? I don't know how to make something read-only.
It was a question: is it read-only after construction? If so, a comment would help.
func newLimitEnforcer(config eventratelimitapi.Limit, rlf rateLimiterFactory) (*limitEnforcer, error) {
    limitType := config.Type
    newRateLimiter := func() flowcontrol.RateLimiter {
        return rlf(config.QPS, config.Burst)
This doesn't look valuable. Why not just pass the func() flowcontrol.RateLimiter you need?
I am not quite sure what you are suggesting here. I may have addressed, or avoided, this with other changes. But you may want to check that this specifically is addressed in the next revision of changes.
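If the suggestion is to accept an already-bound factory instead of deriving one from QPS and burst here, a rough sketch might be (the signature and field names are a guess, not the merged code):

// the caller binds QPS and burst into newRateLimiter, so the enforcer only
// needs the factory and the limit type
func newLimitEnforcer(config eventratelimitapi.Limit, newRateLimiter func() flowcontrol.RateLimiter) (*limitEnforcer, error) {
    return &limitEnforcer{limitType: config.Type}, nil // remaining wiring elided
}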
filter := rateLimiter.TryAccept()

// update the cache
enforcer.cache.add(key, rateLimiter)
This looks odd. We have to do a write every accept call? Did this belong in that !found block above?
It seemed odd to me, too. It was copied over from the event rate limiting code in the client-side event recorder. I had assumed that Add was needed to make the key the most recently used. I see from the lru code, though, that Get does that already, which makes sense.
// get the rate limiter associated with the specified key
get(key interface{}) (rateLimiter flowcontrol.RateLimiter, found bool)
// add the specified rate limiter to the cache, associated with the specified key
add(key interface{}, rateLimiter flowcontrol.RateLimiter)
Why do we have this lever? This is an attempt to avoid plumbing the func() RateLimiter through? Seems like the get call should always return a valid RateLimiter and we have no need for a separate get
err, no need for a separate add
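The shape being suggested, as I understand it (and matching the lruCache.get signature that appears later in this thread): a cache that constructs the rate limiter on a miss, so callers never need a separate add.

// cache returns the rate limiter for a key, creating one if none exists yet
type cache interface {
    get(key interface{}) flowcontrol.RateLimiter
}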
return nil
}

a.Lock()
This is questionable. You're doing this because the ratelimiter isn't threadsafe or for something else? I thought the ratelimiters were thread-safe (I remember looking back when we were working on the client).
The locking is required because the LRU cache is not thread-safe. I will move the lock into lruCache to minimize the duration the lock is held.
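A sketch of what moving the lock into lruCache could look like, assuming a non-thread-safe LRU such as github.com/golang/groupcache/lru (later in the thread a thread-safe cache is used instead, which makes the mutex unnecessary):

type lruCache struct {
    mutex          sync.Mutex
    cache          *lru.Cache
    newRateLimiter func() flowcontrol.RateLimiter
}

func (c *lruCache) get(key interface{}) flowcontrol.RateLimiter {
    c.mutex.Lock()
    defer c.mutex.Unlock()
    value, found := c.cache.Get(key)
    if !found {
        value = c.newRateLimiter()
        c.cache.Add(key, value)
    }
    return value.(flowcontrol.RateLimiter)
}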
return nil
}

// getNamespaceKey returns a key for a parceledGate that is based on the namespace of the event request
don't say parcel. This is a key for the request, right?
return userInfo.GetName()
}

// getSourceObjecttKey returns a key for a parceledGate that is based on the source+object of the event
minus parcel. This is a key for the request/limit tuple.
// give each limit enforcer a chance to reject the event
for _, enforcer := range a.limitEnforcers {
    if err := enforcer.accept(attr); err != nil {
        return err
you can't short-circuit here. You need to count against all the rate limits.
Derek and I discussed this a while back, coming to the conclusion that it was adequate to short-circuit. The behavior is similar to having each limit type be a separate admission control. If an earlier admission control in the chain rejects a request, the later admission controls never see that request and do not have the opportunity to count the request against any limits. I don't think that it really matters too much in the grand scheme of things whether we count a rejected request against all limits or only a subset of limits. If you feel strongly that each rejected request must count against all limits, then I will make that change. Logically, it probably makes the most sense to have a rejected request count against none of the limits, as it was rejected. This is harder to implement, though. And, again, I don't think it matters too much in the overall flow of event requests.
If you feel strongly that each rejected request must count against all limits, then I will make that change.
I feel pretty strongly. Either all or none please
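A sketch of the all-limits variant being requested (illustrative; the merged code may structure this differently):

// charge every rate limiter before deciding, then reject if any limit was hit
var rejectionErr error
for _, enforcer := range a.limitEnforcers {
    if err := enforcer.accept(attr); err != nil && rejectionErr == nil {
        rejectionErr = err
    }
}
if rejectionErr != nil {
    return rejectionErr
}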
// "namespace": limits are maintained against events from each namespace | ||
// "user": limits are maintained against events from each user | ||
// "source+object": limits are maintained against events from each source+object | ||
Type string `json:"type"` |
Alias the string type: type LimitType string
QPS float32 `json:"qps"`

// Maximum burst for throttle of events for this limit
Burst int ` json:"burst"`
int64. External types must specify the width; to match, internal types can specify it too.
//
// If the type of limit is "server", then CacheSize is ignored and can be
// omitted.
CacheSize int `json:"cacheSize"`
This should have a default if it isn't specified. I'm ok with the default varying based on type. I'd also make this omitempty. I don't think most people will care, but I could see it mattering for namespaces.
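Pulling the last few comments together, the external Limit type might end up looking roughly like this (a sketch only; the exact field types, defaults, and comments may differ in the merged API):

// LimitType aliases the string type as suggested above.
type LimitType string

type Limit struct {
    // Type of limit: server, namespace, user, or sourceAndObject.
    Type LimitType `json:"type"`
    // QPS is the allowed event queries per second for this limit.
    QPS float32 `json:"qps"`
    // Burst is the maximum burst of events allowed for this limit (64-bit per the review).
    Burst int64 `json:"burst"`
    // CacheSize is the size of the LRU cache for this limit. It is ignored for
    // the server type and defaulted when omitted.
    CacheSize int64 `json:"cacheSize,omitempty"`
}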
left some initial comments. Let's work through those and then re-review. I'm going to start again from the top, so squashing the changes into the main commit is fine.
Force-pushed from e0f1cd8 to 9952de7
/retest
update bazel
gofmt
also, squash
}

func (c *lruCache) get(key interface{}) flowcontrol.RateLimiter {
    c.mutex.Lock()
Given a thread-safe lru cache and the fact that we don't care if we're off by a little bit, we don't actually care if this method is locked anymore do we?
Yes. Oversight on my part to not remove the mutex when I switched LRU cache implementations.
Force-pushed from c6e350b to fb02ad2
I do not know what to do with this test failure. The failure is the k8s.io/kubernetes/test/integration/garbagecollector TestCascadingDeletion test. I do not see how that test is affected by these changes.
/retest
UserLimitType LimitType = "user"
// SourceAndObjectLimitType is a type of limit where there is one bucket used
// by each combination of source and involved object of the event.
SourceAndObjectLimitType LimitType = "sourceAndObject"
Looks like you missed clayton's "Start with a capital letter".
Yep, I registered the latter part of that comment, but not the former.
The EventRateLimit plugin limits the number of events that the API Server will accept in a given time period. It allows for server-wide, per-namespace, per-user, and per-source+object rate limiting.
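For a concrete sense of what a configuration might look like, here is a hedged Go sketch using the API types from this PR (the server and namespace constant names are assumed by analogy with the user and sourceAndObject constants quoted elsewhere in this review):

config := &eventratelimitapi.Configuration{
    Limits: []eventratelimitapi.Limit{
        // one server-wide bucket shared by all event requests
        {Type: eventratelimitapi.ServerLimitType, QPS: 50, Burst: 100},
        // one bucket per namespace, kept in an LRU cache of 2000 namespaces
        {Type: eventratelimitapi.NamespaceLimitType, QPS: 5, Burst: 20, CacheSize: 2000},
    },
}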
Force-pushed from fb02ad2 to a4542ae
/lgtm
const (
    // StatusTooManyRequests means the server experienced too many requests within a
    // given window and that the client must wait to perform the action again.
    StatusTooManyRequests = 429
why not http.StatusTooManyRequests?
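That is, reuse the constant net/http already defines instead of a new literal:

// http.StatusTooManyRequests is 429, so the local constant (if kept at all)
// could simply reference it
StatusTooManyRequests = http.StatusTooManyRequests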
// NewTooManyRequestsError returns an error indicating that the request was rejected because
// the server has received too many requests. Client should wait and retry. But if the request
// is perishable, then the client should not retry the request.
func NewTooManyRequestsError(message string) *StatusError {
NewTooManyRequests already exists, right?
fix the name of the admission plugin in the release note to match what they would use with
Ah, good old Go, don't specify "error" because it's in an error package. OK as a follow-up to remove the duplicate. Tagging since it's minor and today is freeze. /lgtm
Net new admission plugin with approved proposal. Manually approving the new package.
/retest
/approve Already approved by other approvers (new bazel dir)
/approve no-issue
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: deads2k, smarterclayton, staebler
Associated issue requirement bypassed by: smarterclayton
The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these OWNERS Files:
You can indicate your approval by writing
Automatic merge from submit-queue (batch tested with PRs 51805, 51725, 50925, 51474, 51638)
What this PR does / why we need it:
This PR adds the ability to limit events processed by an API server. Limits can be set globally on a server, per-namespace, per-user, and per-source+object. This is needed to prevent badly-configured or misbehaving players from making a cluster unstable.
Please see kubernetes/community#945.
Release Note: