From f327e97661f317d884d39e1db3ccc6cd97ed5080 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Mon, 30 Mar 2015 21:41:10 -0700 Subject: [PATCH] Add a limit to the number of in-flight requests that a server processes. --- cmd/kube-apiserver/app/server.go | 16 +++++++- pkg/apiserver/handlers.go | 37 ++++++++++++++++-- pkg/apiserver/handlers_test.go | 66 ++++++++++++++++++++++++++++++++ 3 files changed, 113 insertions(+), 6 deletions(-) diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 35060c15e174b..df91403162eea 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -24,6 +24,7 @@ import ( "net" "net/http" "os" + "regexp" "strconv" "strings" "time" @@ -76,6 +77,8 @@ type APIServer struct { KubeletConfig client.KubeletConfig ClusterName string EnableProfiling bool + MaxRequestsInFlight int + LongRunningRequestRE string } // NewAPIServer creates a new APIServer object with default parameters @@ -155,6 +158,8 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.ClusterName, "cluster_name", s.ClusterName, "The instance prefix for the cluster") fs.BoolVar(&s.EnableProfiling, "profiling", false, "Enable profiling via web interface host:port/debug/pprof/") fs.StringVar(&s.ExternalHost, "external_hostname", "", "The hostname to use when generating externalized URLs for this master (e.g. Swagger API Docs.)") + fs.IntVar(&s.MaxRequestsInFlight, "max_requests_inflight", 20, "The maximum number of requests in flight at a given time. When the server exceeds this, it rejects requests. Zero for no limit.") + fs.StringVar(&s.LongRunningRequestRE, "long_running_request_regexp", "[.*\\/watch$][^\\/proxy.*]", "A regular expression matching long running requests which should be excluded from maximum inflight request handling.") } // TODO: Longer term we should read this from some config store, rather than a flag. @@ -294,12 +299,19 @@ func (s *APIServer) Run(_ []string) error { // See the flag commentary to understand our assumptions when opening the read-only and read-write ports. + var sem chan bool + if s.MaxRequestsInFlight > 0 { + sem = make(chan bool, s.MaxRequestsInFlight) + } + + longRunningRE := regexp.MustCompile(s.LongRunningRequestRE) + if roLocation != "" { // Default settings allow 1 read-only request per second, allow up to 20 in a burst before enforcing. rl := util.NewTokenBucketRateLimiter(s.APIRate, s.APIBurst) readOnlyServer := &http.Server{ Addr: roLocation, - Handler: apiserver.RecoverPanics(apiserver.ReadOnly(apiserver.RateLimit(rl, m.InsecureHandler))), + Handler: apiserver.MaxInFlightLimit(sem, longRunningRE, apiserver.RecoverPanics(apiserver.ReadOnly(apiserver.RateLimit(rl, m.InsecureHandler)))), ReadTimeout: 5 * time.Minute, WriteTimeout: 5 * time.Minute, MaxHeaderBytes: 1 << 20, @@ -319,7 +331,7 @@ func (s *APIServer) Run(_ []string) error { if secureLocation != "" { secureServer := &http.Server{ Addr: secureLocation, - Handler: apiserver.RecoverPanics(m.Handler), + Handler: apiserver.MaxInFlightLimit(sem, longRunningRE, apiserver.RecoverPanics(m.Handler)), ReadTimeout: 5 * time.Minute, WriteTimeout: 5 * time.Minute, MaxHeaderBytes: 1 << 20, diff --git a/pkg/apiserver/handlers.go b/pkg/apiserver/handlers.go index a6a678ea46ea0..87ead96c77f40 100644 --- a/pkg/apiserver/handlers.go +++ b/pkg/apiserver/handlers.go @@ -43,6 +43,10 @@ var specialVerbs = map[string]bool{ "watch": true, } +// Constant for the retry-after interval on rate limiting. +// TODO: maybe make this dynamic? or user-adjustable? 
+const RetryAfter = "1" + // IsReadOnlyReq() is true for any (or at least many) request which has no observable // side effects on state of apiserver (though there may be internal side effects like // caching and logging). @@ -66,6 +70,27 @@ func ReadOnly(handler http.Handler) http.Handler { }) } +// MaxInFlight limits the number of in-flight requests to buffer size of the passed in channel. +func MaxInFlightLimit(c chan bool, longRunningRequestRE *regexp.Regexp, handler http.Handler) http.Handler { + if c == nil { + return handler + } + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if longRunningRequestRE.MatchString(r.URL.Path) { + // Skip tracking long running events. + handler.ServeHTTP(w, r) + return + } + select { + case c <- true: + defer func() { <-c }() + handler.ServeHTTP(w, r) + default: + tooManyRequests(w) + } + }) +} + // RateLimit uses rl to rate limit accepting requests to 'handler'. func RateLimit(rl util.RateLimiter, handler http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { @@ -73,13 +98,17 @@ func RateLimit(rl util.RateLimiter, handler http.Handler) http.Handler { handler.ServeHTTP(w, req) return } - // Return a 429 status indicating "Too Many Requests" - w.Header().Set("Retry-After", "1") - w.WriteHeader(errors.StatusTooManyRequests) - fmt.Fprintf(w, "Rate limit is 10 QPS or a burst of 200") + tooManyRequests(w) }) } +func tooManyRequests(w http.ResponseWriter) { + // Return a 429 status indicating "Too Many Requests" + w.Header().Set("Retry-After", RetryAfter) + w.WriteHeader(errors.StatusTooManyRequests) + fmt.Fprintf(w, "Too many requests, please try again later.") +} + // RecoverPanics wraps an http Handler to recover and log panics. func RecoverPanics(handler http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { diff --git a/pkg/apiserver/handlers_test.go b/pkg/apiserver/handlers_test.go index 8ff475b24c66b..80c63c269883b 100644 --- a/pkg/apiserver/handlers_test.go +++ b/pkg/apiserver/handlers_test.go @@ -20,10 +20,14 @@ import ( "net/http" "net/http/httptest" "reflect" + "regexp" "strings" + "sync" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) @@ -33,6 +37,68 @@ type fakeRL bool func (fakeRL) Stop() {} func (f fakeRL) CanAccept() bool { return bool(f) } +func expectHTTP(url string, code int, t *testing.T) { + r, err := http.Get(url) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if r.StatusCode != code { + t.Errorf("unexpected response: %v", r.StatusCode) + } +} + +func TestMaxInFlight(t *testing.T) { + const Iterations = 3 + block := sync.WaitGroup{} + block.Add(1) + sem := make(chan bool, Iterations) + + re := regexp.MustCompile("[.*\\/watch][^\\/proxy.*]") + + server := httptest.NewServer(MaxInFlightLimit(sem, re, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.Contains(r.URL.Path, "dontwait") { + return + } + block.Wait() + }))) + defer server.Close() + + // These should hang, but not affect accounting. + for i := 0; i < Iterations; i++ { + // These should hang waiting on block... + go func() { + expectHTTP(server.URL+"/foo/bar/watch", http.StatusOK, t) + }() + } + for i := 0; i < Iterations; i++ { + // These should hang waiting on block... 
+		go func() {
+			expectHTTP(server.URL+"/proxy/foo/bar", http.StatusOK, t)
+		}()
+	}
+	expectHTTP(server.URL+"/dontwait", http.StatusOK, t)
+
+	for i := 0; i < Iterations; i++ {
+		// These should hang waiting on block...
+		go func() {
+			expectHTTP(server.URL, http.StatusOK, t)
+		}()
+	}
+	// There's really no more elegant way to do this. I could use a WaitGroup, but even then
+	// it'd still be racy.
+	time.Sleep(1 * time.Second)
+	expectHTTP(server.URL+"/dontwait/watch", http.StatusOK, t)
+
+	// Do this multiple times to show that requests rejected by the in-flight limit don't block.
+	for i := 0; i < 2; i++ {
+		expectHTTP(server.URL, errors.StatusTooManyRequests, t)
+	}
+	block.Done()
+
+	// Show that we recover from being blocked up.
+	expectHTTP(server.URL, http.StatusOK, t)
+}
+
 func TestRateLimit(t *testing.T) {
 	for _, allow := range []bool{true, false} {
 		rl := fakeRL(allow)
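
As a reference for reviewers, below is a minimal standalone sketch (not part of the patch) of the buffered-channel semaphore pattern that MaxInFlightLimit relies on. The maxInFlight and slow handler names, the port, and the limit of 2 are illustrative only; the long-running-request bypass is omitted for brevity, and the sketch uses the standard library's http.StatusTooManyRequests rather than the errors.StatusTooManyRequests constant used in the patch.

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

// maxInFlight admits a request only when a slot is free in the buffered
// channel, mirroring the select/default logic in MaxInFlightLimit above.
func maxInFlight(sem chan bool, handler http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case sem <- true:
			// Got a slot; release it when the request finishes.
			defer func() { <-sem }()
			handler.ServeHTTP(w, r)
		default:
			// Every slot is occupied; reject immediately with a 429.
			w.Header().Set("Retry-After", "1")
			w.WriteHeader(http.StatusTooManyRequests)
			fmt.Fprintln(w, "Too many requests, please try again later.")
		}
	})
}

func main() {
	slow := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		time.Sleep(2 * time.Second) // simulate a slow request
		fmt.Fprintln(w, "done")
	})
	// Allow at most 2 requests in flight; a 3rd concurrent request gets a 429.
	log.Fatal(http.ListenAndServe("127.0.0.1:8080", maxInFlight(make(chan bool, 2), slow)))
}

With this toy server running, issuing three overlapping requests (for example, three concurrent curl calls against 127.0.0.1:8080) should leave two of them waiting on the slow handler and cause the third to return 429 with a Retry-After header, which is the behavior TestMaxInFlight above exercises through httptest.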