Skip to content

Commit

Permalink
Merge pull request kubernetes#6207 from brendandburns/server
Browse files Browse the repository at this point in the history
Add a limit to the number of in-flight requests that a server processes.
  • Loading branch information
Quinton Hoole committed Apr 2, 2015
2 parents ee98731 + f327e97 commit 4a2000c
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 6 deletions.
16 changes: 14 additions & 2 deletions cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net"
"net/http"
"os"
"regexp"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -77,6 +78,8 @@ type APIServer struct {
KubeletConfig client.KubeletConfig
ClusterName string
EnableProfiling bool
MaxRequestsInFlight int
LongRunningRequestRE string
}

// NewAPIServer creates a new APIServer object with default parameters
Expand Down Expand Up @@ -157,6 +160,8 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.ClusterName, "cluster_name", s.ClusterName, "The instance prefix for the cluster")
fs.BoolVar(&s.EnableProfiling, "profiling", false, "Enable profiling via web interface host:port/debug/pprof/")
fs.StringVar(&s.ExternalHost, "external_hostname", "", "The hostname to use when generating externalized URLs for this master (e.g. Swagger API Docs.)")
fs.IntVar(&s.MaxRequestsInFlight, "max_requests_inflight", 20, "The maximum number of requests in flight at a given time. When the server exceeds this, it rejects requests. Zero for no limit.")
fs.StringVar(&s.LongRunningRequestRE, "long_running_request_regexp", "[.*\\/watch$][^\\/proxy.*]", "A regular expression matching long running requests which should be excluded from maximum inflight request handling.")
}

// TODO: Longer term we should read this from some config store, rather than a flag.
Expand Down Expand Up @@ -300,12 +305,19 @@ func (s *APIServer) Run(_ []string) error {

// See the flag commentary to understand our assumptions when opening the read-only and read-write ports.

var sem chan bool
if s.MaxRequestsInFlight > 0 {
sem = make(chan bool, s.MaxRequestsInFlight)
}

longRunningRE := regexp.MustCompile(s.LongRunningRequestRE)

if roLocation != "" {
// Default settings allow 1 read-only request per second, allow up to 20 in a burst before enforcing.
rl := util.NewTokenBucketRateLimiter(s.APIRate, s.APIBurst)
readOnlyServer := &http.Server{
Addr: roLocation,
Handler: apiserver.RecoverPanics(apiserver.ReadOnly(apiserver.RateLimit(rl, m.InsecureHandler))),
Handler: apiserver.MaxInFlightLimit(sem, longRunningRE, apiserver.RecoverPanics(apiserver.ReadOnly(apiserver.RateLimit(rl, m.InsecureHandler)))),
ReadTimeout: 5 * time.Minute,
WriteTimeout: 5 * time.Minute,
MaxHeaderBytes: 1 << 20,
Expand All @@ -325,7 +337,7 @@ func (s *APIServer) Run(_ []string) error {
if secureLocation != "" {
secureServer := &http.Server{
Addr: secureLocation,
Handler: apiserver.RecoverPanics(m.Handler),
Handler: apiserver.MaxInFlightLimit(sem, longRunningRE, apiserver.RecoverPanics(m.Handler)),
ReadTimeout: 5 * time.Minute,
WriteTimeout: 5 * time.Minute,
MaxHeaderBytes: 1 << 20,
Expand Down
37 changes: 33 additions & 4 deletions pkg/apiserver/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ var specialVerbs = map[string]bool{
"watch": true,
}

// RetryAfter is the value placed in the "Retry-After" response header when a
// request is rejected with a "Too Many Requests" status.
// NOTE(review): presumably a number of seconds, as is conventional for the
// Retry-After header — confirm.
// TODO: maybe make this dynamic? or user-adjustable?
const RetryAfter = "1"

// IsReadOnlyReq() is true for any (or at least many) request which has no observable
// side effects on state of apiserver (though there may be internal side effects like
// caching and logging).
Expand All @@ -66,20 +70,45 @@ func ReadOnly(handler http.Handler) http.Handler {
})
}

// MaxInFlightLimit wraps handler so that the number of requests being served
// concurrently is bounded by the buffer size of c, which is used as a
// counting semaphore.
//
// A nil c disables limiting and handler is returned unwrapped. Requests whose
// URL path matches longRunningRequestRE bypass the semaphore entirely and are
// always passed through. Any other request that cannot take a slot without
// blocking is rejected via tooManyRequests rather than queued.
func MaxInFlightLimit(c chan bool, longRunningRequestRE *regexp.Regexp, handler http.Handler) http.Handler {
	if c == nil {
		return handler
	}
	limited := func(w http.ResponseWriter, r *http.Request) {
		if !longRunningRequestRE.MatchString(r.URL.Path) {
			// Tracked request: claim a slot without blocking, or bail out.
			select {
			case c <- true:
				// Give the slot back once the wrapped handler has finished.
				defer func() { <-c }()
			default:
				tooManyRequests(w)
				return
			}
		}
		handler.ServeHTTP(w, r)
	}
	return http.HandlerFunc(limited)
}

// RateLimit uses rl to rate limit accepting requests to 'handler'.
//
// Each incoming request asks rl.CanAccept(); when it returns true the request
// is forwarded to handler, otherwise it is rejected through tooManyRequests.
// The rejection is written exactly once: emitting the header, status and body
// inline and then also calling tooManyRequests would issue a second
// WriteHeader and append a second message to the body.
func RateLimit(rl util.RateLimiter, handler http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		if !rl.CanAccept() {
			tooManyRequests(w)
			return
		}
		handler.ServeHTTP(w, req)
	})
}

// tooManyRequests writes a "Too Many Requests" rejection to w: a Retry-After
// header carrying RetryAfter, the errors.StatusTooManyRequests status code,
// and a short plain-text explanation as the body.
func tooManyRequests(w http.ResponseWriter) {
	// Headers must be populated before the status line is written.
	hdr := w.Header()
	hdr.Set("Retry-After", RetryAfter)
	w.WriteHeader(errors.StatusTooManyRequests)
	fmt.Fprint(w, "Too many requests, please try again later.")
}

// RecoverPanics wraps an http Handler to recover and log panics.
func RecoverPanics(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
Expand Down
66 changes: 66 additions & 0 deletions pkg/apiserver/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@ import (
"net/http"
"net/http/httptest"
"reflect"
"regexp"
"strings"
"sync"
"testing"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
Expand All @@ -33,6 +37,68 @@ type fakeRL bool
func (fakeRL) Stop() {}
func (f fakeRL) CanAccept() bool { return bool(f) }

// expectHTTP issues a GET request to url and records a test error on t if the
// request fails or if the response status code differs from code.
//
// When the request itself fails there is no response to inspect, so the
// function returns right after reporting the error instead of dereferencing a
// nil response. The response body is always closed so the underlying
// connection is released.
func expectHTTP(url string, code int, t *testing.T) {
	r, err := http.Get(url)
	if err != nil {
		t.Errorf("unexpected error: %v", err)
		return
	}
	defer r.Body.Close()
	if r.StatusCode != code {
		t.Errorf("unexpected response: %v", r.StatusCode)
	}
}

// TestMaxInFlight exercises MaxInFlightLimit against a test server whose
// handler returns immediately for paths containing "dontwait" and otherwise
// waits on the block WaitGroup. The semaphore passed to MaxInFlightLimit has
// room for Iterations entries.
func TestMaxInFlight(t *testing.T) {
const Iterations = 3
// block holds every non-"dontwait" handler invocation until block.Done() is called below.
block := sync.WaitGroup{}
block.Add(1)
sem := make(chan bool, Iterations)

// NOTE(review): this pattern consists of two bracketed character classes, so it
// matches a pair of adjacent characters rather than the literal "/watch" or
// "/proxy" path segments its text suggests — confirm this is the intended matcher.
re := regexp.MustCompile("[.*\\/watch][^\\/proxy.*]")

server := httptest.NewServer(MaxInFlightLimit(sem, re, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.Contains(r.URL.Path, "dontwait") {
return
}
block.Wait()
})))
defer server.Close()

// These should hang, but not affect accounting.
// NOTE(review): none of the goroutines started in this test are joined, so their
// expectHTTP calls may still be reporting to t when the test function returns — confirm.
for i := 0; i < Iterations; i++ {
// These should hang waiting on block...
go func() {
expectHTTP(server.URL+"/foo/bar/watch", http.StatusOK, t)
}()
}
for i := 0; i < Iterations; i++ {
// These should hang waiting on block...
go func() {
expectHTTP(server.URL+"/proxy/foo/bar", http.StatusOK, t)
}()
}
// The handler does not wait for this path, so the request should complete right away.
expectHTTP(server.URL+"/dontwait", http.StatusOK, t)

// Start Iterations requests against the bare server URL; each is expected to
// hold a semaphore entry while its handler waits on block.
for i := 0; i < Iterations; i++ {
// These should hang waiting on block...
go func() {
expectHTTP(server.URL, http.StatusOK, t)
}()
}
// There's really no more elegant way to do this. I could use a WaitGroup, but even then
// it'd still be racy.
// The sleep gives the goroutines above time to reach the server before the
// assertions that follow.
time.Sleep(1 * time.Second)
expectHTTP(server.URL+"/dontwait/watch", http.StatusOK, t)

// Do this multiple times to show that requests rejected by the in-flight limit
// return immediately instead of blocking.
for i := 0; i < 2; i++ {
expectHTTP(server.URL, errors.StatusTooManyRequests, t)
}
// Release every handler that is waiting on block.
block.Done()

// Show that we recover from being blocked up.
expectHTTP(server.URL, http.StatusOK, t)
}

func TestRateLimit(t *testing.T) {
for _, allow := range []bool{true, false} {
rl := fakeRL(allow)
Expand Down

0 comments on commit 4a2000c

Please sign in to comment.