Commit

Allow switching rate limiter inside RateLimitedQueue
gmarek committed Jul 14, 2016
1 parent c949765 commit f6b1c31
Showing 9 changed files with 98 additions and 20 deletions.
cmd/kube-controller-manager/app/controllermanager.go (3 additions, 2 deletions)

@@ -238,8 +238,9 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
}
nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
- s.PodEvictionTimeout.Duration, s.DeletingPodsQps, int(s.DeletingPodsBurst),
- s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
+ s.PodEvictionTimeout.Duration, s.DeletingPodsQps, s.NodeMonitorGracePeriod.Duration,
+ s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR,
+ int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod.Duration)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))

cmd/kube-controller-manager/app/options/options.go (2 additions, 1 deletion)

@@ -136,7 +136,8 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&s.DeploymentControllerSyncPeriod.Duration, "deployment-controller-sync-period", s.DeploymentControllerSyncPeriod.Duration, "Period for syncing the deployments.")
fs.DurationVar(&s.PodEvictionTimeout.Duration, "pod-eviction-timeout", s.PodEvictionTimeout.Duration, "The grace period for deleting pods on failed nodes.")
fs.Float32Var(&s.DeletingPodsQps, "deleting-pods-qps", 0.1, "Number of nodes per second on which pods are deleted in case of node failure.")
- fs.Int32Var(&s.DeletingPodsBurst, "deleting-pods-burst", 1, "Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.")
+ fs.Int32Var(&s.DeletingPodsBurst, "deleting-pods-burst", 0, "Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.")
+ fs.MarkDeprecated("deleting-pods-burst", "This flag is currently no-op and will be deleted.")
fs.Int32Var(&s.RegisterRetryCount, "register-retry-count", s.RegisterRetryCount, ""+
"The number of retries for initial node registration. Retry interval equals node-sync-period.")
fs.MarkDeprecated("register-retry-count", "This flag is currently no-op and will be deleted.")
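The deprecation above relies on pflag's MarkDeprecated, which keeps the flag parseable but hides it from help output and warns when it is still passed. A minimal standalone sketch of that behavior (the flag-set name and parse arguments here are illustrative, not from the commit):

package main

import (
    "fmt"

    "github.com/spf13/pflag"
)

func main() {
    fs := pflag.NewFlagSet("controller-manager", pflag.ContinueOnError)
    var burst int32
    fs.Int32Var(&burst, "deleting-pods-burst", 0, "Deprecated no-op flag.")
    // Hides the flag from --help and prints the message if a user still sets it.
    fs.MarkDeprecated("deleting-pods-burst", "This flag is currently no-op and will be deleted.")

    fs.Parse([]string{"--deleting-pods-burst=5"}) // emits a deprecation warning
    fmt.Println(burst)                            // 5: the value still parses, but nothing consumes it
}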
contrib/mesos/pkg/controllermanager/controllermanager.go (1 addition, 1 deletion)

@@ -154,7 +154,7 @@ func (s *CMServer) Run(_ []string) error {
_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
_, serviceCIDR, _ := net.ParseCIDR(s.ServiceCIDR)
nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
- s.PodEvictionTimeout.Duration, s.DeletingPodsQps, int(s.DeletingPodsBurst),
+ s.PodEvictionTimeout.Duration, s.DeletingPodsQps,
s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod.Duration)

pkg/apis/componentconfig/types.go (1 addition, 1 deletion)

@@ -516,7 +516,7 @@ type KubeControllerManagerConfiguration struct {
// deletingPodsQps is the number of nodes per second on which pods are deleted in
// case of node failure.
DeletingPodsQps float32 `json:"deletingPodsQps"`
- // deletingPodsBurst is the number of nodes on which pods are bursty deleted in
+ // DEPRECATED: deletingPodsBurst is the number of nodes on which pods are bursty deleted in
// case of node failure. For more details look into RateLimiter.
DeletingPodsBurst int32 `json:"deletingPodsBurst"`
// nodeMonitorGracePeriod is the amount of time which we allow a running node to be
pkg/controller/node/nodecontroller.go (4 additions, 5 deletions)

@@ -56,6 +56,8 @@ const (
nodeStatusUpdateRetry = 5
// controls how often NodeController will try to evict Pods from non-responsive Nodes.
nodeEvictionPeriod = 100 * time.Millisecond
+ // Burst value for all eviction rate limiters
+ evictionRateLimiterBurst = 1
)

type zoneState string
@@ -114,7 +116,6 @@ type NodeController struct
zonePodEvictor map[string]*RateLimitedTimedQueue
zoneTerminationEvictor map[string]*RateLimitedTimedQueue
evictionLimiterQPS float32
- evictionLimiterBurst int
podEvictionTimeout time.Duration
// The maximum duration before a pod evicted from a node can be forcefully terminated.
maximumGracePeriod time.Duration
@@ -144,7 +145,6 @@ func NewNodeController(
kubeClient clientset.Interface,
podEvictionTimeout time.Duration,
evictionLimiterQPS float32,
- evictionLimiterBurst int,
nodeMonitorGracePeriod time.Duration,
nodeStartupGracePeriod time.Duration,
nodeMonitorPeriod time.Duration,
@@ -200,7 +200,6 @@ func NewNodeController(
nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
computeZoneStateFunc: ComputeZoneState,
evictionLimiterQPS: evictionLimiterQPS,
- evictionLimiterBurst: evictionLimiterBurst,
zoneStates: make(map[string]zoneState),
}

@@ -385,11 +384,11 @@ func (nc *NodeController) monitorNodeStatus() error {
if _, found := nc.zonePodEvictor[zone]; !found {
nc.zonePodEvictor[zone] =
NewRateLimitedTimedQueue(
- flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, nc.evictionLimiterBurst))
+ flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
}
if _, found := nc.zoneTerminationEvictor[zone]; !found {
nc.zoneTerminationEvictor[zone] = NewRateLimitedTimedQueue(
- flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, nc.evictionLimiterBurst))
+ flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
}
nc.cancelPodEviction(added[i])
}
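With the burst hard-coded to evictionRateLimiterBurst = 1, each zone's token bucket holds at most one token: one eviction may fire immediately, and later ones are paced at the configured QPS (deleting-pods-qps defaults to 0.1, i.e. one node every ten seconds). A small sketch of that behavior (hypothetical values, not part of the commit):

package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/util/flowcontrol"
)

func main() {
    // QPS 0.1, burst 1, mirroring the node controller's per-zone evictors.
    limiter := flowcontrol.NewTokenBucketRateLimiter(0.1, 1)

    fmt.Println(limiter.TryAccept()) // true: the single burst token is available
    fmt.Println(limiter.TryAccept()) // false: the next token arrives ~10s later
}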
pkg/controller/node/nodecontroller_test.go (7 additions, 7 deletions)

@@ -601,7 +601,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {

for _, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler,
- evictionTimeout, testRateLimiterQPS, testRateLimiterBurst, testNodeMonitorGracePeriod,
+ evictionTimeout, testRateLimiterQPS, testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
for _, ds := range item.daemonSets {
@@ -674,7 +674,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
deleteWaitChan: make(chan struct{}),
}
nodeController := NewNodeController(nil, fnh, 10*time.Minute,
- testRateLimiterQPS, testRateLimiterBurst,
+ testRateLimiterQPS,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.cloud = &fakecloud.FakeCloud{}
@@ -907,7 +907,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
}

for i, item := range table {
- nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, testRateLimiterBurst,
+ nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
@@ -1057,7 +1057,7 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
}

for i, item := range table {
- nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, testRateLimiterBurst,
+ nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
@@ -1139,7 +1139,7 @@ func TestNodeDeletion(t *testing.T) {
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
}

- nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, testRateLimiterBurst,
+ nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
@@ -1243,7 +1243,7 @@ func TestCheckPod(t *testing.T) {
},
}

- nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, 0, nil, nil, 0, false)
+ nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
nc.nodeStore.Store.Add(&api.Node{
ObjectMeta: api.ObjectMeta{
@@ -1310,7 +1310,7 @@ func TestCleanupOrphanedPods(t *testing.T) {
newPod("b", "bar"),
newPod("c", "gone"),
}
- nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, 0, nil, nil, 0, false)
+ nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)

nc.nodeStore.Store.Add(newNode("foo"))
nc.nodeStore.Store.Add(newNode("bar"))
pkg/controller/node/rate_limited_queue.go (37 additions, 2 deletions)

@@ -148,8 +148,9 @@ func (q *UniqueQueue) Clear() {
// RateLimitedTimedQueue is a unique item priority queue ordered by the expected next time
// of execution. It is also rate limited.
type RateLimitedTimedQueue struct {
- queue UniqueQueue
- limiter flowcontrol.RateLimiter
+ queue UniqueQueue
+ limiterLock sync.Mutex
+ limiter flowcontrol.RateLimiter
}

// Creates new queue which will use given RateLimiter to oversee execution.
@@ -173,6 +174,8 @@ type ActionFunc func(TimedValue) (bool, time.Duration)
// time to execute the next item in the queue.
func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
val, ok := q.queue.Head()
+ q.limiterLock.Lock()
+ defer q.limiterLock.Unlock()
for ok {
// rate limit the queue checking
if !q.limiter.TryAccept() {
@@ -216,3 +219,35 @@ func (q *RateLimitedTimedQueue) Remove(value string) bool {
func (q *RateLimitedTimedQueue) Clear() {
q.queue.Clear()
}

+ // SwapLimiter safely swaps current limiter for this queue with the passed one if capacities or qps's differ.
+ func (q *RateLimitedTimedQueue) SwapLimiter(newQPS float32) {
+     q.limiterLock.Lock()
+     defer q.limiterLock.Unlock()
+     if q.limiter.QPS() == newQPS {
+         return
+     }
+     var newLimiter flowcontrol.RateLimiter
+     if newQPS <= 0 {
+         newLimiter = flowcontrol.NewFakeNeverRateLimiter()
+     } else {
+         newLimiter = flowcontrol.NewTokenBucketRateLimiter(newQPS, evictionRateLimiterBurst)
+     }
+     // If we're currently waiting on the limiter, we drain the new one - this is a reasonable approach when the Burst value is 1.
+     // TODO: figure out if we need to support higher Burst values, and decide on the drain logic; should we keep:
+     // - saturation (percentage of used tokens)
+     // - number of used tokens
+     // - number of available tokens
+     // - something else?
+     for q.limiter.Saturation() > newLimiter.Saturation() {
+         // Check whether we're using a fake limiter: draining has no effect on its saturation.
+         previousSaturation := newLimiter.Saturation()
+         newLimiter.TryAccept()
+         if newLimiter.Saturation() == previousSaturation {
+             // It's a fake limiter; stop draining.
+             break
+         }
+     }
+     q.limiter.Stop()
+     q.limiter = newLimiter
+ }
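This commit only introduces the switching primitive; nothing calls SwapLimiter yet. A hypothetical sketch of how a caller in the same package might use it, say to pause evictions in a zone it considers unhealthy (the helper function and the healthy/unhealthy policy are illustrative, not part of the commit):

// adjustEvictionRate is a hypothetical helper, assumed to live in the same
// package as RateLimitedTimedQueue.
func adjustEvictionRate(evictor *RateLimitedTimedQueue, zoneHealthy bool, normalQPS float32) {
    if zoneHealthy {
        // No-op if the current limiter already runs at normalQPS.
        evictor.SwapLimiter(normalQPS)
    } else {
        // A QPS <= 0 swaps in a fakeNeverRateLimiter, halting evictions entirely.
        evictor.SwapLimiter(0)
    }
}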
pkg/controller/node/rate_limited_queue_test.go (24 additions, 0 deletions)

@@ -281,3 +281,27 @@ func TestClear(t *testing.T) {
t.Fatalf("Clear should remove all elements from the queue.")
}
}

+ func TestSwapLimiter(t *testing.T) {
+     evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
+     fakeAlways := flowcontrol.NewFakeAlwaysRateLimiter()
+     qps := evictor.limiter.QPS()
+     if qps != fakeAlways.QPS() {
+         t.Fatalf("QPS does not match the created one: %v instead of %v", qps, fakeAlways.QPS())
+     }
+
+     evictor.SwapLimiter(0)
+     qps = evictor.limiter.QPS()
+     fakeNever := flowcontrol.NewFakeNeverRateLimiter()
+     if qps != fakeNever.QPS() {
+         t.Fatalf("QPS does not match the created one: %v instead of %v", qps, fakeNever.QPS())
+     }
+
+     createdQPS := float32(5.5)
+     evictor.SwapLimiter(createdQPS)
+     qps = evictor.limiter.QPS()
+     if qps != createdQPS {
+         t.Fatalf("QPS does not match the created one: %v instead of %v", qps, createdQPS)
+     }
+ }
pkg/util/flowcontrol/throttle.go (19 additions, 1 deletion)

@@ -35,10 +35,13 @@ type RateLimiter interface {
// Usually we use token bucket rate limiter. In that case,
// 1.0 means no tokens are available; 0.0 means we have a full bucket of tokens to use.
Saturation() float64
+ // QPS returns QPS of this rate limiter
+ QPS() float32
}

type tokenBucketRateLimiter struct {
limiter *ratelimit.Bucket
+ qps float32
}

// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
@@ -48,7 +51,10 @@ type tokenBucketRateLimiter struct {
// The maximum number of tokens in the bucket is capped at 'burst'.
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst))
- return &tokenBucketRateLimiter{limiter}
+ return &tokenBucketRateLimiter{
+     limiter: limiter,
+     qps:     qps,
+ }
}

func (t *tokenBucketRateLimiter) TryAccept() bool {
@@ -69,6 +75,10 @@ func (t *tokenBucketRateLimiter) Accept() {
func (t *tokenBucketRateLimiter) Stop() {
}

+ func (t *tokenBucketRateLimiter) QPS() float32 {
+     return t.qps
+ }

type fakeAlwaysRateLimiter struct{}

func NewFakeAlwaysRateLimiter() RateLimiter {
@@ -87,6 +97,10 @@ func (t *fakeAlwaysRateLimiter) Stop() {}

func (t *fakeAlwaysRateLimiter) Accept() {}

+ func (t *fakeAlwaysRateLimiter) QPS() float32 {
+     return 1
+ }

type fakeNeverRateLimiter struct {
wg sync.WaitGroup
}
@@ -114,3 +128,7 @@ func (t *fakeNeverRateLimiter) Stop() {
func (t *fakeNeverRateLimiter) Accept() {
t.wg.Wait()
}

+ func (t *fakeNeverRateLimiter) QPS() float32 {
+     return 1
+ }
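Because QPS() is now part of the RateLimiter interface, every implementation, including any outside this file, must provide it. A minimal sketch of a custom limiter satisfying the extended interface (hypothetical, assuming the interface consists of the five methods visible in this diff):

// unlimitedRateLimiter is an illustrative limiter that never throttles.
type unlimitedRateLimiter struct{}

func (unlimitedRateLimiter) TryAccept() bool     { return true }
func (unlimitedRateLimiter) Accept()             {}
func (unlimitedRateLimiter) Stop()               {}
func (unlimitedRateLimiter) Saturation() float64 { return 0 }
func (unlimitedRateLimiter) QPS() float32        { return 1 }

var _ RateLimiter = unlimitedRateLimiter{} // compile-time interface check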
