Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a QPS limiter to the kubernetes client. #6203

Merged
merged 1 commit into from
Apr 3, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/for-tests/network-tester/webserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func contactOthers(state *State) {
Host: os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT"),
Path: "/api/v1beta1",
}
client := &client.Client{client.NewRESTClient(&masterRO, "v1beta1", latest.Codec, true)}
client := &client.Client{client.NewRESTClient(&masterRO, "v1beta1", latest.Codec, true, 0)}

// Do this repeatedly, in case there's some propagation delay with getting
// newly started pods into the endpoints list.
Expand Down
4 changes: 2 additions & 2 deletions hack/test-cmd.sh
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ __EOF__
# Pre-condition: frontend replication controller is running
kube::test::get_object_assert rc "{{range.items}}{{$id_field}}:{{end}}" 'frontend-controller:'
# Command
kubectl delete rc frontend-controller "${kube_flags[@]}"
kubectl stop rc frontend-controller "${kube_flags[@]}"
# Post-condition: no replication controller is running
kube::test::get_object_assert rc "{{range.items}}{{$id_field}}:{{end}}" ''

Expand All @@ -520,7 +520,7 @@ __EOF__
# Pre-condition: frontend and redis-slave
kube::test::get_object_assert rc "{{range.items}}{{$id_field}}:{{end}}" 'frontend-controller:redis-slave-controller:'
# Command
kubectl delete rc frontend-controller redis-slave-controller "${kube_flags[@]}" # delete multiple controllers at once
kubectl stop rc frontend-controller redis-slave-controller "${kube_flags[@]}" # delete multiple controllers at once
# Post-condition: no replication controller is running
kube::test::get_object_assert rc "{{range.items}}{{$id_field}}:{{end}}" ''

Expand Down
8 changes: 7 additions & 1 deletion pkg/client/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ type Config struct {
// Transport may be used for custom HTTP behavior. This attribute may not
// be specified with the TLS client certificate options.
Transport http.RoundTripper

// QPS indicates the maximum QPS to the master from this client. If zero, QPS is unlimited.
QPS float32
}

type KubeletConfig struct {
Expand Down Expand Up @@ -175,6 +178,9 @@ func SetKubernetesDefaults(config *Config) error {
config.Codec = versionInterfaces.Codec
}
config.LegacyBehavior = (version == "v1beta1" || version == "v1beta2")
if config.QPS == 0.0 {
config.QPS = 5.0
}
return nil
}

Expand All @@ -195,7 +201,7 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
return nil, err
}

client := NewRESTClient(baseURL, config.Version, config.Codec, config.LegacyBehavior)
client := NewRESTClient(baseURL, config.Version, config.Codec, config.LegacyBehavior, config.QPS)

transport, err := TransportFor(config)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/client/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func TestSetKubernetesDefaults(t *testing.T) {
Version: latest.Version,
Codec: latest.Codec,
LegacyBehavior: (latest.Version == "v1beta1" || latest.Version == "v1beta2"),
QPS: 5,
},
false,
},
Expand Down
20 changes: 14 additions & 6 deletions pkg/client/restclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

// RESTClient imposes common Kubernetes API conventions on a set of resource paths.
Expand Down Expand Up @@ -50,27 +51,36 @@ type RESTClient struct {
Client HTTPClient

Timeout time.Duration

// TODO extract this into a wrapper interface via the RESTClient interface in kubectl.
Throttle util.RateLimiter
}

// NewRESTClient creates a new RESTClient. This client performs generic REST functions
// such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and
// decoding of responses from the server. If this client should use the older, legacy
// API conventions from Kubernetes API v1beta1 and v1beta2, set legacyBehavior true.
func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyBehavior bool) *RESTClient {
func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyBehavior bool, maxQPS float32) *RESTClient {
base := *baseURL
if !strings.HasSuffix(base.Path, "/") {
base.Path += "/"
}
base.RawQuery = ""
base.Fragment = ""

var throttle util.RateLimiter
if maxQPS > 0 {
throttle = util.NewTokenBucketRateLimiter(maxQPS, 10)
}
return &RESTClient{
baseURL: &base,
apiVersion: apiVersion,

Codec: c,

LegacyBehavior: legacyBehavior,

Throttle: throttle,
}
}

Expand All @@ -87,11 +97,9 @@ func NewRESTClient(baseURL *url.URL, apiVersion string, c runtime.Codec, legacyB
// list, ok := resp.(*api.PodList)
//
func (c *RESTClient) Verb(verb string) *Request {
// TODO: uncomment when Go 1.2 support is dropped
//var timeout time.Duration = 0
// if c.Client != nil {
// timeout = c.Client.Timeout
// }
if c.Throttle != nil {
c.Throttle.Accept()
}
return NewRequest(c.Client, verb, c.baseURL, c.apiVersion, c.Codec, c.LegacyBehavior, c.LegacyBehavior).Timeout(c.Timeout)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func getFakeClient(t *testing.T, validURLs []string) (ClientPosterFunc, *httptes
return func(mapping *meta.RESTMapping) (RESTClientPoster, error) {
fakeCodec := runtime.CodecFor(api.Scheme, "v1beta1")
fakeUri, _ := url.Parse(server.URL + "/api/v1beta1")
return client.NewRESTClient(fakeUri, "v1beta1", fakeCodec, true), nil
return client.NewRESTClient(fakeUri, "v1beta1", fakeCodec, true, 0), nil
}, server
}

Expand Down