From 8472cfa214369f69f2dc680962be63ba34199bd2 Mon Sep 17 00:00:00 2001 From: harry Date: Wed, 9 Mar 2016 13:54:59 +0800 Subject: [PATCH] Refactor throttle into util pkg Fix missing throttle.go --- cmd/integration/integration.go | 3 ++- .../app/controllermanager.go | 6 +++--- .../pkg/controllermanager/controllermanager.go | 6 +++--- pkg/client/restclient/client.go | 7 ++++--- pkg/client/restclient/request.go | 6 +++--- pkg/cloudprovider/providers/gce/gce.go | 6 +++--- pkg/cloudprovider/providers/gce/token_source.go | 6 +++--- pkg/controller/node/nodecontroller.go | 8 ++++---- pkg/controller/node/nodecontroller_test.go | 16 ++++++++-------- pkg/controller/node/rate_limited_queue.go | 6 +++--- pkg/controller/node/rate_limited_queue_test.go | 16 ++++++++-------- pkg/kubelet/dockertools/docker.go | 6 +++--- pkg/util/{ => flowcontrol}/throttle.go | 2 +- pkg/util/{ => flowcontrol}/throttle_test.go | 2 +- test/e2e/service_latency.go | 4 ++-- 15 files changed, 51 insertions(+), 49 deletions(-) rename pkg/util/{ => flowcontrol}/throttle.go (99%) rename pkg/util/{ => flowcontrol}/throttle_test.go (99%) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 2d95fc13c0ccd..b49354ee8a9a1 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -56,6 +56,7 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flag" + "k8s.io/kubernetes/pkg/util/flowcontrol" utilnet "k8s.io/kubernetes/pkg/util/net" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" @@ -201,7 +202,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string go replicationcontroller.NewReplicationManager(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096). Run(3, wait.NeverStop) - nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(), + nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, nil, false) nodeController.Run(5 * time.Second) cadvisorInterface := new(cadvisortest.Fake) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 9824c58b97c3d..22a483cae11ef 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -64,9 +64,9 @@ import ( "k8s.io/kubernetes/pkg/healthz" quotainstall "k8s.io/kubernetes/pkg/quota/install" "k8s.io/kubernetes/pkg/serviceaccount" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/configz" "k8s.io/kubernetes/pkg/util/crypto" + "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/wait" "github.com/golang/glog" @@ -207,8 +207,8 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig // this cidr has been validated already _, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR) nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")), - s.PodEvictionTimeout.Duration, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), - util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), + s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), + flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, s.AllocateNodeCIDRs) nodeController.Run(s.NodeSyncPeriod.Duration) diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index 749f624f533a8..af9aa27711a64 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -58,8 +58,8 @@ import ( "k8s.io/kubernetes/pkg/healthz" quotainstall "k8s.io/kubernetes/pkg/quota/install" "k8s.io/kubernetes/pkg/serviceaccount" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/crypto" + "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/contrib/mesos/pkg/profile" @@ -154,8 +154,8 @@ func (s *CMServer) Run(_ []string) error { } _, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR) nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")), - s.PodEvictionTimeout.Duration, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), - util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), + s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), + flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, s.AllocateNodeCIDRs) nodeController.Run(s.NodeSyncPeriod.Duration) diff --git a/pkg/client/restclient/client.go b/pkg/client/restclient/client.go index 2e378a9225b09..ba6e009cc1839 100644 --- a/pkg/client/restclient/client.go +++ b/pkg/client/restclient/client.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/flowcontrol" ) const ( @@ -53,7 +54,7 @@ type RESTClient struct { contentConfig ContentConfig // TODO extract this into a wrapper interface via the RESTClient interface in kubectl. - Throttle util.RateLimiter + Throttle flowcontrol.RateLimiter // Set specific behavior of the client. If not set http.DefaultClient will be used. Client *http.Client @@ -77,9 +78,9 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf config.ContentType = "application/json" } - var throttle util.RateLimiter + var throttle flowcontrol.RateLimiter if maxQPS > 0 { - throttle = util.NewTokenBucketRateLimiter(maxQPS, maxBurst) + throttle = flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst) } return &RESTClient{ base: &base, diff --git a/pkg/client/restclient/request.go b/pkg/client/restclient/request.go index ce30762b1040e..959318d7f6a57 100644 --- a/pkg/client/restclient/request.go +++ b/pkg/client/restclient/request.go @@ -39,7 +39,7 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/watch" @@ -117,11 +117,11 @@ type Request struct { resp *http.Response backoffMgr BackoffManager - throttle util.RateLimiter + throttle flowcontrol.RateLimiter } // NewRequest creates a new request helper object for accessing runtime.Objects on a server. -func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, backoff BackoffManager, throttle util.RateLimiter) *Request { +func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, backoff BackoffManager, throttle flowcontrol.RateLimiter) *Request { if backoff == nil { glog.V(2).Infof("Not implementing request backoff strategy.") backoff = &NoBackoff{} diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index a00b17b60c70d..295a6530a1f04 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -33,8 +33,8 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/util" utilerrors "k8s.io/kubernetes/pkg/util/errors" + "k8s.io/kubernetes/pkg/util/flowcontrol" netsets "k8s.io/kubernetes/pkg/util/net/sets" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" @@ -80,7 +80,7 @@ type GCECloud struct { managedZones []string // List of zones we are spanning (for Ubernetes-Lite, primarily when running on master) networkURL string useMetadataServer bool - operationPollRateLimiter util.RateLimiter + operationPollRateLimiter flowcontrol.RateLimiter } type Config struct { @@ -297,7 +297,7 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo glog.Infof("managing multiple zones: %v", managedZones) } - operationPollRateLimiter := util.NewTokenBucketRateLimiter(10, 100) // 10 qps, 100 bucket size. + operationPollRateLimiter := flowcontrol.NewTokenBucketRateLimiter(10, 100) // 10 qps, 100 bucket size. return &GCECloud{ service: svc, diff --git a/pkg/cloudprovider/providers/gce/token_source.go b/pkg/cloudprovider/providers/gce/token_source.go index f8a0e3671e28f..70f3c987dcac0 100644 --- a/pkg/cloudprovider/providers/gce/token_source.go +++ b/pkg/cloudprovider/providers/gce/token_source.go @@ -22,7 +22,7 @@ import ( "strings" "time" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/flowcontrol" "github.com/prometheus/client_golang/prometheus" "golang.org/x/oauth2" @@ -61,7 +61,7 @@ type altTokenSource struct { oauthClient *http.Client tokenURL string tokenBody string - throttle util.RateLimiter + throttle flowcontrol.RateLimiter } func (a *altTokenSource) Token() (*oauth2.Token, error) { @@ -106,7 +106,7 @@ func newAltTokenSource(tokenURL, tokenBody string) oauth2.TokenSource { oauthClient: client, tokenURL: tokenURL, tokenBody: tokenBody, - throttle: util.NewTokenBucketRateLimiter(tokenURLQPS, tokenURLBurst), + throttle: flowcontrol.NewTokenBucketRateLimiter(tokenURLQPS, tokenURLBurst), } return oauth2.ReuseTokenSource(nil, a) } diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index aa88a3459d9ec..92c7288d19ec0 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -40,7 +40,7 @@ import ( "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/flowcontrol" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" @@ -69,7 +69,7 @@ type NodeController struct { allocateNodeCIDRs bool cloud cloudprovider.Interface clusterCIDR *net.IPNet - deletingPodsRateLimiter util.RateLimiter + deletingPodsRateLimiter flowcontrol.RateLimiter knownNodeSet sets.String kubeClient clientset.Interface // Method for easy mocking in unittest. @@ -129,8 +129,8 @@ func NewNodeController( cloud cloudprovider.Interface, kubeClient clientset.Interface, podEvictionTimeout time.Duration, - deletionEvictionLimiter util.RateLimiter, - terminationEvictionLimiter util.RateLimiter, + deletionEvictionLimiter flowcontrol.RateLimiter, + terminationEvictionLimiter flowcontrol.RateLimiter, nodeMonitorGracePeriod time.Duration, nodeStartupGracePeriod time.Duration, nodeMonitorPeriod time.Duration, diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go index 0b69e5679bd54..938aa9dbbc9e3 100644 --- a/pkg/controller/node/nodecontroller_test.go +++ b/pkg/controller/node/nodecontroller_test.go @@ -31,8 +31,8 @@ import ( "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/diff" + "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" ) @@ -418,7 +418,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { for _, item := range table { nodeController := NewNodeController(nil, item.fakeNodeHandler, - evictionTimeout, util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, + evictionTimeout, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() unversioned.Time { return fakeNow } for _, ds := range item.daemonSets { @@ -487,7 +487,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) { deleteWaitChan: make(chan struct{}), } nodeController := NewNodeController(nil, fnh, 10*time.Minute, - util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(), + flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.cloud = &fakecloud.FakeCloud{} @@ -720,8 +720,8 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { } for i, item := range table { - nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeAlwaysRateLimiter(), - util.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) + nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), + flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() unversioned.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) @@ -870,8 +870,8 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) { } for i, item := range table { - nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeAlwaysRateLimiter(), - util.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) + nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), + flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() unversioned.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { t.Errorf("Case[%d] unexpected error: %v", i, err) @@ -952,7 +952,7 @@ func TestNodeDeletion(t *testing.T) { Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}), } - nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(), + nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() unversioned.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { diff --git a/pkg/controller/node/rate_limited_queue.go b/pkg/controller/node/rate_limited_queue.go index e934d4b4ae0b9..4b8042acecc66 100644 --- a/pkg/controller/node/rate_limited_queue.go +++ b/pkg/controller/node/rate_limited_queue.go @@ -22,7 +22,7 @@ import ( "time" "github.com/golang/glog" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/sets" ) @@ -137,11 +137,11 @@ func (q *UniqueQueue) Head() (TimedValue, bool) { // of execution. It is also rate limited. type RateLimitedTimedQueue struct { queue UniqueQueue - limiter util.RateLimiter + limiter flowcontrol.RateLimiter } // Creates new queue which will use given RateLimiter to oversee execution. -func NewRateLimitedTimedQueue(limiter util.RateLimiter) *RateLimitedTimedQueue { +func NewRateLimitedTimedQueue(limiter flowcontrol.RateLimiter) *RateLimitedTimedQueue { return &RateLimitedTimedQueue{ queue: UniqueQueue{ queue: TimedQueue{}, diff --git a/pkg/controller/node/rate_limited_queue_test.go b/pkg/controller/node/rate_limited_queue_test.go index a04711466ff7f..56b7cff013ff1 100644 --- a/pkg/controller/node/rate_limited_queue_test.go +++ b/pkg/controller/node/rate_limited_queue_test.go @@ -21,7 +21,7 @@ import ( "testing" "time" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/sets" ) @@ -39,7 +39,7 @@ func CheckSetEq(lhs, rhs sets.String) bool { } func TestAddNode(t *testing.T) { - evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) + evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter()) evictor.Add("first") evictor.Add("second") evictor.Add("third") @@ -62,7 +62,7 @@ func TestAddNode(t *testing.T) { } func TestDelNode(t *testing.T) { - evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) + evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter()) evictor.Add("first") evictor.Add("second") evictor.Add("third") @@ -84,7 +84,7 @@ func TestDelNode(t *testing.T) { t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern) } - evictor = NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) + evictor = NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter()) evictor.Add("first") evictor.Add("second") evictor.Add("third") @@ -106,7 +106,7 @@ func TestDelNode(t *testing.T) { t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern) } - evictor = NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) + evictor = NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter()) evictor.Add("first") evictor.Add("second") evictor.Add("third") @@ -130,7 +130,7 @@ func TestDelNode(t *testing.T) { } func TestTry(t *testing.T) { - evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) + evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter()) evictor.Add("first") evictor.Add("second") evictor.Add("third") @@ -152,7 +152,7 @@ func TestTry(t *testing.T) { } func TestTryOrdering(t *testing.T) { - evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) + evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter()) evictor.Add("first") evictor.Add("second") evictor.Add("third") @@ -184,7 +184,7 @@ func TestTryOrdering(t *testing.T) { } func TestTryRemovingWhileTry(t *testing.T) { - evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter()) + evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter()) evictor.Add("first") evictor.Add("second") evictor.Add("third") diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go index da6ebd8372daa..c41bcc1a95e2d 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go @@ -32,8 +32,8 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/leaky" "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/util" utilerrors "k8s.io/kubernetes/pkg/util/errors" + "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/parsers" ) @@ -107,7 +107,7 @@ type dockerPuller struct { type throttledDockerPuller struct { puller dockerPuller - limiter util.RateLimiter + limiter flowcontrol.RateLimiter } // newDockerPuller creates a new instance of the default implementation of DockerPuller. @@ -122,7 +122,7 @@ func newDockerPuller(client DockerInterface, qps float32, burst int) DockerPulle } return &throttledDockerPuller{ puller: dp, - limiter: util.NewTokenBucketRateLimiter(qps, burst), + limiter: flowcontrol.NewTokenBucketRateLimiter(qps, burst), } } diff --git a/pkg/util/throttle.go b/pkg/util/flowcontrol/throttle.go similarity index 99% rename from pkg/util/throttle.go rename to pkg/util/flowcontrol/throttle.go index c1caea099fee3..a63817ca8c104 100644 --- a/pkg/util/throttle.go +++ b/pkg/util/flowcontrol/throttle.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package flowcontrol import ( "sync" diff --git a/pkg/util/throttle_test.go b/pkg/util/flowcontrol/throttle_test.go similarity index 99% rename from pkg/util/throttle_test.go rename to pkg/util/flowcontrol/throttle_test.go index ca0e9ac234344..30b792ec0bb4b 100644 --- a/pkg/util/throttle_test.go +++ b/pkg/util/flowcontrol/throttle_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package flowcontrol import ( "math" diff --git a/test/e2e/service_latency.go b/test/e2e/service_latency.go index b43145680a70f..83c229974c453 100644 --- a/test/e2e/service_latency.go +++ b/test/e2e/service_latency.go @@ -26,7 +26,7 @@ import ( "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/watch" @@ -66,7 +66,7 @@ var _ = KubeDescribe("Service endpoints latency", func() { // Turn off rate limiting--it interferes with our measurements. oldThrottle := f.Client.RESTClient.Throttle - f.Client.RESTClient.Throttle = util.NewFakeAlwaysRateLimiter() + f.Client.RESTClient.Throttle = flowcontrol.NewFakeAlwaysRateLimiter() defer func() { f.Client.RESTClient.Throttle = oldThrottle }() failing := sets.NewString()