Skip to content

Commit

Permalink
All clients under ClientSet share one RateLimiter.
Browse files Browse the repository at this point in the history
  • Loading branch information
gmarek committed Apr 21, 2016
1 parent 8a8177f commit b76bed0
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 14 deletions.
11 changes: 8 additions & 3 deletions cmd/libs/go2idl/client-gen/generators/generator_for_clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ func (g *genClientset) Imports(c *generator.Context) (imports []string) {
typedClientPath := filepath.Join(g.typedClientPath, group, version)
group = normalization.BeforeFirstDot(group)
imports = append(imports, fmt.Sprintf("%s%s \"%s\"", version, group, typedClientPath))
imports = append(imports, "github.com/golang/glog")
}
imports = append(imports, "github.com/golang/glog")
imports = append(imports, "k8s.io/kubernetes/pkg/util/flowcontrol")
return
}

Expand Down Expand Up @@ -143,14 +144,18 @@ func (c *Clientset) Discovery() $.DiscoveryInterface|raw$ {
var newClientsetForConfigTemplate = `
// NewForConfig creates a new Clientset for the given config.
func NewForConfig(c *$.Config|raw$) (*Clientset, error) {
configShallowCopy := *c
if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
}
var clientset Clientset
var err error
$range .allGroups$ clientset.$.Group$Client, err =$.PackageName$.NewForConfig(c)
$range .allGroups$ clientset.$.Group$Client, err =$.PackageName$.NewForConfig(&configShallowCopy)
if err!=nil {
return &clientset, err
}
$end$
clientset.DiscoveryClient, err = $.NewDiscoveryClientForConfig|raw$(c)
clientset.DiscoveryClient, err = $.NewDiscoveryClientForConfig|raw$(&configShallowCopy)
if err!=nil {
glog.Errorf("failed to create the DiscoveryClient: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
unversionedtestgroup "k8s.io/kubernetes/cmd/libs/go2idl/client-gen/testoutput/clientset_generated/test_internalclientset/typed/testgroup.k8s.io/unversioned"
restclient "k8s.io/kubernetes/pkg/client/restclient"
discovery "k8s.io/kubernetes/pkg/client/typed/discovery"
"k8s.io/kubernetes/pkg/util/flowcontrol"
)

type Interface interface {
Expand All @@ -47,14 +48,18 @@ func (c *Clientset) Discovery() discovery.DiscoveryInterface {

// NewForConfig creates a new Clientset for the given config.
func NewForConfig(c *restclient.Config) (*Clientset, error) {
configShallowCopy := *c
if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
}
var clientset Clientset
var err error
clientset.TestgroupClient, err = unversionedtestgroup.NewForConfig(c)
clientset.TestgroupClient, err = unversionedtestgroup.NewForConfig(&configShallowCopy)
if err != nil {
return &clientset, err
}

clientset.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(c)
clientset.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy)
if err != nil {
glog.Errorf("failed to create the DiscoveryClient: %v", err)
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/client/clientset_generated/internalclientset/clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned"
restclient "k8s.io/kubernetes/pkg/client/restclient"
discovery "k8s.io/kubernetes/pkg/client/typed/discovery"
"k8s.io/kubernetes/pkg/util/flowcontrol"
)

type Interface interface {
Expand Down Expand Up @@ -55,18 +56,22 @@ func (c *Clientset) Discovery() discovery.DiscoveryInterface {

// NewForConfig creates a new Clientset for the given config.
func NewForConfig(c *restclient.Config) (*Clientset, error) {
configShallowCopy := *c
if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
}
var clientset Clientset
var err error
clientset.CoreClient, err = unversionedcore.NewForConfig(c)
clientset.CoreClient, err = unversionedcore.NewForConfig(&configShallowCopy)
if err != nil {
return &clientset, err
}
clientset.ExtensionsClient, err = unversionedextensions.NewForConfig(c)
clientset.ExtensionsClient, err = unversionedextensions.NewForConfig(&configShallowCopy)
if err != nil {
return &clientset, err
}

clientset.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(c)
clientset.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy)
if err != nil {
glog.Errorf("failed to create the DiscoveryClient: %v", err)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/client/restclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type RESTClient struct {
// NewRESTClient creates a new RESTClient. This client performs generic REST functions
// such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and
// decoding of responses from the server.
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, client *http.Client) *RESTClient {
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, rateLimiter flowcontrol.RateLimiter, client *http.Client) *RESTClient {
base := *baseURL
if !strings.HasSuffix(base.Path, "/") {
base.Path += "/"
Expand All @@ -79,8 +79,10 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf
}

var throttle flowcontrol.RateLimiter
if maxQPS > 0 {
if maxQPS > 0 && rateLimiter == nil {
throttle = flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst)
} else if rateLimiter != nil {
throttle = rateLimiter
}
return &RESTClient{
base: &base,
Expand Down
8 changes: 6 additions & 2 deletions pkg/client/restclient/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/crypto"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/version"
)

Expand Down Expand Up @@ -90,6 +91,9 @@ type Config struct {

// Maximum burst for throttle
Burst int

// Rate limiter for limiting connections to the master from this client. If present overwrites QPS/Burst
RateLimiter flowcontrol.RateLimiter
}

// TLSClientConfig contains settings to enable transport layer security
Expand Down Expand Up @@ -155,7 +159,7 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
httpClient = &http.Client{Transport: transport}
}

client := NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, config.QPS, config.Burst, httpClient)
client := NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, config.QPS, config.Burst, config.RateLimiter, httpClient)

return client, nil
}
Expand Down Expand Up @@ -188,7 +192,7 @@ func UnversionedRESTClientFor(config *Config) (*RESTClient, error) {
versionConfig.GroupVersion = &v
}

client := NewRESTClient(baseURL, versionedAPIPath, versionConfig, config.QPS, config.Burst, httpClient)
client := NewRESTClient(baseURL, versionedAPIPath, versionConfig, config.QPS, config.Burst, config.RateLimiter, httpClient)
return client, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/client/restclient/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1319,5 +1319,5 @@ func testRESTClient(t testing.TB, srv *httptest.Server) *RESTClient {
}
}
versionedAPIPath := testapi.Default.ResourcePath("", "", "")
return NewRESTClient(baseURL, versionedAPIPath, ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, 0, 0, nil)
return NewRESTClient(baseURL, versionedAPIPath, ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, 0, 0, nil, nil)
}
2 changes: 1 addition & 1 deletion pkg/client/unversioned/remotecommand/remotecommand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func TestStream(t *testing.T) {
server := httptest.NewServer(fakeServer(t, name, exec, testCase.Stdin, testCase.Stdout, testCase.Stderr, testCase.Error, testCase.Tty, testCase.MessageCount, testCase.ServerProtocols))

url, _ := url.ParseRequestURI(server.URL)
c := restclient.NewRESTClient(url, "", restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "x"}}, -1, -1, nil)
c := restclient.NewRESTClient(url, "", restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "x"}}, -1, -1, nil, nil)
req := c.Post().Resource("testing")

if exec {
Expand Down

1 comment on commit b76bed0

@k8s-teamcity-mesosphere

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TeamCity OSS :: Kubernetes Mesos :: 4 - Smoke Tests Build 22177 outcome was FAILURE
Summary: Tests failed: 1 (1 new), passed: 0 Build time: 00:27:49

Failed tests

null: Kubernetes e2e suite.BeforeSuite: <no details avaliable>

Please sign in to comment.