diff --git a/contrib/for-tests/network-tester/webserver.go b/contrib/for-tests/network-tester/webserver.go index 6f1ce54376e1b..693abec078f2d 100644 --- a/contrib/for-tests/network-tester/webserver.go +++ b/contrib/for-tests/network-tester/webserver.go @@ -208,7 +208,7 @@ func contactOthers(state *State) { Host: os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT"), Path: "/api/v1beta1", } - client := &client.Client{client.NewRESTClient(&masterRO, "v1beta1", latest.Codec, true)} + client := &client.Client{client.NewRESTClient(&masterRO, "v1beta1", latest.Codec, true, 0)} // Do this repeatedly, in case there's some propagation delay with getting // newly started pods into the endpoints list. diff --git a/hack/test-cmd.sh b/hack/test-cmd.sh index 2ba443456ebff..c5828170da22c 100755 --- a/hack/test-cmd.sh +++ b/hack/test-cmd.sh @@ -508,7 +508,7 @@ __EOF__ # Pre-condition: frontend replication controller is running kube::test::get_object_assert rc "{{range.items}}{{$id_field}}:{{end}}" 'frontend-controller:' # Command - kubectl delete rc frontend-controller "${kube_flags[@]}" + kubectl stop rc frontend-controller "${kube_flags[@]}" # Post-condition: no replication controller is running kube::test::get_object_assert rc "{{range.items}}{{$id_field}}:{{end}}" '' @@ -525,7 +525,7 @@ __EOF__ # Pre-condition: frontend and redis-slave kube::test::get_object_assert rc "{{range.items}}{{$id_field}}:{{end}}" 'frontend-controller:redis-slave-controller:' # Command - kubectl delete rc frontend-controller redis-slave-controller "${kube_flags[@]}" # delete multiple controllers at once + kubectl stop rc frontend-controller redis-slave-controller "${kube_flags[@]}" # delete multiple controllers at once # Post-condition: no replication controller is running kube::test::get_object_assert rc "{{range.items}}{{$id_field}}:{{end}}" '' diff --git a/pkg/client/helper.go b/pkg/client/helper.go index 771031375d5ff..601efed8b15ce 100644 --- a/pkg/client/helper.go +++ b/pkg/client/helper.go @@ -77,6 +77,9 @@ type Config struct { // Transport may be used for custom HTTP behavior. This attribute may not // be specified with the TLS client certificate options. Transport http.RoundTripper + + // QPS indicates the maximum QPS to the master from this client. If zero, QPS is unlimited. + QPS float32 } type KubeletConfig struct { @@ -175,6 +178,9 @@ func SetKubernetesDefaults(config *Config) error { config.Codec = versionInterfaces.Codec } config.LegacyBehavior = (version == "v1beta1" || version == "v1beta2") + if config.QPS == 0.0 { + config.QPS = 5.0 + } return nil } @@ -195,7 +201,7 @@ func RESTClientFor(config *Config) (*RESTClient, error) { return nil, err } - client := NewRESTClient(baseURL, config.Version, config.Codec, config.LegacyBehavior) + client := NewRESTClient(baseURL, config.Version, config.Codec, config.LegacyBehavior, config.QPS) transport, err := TransportFor(config) if err != nil { diff --git a/pkg/client/helper_test.go b/pkg/client/helper_test.go index 932f423a1a6e7..5045dac4e224a 100644 --- a/pkg/client/helper_test.go +++ b/pkg/client/helper_test.go @@ -273,6 +273,7 @@ func TestSetKubernetesDefaults(t *testing.T) { Version: latest.Version, Codec: latest.Codec, LegacyBehavior: (latest.Version == "v1beta1" || latest.Version == "v1beta2"), + QPS: 5, }, false, }, diff --git a/pkg/client/restclient.go b/pkg/client/restclient.go index 55e01aca92821..c603e5df88ff9 100644 --- a/pkg/client/restclient.go +++ b/pkg/client/restclient.go @@ -22,6 +22,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) // RESTClient imposes common Kubernetes API conventions on a set of resource paths. @@ -50,13 +51,16 @@ type RESTClient struct { Client HTTPClient Timeout time.Duration + + // TODO extract this into a wrapper interface via the RESTClient interface in kubectl. + Throttle util.RateLimiter } // NewRESTClient creates a new RESTClient. This client performs generic REST functions // such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and // decoding of responses from the server. If this client should use the older, legacy // API conventions from Kubernetes API v1beta1 and v1beta2, set legacyBehavior true. -func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyBehavior bool) *RESTClient { +func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyBehavior bool, maxQPS float32) *RESTClient { base := *baseURL if !strings.HasSuffix(base.Path, "/") { base.Path += "/" @@ -64,6 +68,10 @@ func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyB base.RawQuery = "" base.Fragment = "" + var throttle util.RateLimiter + if maxQPS > 0 { + throttle = util.NewTokenBucketRateLimiter(maxQPS, 10) + } return &RESTClient{ baseURL: &base, apiVersion: apiVersion, @@ -71,6 +79,8 @@ func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyB Codec: c, LegacyBehavior: legacyBehavior, + + Throttle: throttle, } } @@ -87,11 +97,9 @@ func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyB // list, ok := resp.(*api.PodList) // func (c *RESTClient) Verb(verb string) *Request { - // TODO: uncomment when Go 1.2 support is dropped - //var timeout time.Duration = 0 - // if c.Client != nil { - // timeout = c.Client.Timeout - // } + if c.Throttle != nil { + c.Throttle.Accept() + } return NewRequest(c.Client, verb, c.baseURL, c.apiVersion, c.Codec, c.LegacyBehavior, c.LegacyBehavior).Timeout(c.Timeout) } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index dc925da6e58e9..d1857c334c571 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -48,7 +48,7 @@ func getFakeClient(t *testing.T, validURLs []string) (ClientPosterFunc, *httptes return func(mapping *meta.RESTMapping) (RESTClientPoster, error) { fakeCodec := runtime.CodecFor(api.Scheme, "v1beta1") fakeUri, _ := url.Parse(server.URL + "/api/v1beta1") - return client.NewRESTClient(fakeUri, "v1beta1", fakeCodec, true), nil + return client.NewRESTClient(fakeUri, "v1beta1", fakeCodec, true, 0), nil }, server }