Skip to content

Commit

Permalink
Refactor throttle into util pkg
Browse files Browse the repository at this point in the history
Fix missing throttle.go
  • Loading branch information
resouer committed Mar 25, 2016
1 parent dae5ac4 commit 8472cfa
Show file tree
Hide file tree
Showing 15 changed files with 51 additions and 49 deletions.
3 changes: 2 additions & 1 deletion cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flag"
"k8s.io/kubernetes/pkg/util/flowcontrol"
utilnet "k8s.io/kubernetes/pkg/util/net"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
Expand Down Expand Up @@ -201,7 +202,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
go replicationcontroller.NewReplicationManager(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096).
Run(3, wait.NeverStop)

nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(),
nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
40*time.Second, 60*time.Second, 5*time.Second, nil, false)
nodeController.Run(5 * time.Second)
cadvisorInterface := new(cadvisortest.Fake)
Expand Down
6 changes: 3 additions & 3 deletions cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ import (
"k8s.io/kubernetes/pkg/healthz"
quotainstall "k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/configz"
"k8s.io/kubernetes/pkg/util/crypto"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait"

"github.com/golang/glog"
Expand Down Expand Up @@ -207,8 +207,8 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
// this cidr has been validated already
_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
s.PodEvictionTimeout.Duration, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod.Duration)

Expand Down
6 changes: 3 additions & 3 deletions contrib/mesos/pkg/controllermanager/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ import (
"k8s.io/kubernetes/pkg/healthz"
quotainstall "k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/crypto"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait"

"k8s.io/kubernetes/contrib/mesos/pkg/profile"
Expand Down Expand Up @@ -154,8 +154,8 @@ func (s *CMServer) Run(_ []string) error {
}
_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
s.PodEvictionTimeout.Duration, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod.Duration)

Expand Down
7 changes: 4 additions & 3 deletions pkg/client/restclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol"
)

const (
Expand All @@ -53,7 +54,7 @@ type RESTClient struct {
contentConfig ContentConfig

// TODO extract this into a wrapper interface via the RESTClient interface in kubectl.
Throttle util.RateLimiter
Throttle flowcontrol.RateLimiter

// Set specific behavior of the client. If not set http.DefaultClient will be used.
Client *http.Client
Expand All @@ -77,9 +78,9 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf
config.ContentType = "application/json"
}

var throttle util.RateLimiter
var throttle flowcontrol.RateLimiter
if maxQPS > 0 {
throttle = util.NewTokenBucketRateLimiter(maxQPS, maxBurst)
throttle = flowcontrol.NewTokenBucketRateLimiter(maxQPS, maxBurst)
}
return &RESTClient{
base: &base,
Expand Down
6 changes: 3 additions & 3 deletions pkg/client/restclient/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch"
Expand Down Expand Up @@ -117,11 +117,11 @@ type Request struct {
resp *http.Response

backoffMgr BackoffManager
throttle util.RateLimiter
throttle flowcontrol.RateLimiter
}

// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, backoff BackoffManager, throttle util.RateLimiter) *Request {
func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, backoff BackoffManager, throttle flowcontrol.RateLimiter) *Request {
if backoff == nil {
glog.V(2).Infof("Not implementing request backoff strategy.")
backoff = &NoBackoff{}
Expand Down
6 changes: 3 additions & 3 deletions pkg/cloudprovider/providers/gce/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/flowcontrol"
netsets "k8s.io/kubernetes/pkg/util/net/sets"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
Expand Down Expand Up @@ -80,7 +80,7 @@ type GCECloud struct {
managedZones []string // List of zones we are spanning (for Ubernetes-Lite, primarily when running on master)
networkURL string
useMetadataServer bool
operationPollRateLimiter util.RateLimiter
operationPollRateLimiter flowcontrol.RateLimiter
}

type Config struct {
Expand Down Expand Up @@ -297,7 +297,7 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo
glog.Infof("managing multiple zones: %v", managedZones)
}

operationPollRateLimiter := util.NewTokenBucketRateLimiter(10, 100) // 10 qps, 100 bucket size.
operationPollRateLimiter := flowcontrol.NewTokenBucketRateLimiter(10, 100) // 10 qps, 100 bucket size.

return &GCECloud{
service: svc,
Expand Down
6 changes: 3 additions & 3 deletions pkg/cloudprovider/providers/gce/token_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"strings"
"time"

"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol"

"github.com/prometheus/client_golang/prometheus"
"golang.org/x/oauth2"
Expand Down Expand Up @@ -61,7 +61,7 @@ type altTokenSource struct {
oauthClient *http.Client
tokenURL string
tokenBody string
throttle util.RateLimiter
throttle flowcontrol.RateLimiter
}

func (a *altTokenSource) Token() (*oauth2.Token, error) {
Expand Down Expand Up @@ -106,7 +106,7 @@ func newAltTokenSource(tokenURL, tokenBody string) oauth2.TokenSource {
oauthClient: client,
tokenURL: tokenURL,
tokenBody: tokenBody,
throttle: util.NewTokenBucketRateLimiter(tokenURLQPS, tokenURLBurst),
throttle: flowcontrol.NewTokenBucketRateLimiter(tokenURLQPS, tokenURLBurst),
}
return oauth2.ReuseTokenSource(nil, a)
}
8 changes: 4 additions & 4 deletions pkg/controller/node/nodecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
Expand Down Expand Up @@ -69,7 +69,7 @@ type NodeController struct {
allocateNodeCIDRs bool
cloud cloudprovider.Interface
clusterCIDR *net.IPNet
deletingPodsRateLimiter util.RateLimiter
deletingPodsRateLimiter flowcontrol.RateLimiter
knownNodeSet sets.String
kubeClient clientset.Interface
// Method for easy mocking in unittest.
Expand Down Expand Up @@ -129,8 +129,8 @@ func NewNodeController(
cloud cloudprovider.Interface,
kubeClient clientset.Interface,
podEvictionTimeout time.Duration,
deletionEvictionLimiter util.RateLimiter,
terminationEvictionLimiter util.RateLimiter,
deletionEvictionLimiter flowcontrol.RateLimiter,
terminationEvictionLimiter flowcontrol.RateLimiter,
nodeMonitorGracePeriod time.Duration,
nodeStartupGracePeriod time.Duration,
nodeMonitorPeriod time.Duration,
Expand Down
16 changes: 8 additions & 8 deletions pkg/controller/node/nodecontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/diff"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)
Expand Down Expand Up @@ -418,7 +418,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {

for _, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler,
evictionTimeout, util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod,
evictionTimeout, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow }
for _, ds := range item.daemonSets {
Expand Down Expand Up @@ -487,7 +487,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
deleteWaitChan: make(chan struct{}),
}
nodeController := NewNodeController(nil, fnh, 10*time.Minute,
util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(),
flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, false)
nodeController.cloud = &fakecloud.FakeCloud{}
Expand Down Expand Up @@ -720,8 +720,8 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
}

for i, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeAlwaysRateLimiter(),
util.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(),
flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
Expand Down Expand Up @@ -870,8 +870,8 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
}

for i, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeAlwaysRateLimiter(),
util.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(),
flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("Case[%d] unexpected error: %v", i, err)
Expand Down Expand Up @@ -952,7 +952,7 @@ func TestNodeDeletion(t *testing.T) {
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
}

nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(),
nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/node/rate_limited_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"time"

"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/sets"
)

Expand Down Expand Up @@ -137,11 +137,11 @@ func (q *UniqueQueue) Head() (TimedValue, bool) {
// of execution. It is also rate limited.
type RateLimitedTimedQueue struct {
queue UniqueQueue
limiter util.RateLimiter
limiter flowcontrol.RateLimiter
}

// Creates new queue which will use given RateLimiter to oversee execution.
func NewRateLimitedTimedQueue(limiter util.RateLimiter) *RateLimitedTimedQueue {
func NewRateLimitedTimedQueue(limiter flowcontrol.RateLimiter) *RateLimitedTimedQueue {
return &RateLimitedTimedQueue{
queue: UniqueQueue{
queue: TimedQueue{},
Expand Down
16 changes: 8 additions & 8 deletions pkg/controller/node/rate_limited_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"testing"
"time"

"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/sets"
)

Expand All @@ -39,7 +39,7 @@ func CheckSetEq(lhs, rhs sets.String) bool {
}

func TestAddNode(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter())
evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
Expand All @@ -62,7 +62,7 @@ func TestAddNode(t *testing.T) {
}

func TestDelNode(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter())
evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
Expand All @@ -84,7 +84,7 @@ func TestDelNode(t *testing.T) {
t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
}

evictor = NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter())
evictor = NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
Expand All @@ -106,7 +106,7 @@ func TestDelNode(t *testing.T) {
t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
}

evictor = NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter())
evictor = NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
Expand All @@ -130,7 +130,7 @@ func TestDelNode(t *testing.T) {
}

func TestTry(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter())
evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
Expand All @@ -152,7 +152,7 @@ func TestTry(t *testing.T) {
}

func TestTryOrdering(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter())
evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestTryOrdering(t *testing.T) {
}

func TestTryRemovingWhileTry(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeAlwaysRateLimiter())
evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
Expand Down
6 changes: 3 additions & 3 deletions pkg/kubelet/dockertools/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/leaky"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/parsers"
)

Expand Down Expand Up @@ -107,7 +107,7 @@ type dockerPuller struct {

type throttledDockerPuller struct {
puller dockerPuller
limiter util.RateLimiter
limiter flowcontrol.RateLimiter
}

// newDockerPuller creates a new instance of the default implementation of DockerPuller.
Expand All @@ -122,7 +122,7 @@ func newDockerPuller(client DockerInterface, qps float32, burst int) DockerPulle
}
return &throttledDockerPuller{
puller: dp,
limiter: util.NewTokenBucketRateLimiter(qps, burst),
limiter: flowcontrol.NewTokenBucketRateLimiter(qps, burst),
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/util/throttle.go → pkg/util/flowcontrol/throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package util
package flowcontrol

import (
"sync"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package util
package flowcontrol

import (
"math"
Expand Down
Loading

0 comments on commit 8472cfa

Please sign in to comment.