Allow switching rate limiter inside RateLimitedQueue #28882

Merged · 1 commit · Jul 14, 2016
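
For context before the diff: this change removes the configurable eviction burst (the deleting-pods-burst flag becomes a deprecated no-op and the burst is fixed at the package constant evictionRateLimiterBurst = 1) and gives RateLimitedTimedQueue a SwapLimiter method so the rate limiter behind an eviction queue can be replaced at runtime. The following is only an illustrative sketch of the new API, not code from this PR; it assumes it sits in the pkg/controller/node package (like the tests further down) and the QPS values are made up.

// Sketch only: exercises the SwapLimiter API introduced in this PR.
func sketchSwapLimiterUsage() {
	// Build an evictor the way monitorNodeStatus does after this change:
	// QPS comes from configuration, burst is the fixed evictionRateLimiterBurst.
	evictor := NewRateLimitedTimedQueue(
		flowcontrol.NewTokenBucketRateLimiter(0.1, evictionRateLimiterBurst))

	// A QPS of 0 installs flowcontrol.NewFakeNeverRateLimiter(), which stops
	// evictions without tearing the queue down.
	evictor.SwapLimiter(0)

	// Restore the configured eviction rate (SwapLimiter is a no-op when the
	// QPS is unchanged).
	evictor.SwapLimiter(0.1)
}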
5 changes: 3 additions & 2 deletions cmd/kube-controller-manager/app/controllermanager.go
@@ -238,8 +238,9 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
}
nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
-s.PodEvictionTimeout.Duration, s.DeletingPodsQps, int(s.DeletingPodsBurst),
-s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
+s.PodEvictionTimeout.Duration, s.DeletingPodsQps, s.NodeMonitorGracePeriod.Duration,
+s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR,
+int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod.Duration)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))

3 changes: 2 additions & 1 deletion cmd/kube-controller-manager/app/options/options.go
@@ -136,7 +136,8 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&s.DeploymentControllerSyncPeriod.Duration, "deployment-controller-sync-period", s.DeploymentControllerSyncPeriod.Duration, "Period for syncing the deployments.")
fs.DurationVar(&s.PodEvictionTimeout.Duration, "pod-eviction-timeout", s.PodEvictionTimeout.Duration, "The grace period for deleting pods on failed nodes.")
fs.Float32Var(&s.DeletingPodsQps, "deleting-pods-qps", 0.1, "Number of nodes per second on which pods are deleted in case of node failure.")
fs.Int32Var(&s.DeletingPodsBurst, "deleting-pods-burst", 1, "Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.")
fs.Int32Var(&s.DeletingPodsBurst, "deleting-pods-burst", 0, "Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.")
fs.MarkDeprecated("deleting-pods-burst", "This flag is currently no-op and will be deleted.")
fs.Int32Var(&s.RegisterRetryCount, "register-retry-count", s.RegisterRetryCount, ""+
"The number of retries for initial node registration. Retry interval equals node-sync-period.")
fs.MarkDeprecated("register-retry-count", "This flag is currently no-op and will be deleted.")
2 changes: 1 addition & 1 deletion contrib/mesos/pkg/controllermanager/controllermanager.go
@@ -154,7 +154,7 @@ func (s *CMServer) Run(_ []string) error {
_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
_, serviceCIDR, _ := net.ParseCIDR(s.ServiceCIDR)
nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
-s.PodEvictionTimeout.Duration, s.DeletingPodsQps, int(s.DeletingPodsBurst),
+s.PodEvictionTimeout.Duration, s.DeletingPodsQps,
s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod.Duration)

2 changes: 1 addition & 1 deletion pkg/apis/componentconfig/types.go
@@ -516,7 +516,7 @@ type KubeControllerManagerConfiguration struct {
// deletingPodsQps is the number of nodes per second on which pods are deleted in
// case of node failure.
DeletingPodsQps float32 `json:"deletingPodsQps"`
-// deletingPodsBurst is the number of nodes on which pods are bursty deleted in
+// DEPRECATED: deletingPodsBurst is the number of nodes on which pods are bursty deleted in
// case of node failure. For more details look into RateLimiter.
DeletingPodsBurst int32 `json:"deletingPodsBurst"`
// nodeMontiorGracePeriod is the amount of time which we allow a running node to be
9 changes: 4 additions & 5 deletions pkg/controller/node/nodecontroller.go
@@ -56,6 +56,8 @@ const (
nodeStatusUpdateRetry = 5
// controls how often NodeController will try to evict Pods from non-responsive Nodes.
nodeEvictionPeriod = 100 * time.Millisecond
// Burst value for all eviction rate limiters
evictionRateLimiterBurst = 1
)

type zoneState string
@@ -114,7 +116,6 @@ type NodeController struct {
zonePodEvictor map[string]*RateLimitedTimedQueue
zoneTerminationEvictor map[string]*RateLimitedTimedQueue
evictionLimiterQPS float32
-evictionLimiterBurst int
podEvictionTimeout time.Duration
// The maximum duration before a pod evicted from a node can be forcefully terminated.
maximumGracePeriod time.Duration
@@ -144,7 +145,6 @@ func NewNodeController(
kubeClient clientset.Interface,
podEvictionTimeout time.Duration,
evictionLimiterQPS float32,
-evictionLimiterBurst int,
nodeMonitorGracePeriod time.Duration,
nodeStartupGracePeriod time.Duration,
nodeMonitorPeriod time.Duration,
@@ -200,7 +200,6 @@ func NewNodeController(
nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
computeZoneStateFunc: ComputeZoneState,
evictionLimiterQPS: evictionLimiterQPS,
-evictionLimiterBurst: evictionLimiterBurst,
zoneStates: make(map[string]zoneState),
}

@@ -385,11 +384,11 @@ func (nc *NodeController) monitorNodeStatus() error {
if _, found := nc.zonePodEvictor[zone]; !found {
nc.zonePodEvictor[zone] =
NewRateLimitedTimedQueue(
-flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, nc.evictionLimiterBurst))
+flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
}
if _, found := nc.zoneTerminationEvictor[zone]; !found {
nc.zoneTerminationEvictor[zone] = NewRateLimitedTimedQueue(
-flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, nc.evictionLimiterBurst))
+flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
}
nc.cancelPodEviction(added[i])
}
14 changes: 7 additions & 7 deletions pkg/controller/node/nodecontroller_test.go
@@ -601,7 +601,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {

for _, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler,
-evictionTimeout, testRateLimiterQPS, testRateLimiterBurst, testNodeMonitorGracePeriod,
+evictionTimeout, testRateLimiterQPS, testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
for _, ds := range item.daemonSets {
@@ -674,7 +674,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
deleteWaitChan: make(chan struct{}),
}
nodeController := NewNodeController(nil, fnh, 10*time.Minute,
-testRateLimiterQPS, testRateLimiterBurst,
+testRateLimiterQPS,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.cloud = &fakecloud.FakeCloud{}
@@ -907,7 +907,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
}

for i, item := range table {
-nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, testRateLimiterBurst,
+nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
@@ -1057,7 +1057,7 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
}

for i, item := range table {
-nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, testRateLimiterBurst,
+nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
@@ -1139,7 +1139,7 @@ func TestNodeDeletion(t *testing.T) {
Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
}

-nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, testRateLimiterBurst,
+nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
@@ -1243,7 +1243,7 @@ func TestCheckPod(t *testing.T) {
},
}

-nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, 0, nil, nil, 0, false)
+nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)
nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
nc.nodeStore.Store.Add(&api.Node{
ObjectMeta: api.ObjectMeta{
@@ -1310,7 +1310,7 @@ func TestCleanupOrphanedPods(t *testing.T) {
newPod("b", "bar"),
newPod("c", "gone"),
}
-nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, 0, nil, nil, 0, false)
+nc := NewNodeController(nil, nil, 0, 0, 0, 0, 0, nil, nil, 0, false)

nc.nodeStore.Store.Add(newNode("foo"))
nc.nodeStore.Store.Add(newNode("bar"))
39 changes: 37 additions & 2 deletions pkg/controller/node/rate_limited_queue.go
@@ -148,8 +148,9 @@ func (q *UniqueQueue) Clear() {
// RateLimitedTimedQueue is a unique item priority queue ordered by the expected next time
// of execution. It is also rate limited.
type RateLimitedTimedQueue struct {
-queue UniqueQueue
-limiter flowcontrol.RateLimiter
+queue UniqueQueue
+limiterLock sync.Mutex
+limiter flowcontrol.RateLimiter
}

// Creates new queue which will use given RateLimiter to oversee execution.
@@ -173,6 +174,8 @@ type ActionFunc func(TimedValue) (bool, time.Duration)
// time to execute the next item in the queue.
func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
val, ok := q.queue.Head()
q.limiterLock.Lock()
defer q.limiterLock.Unlock()
for ok {
// rate limit the queue checking
if !q.limiter.TryAccept() {
@@ -216,3 +219,35 @@ func (q *RateLimitedTimedQueue) Remove(value string) bool {
func (q *RateLimitedTimedQueue) Clear() {
q.queue.Clear()
}

// SwapLimiter safely swaps current limiter for this queue with the passed one if capacities or qps's differ.
func (q *RateLimitedTimedQueue) SwapLimiter(newQPS float32) {
q.limiterLock.Lock()
defer q.limiterLock.Unlock()
if q.limiter.QPS() == newQPS {
return
}
var newLimiter flowcontrol.RateLimiter
if newQPS <= 0 {
newLimiter = flowcontrol.NewFakeNeverRateLimiter()
} else {
newLimiter = flowcontrol.NewTokenBucketRateLimiter(newQPS, evictionRateLimiterBurst)
}
// If we're currently waiting on limiter, we drain the new one - this is a good approach when Burst value is 1
// TODO: figure out if we need to support higher Burst values and decide on the drain logic, should we keep:
// - saturation (percentage of used tokens)
// - number of used tokens
// - number of available tokens
// - something else
for q.limiter.Saturation() > newLimiter.Saturation() {
// Check if we're not using fake limiter
previousSaturation := newLimiter.Saturation()
newLimiter.TryAccept()
// It's a fake limiter
if newLimiter.Saturation() == previousSaturation {
break
}
}
q.limiter.Stop()
q.limiter = newLimiter
}
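
To make the drain comment above concrete, here is a hedged sketch (not part of this PR; it assumes the same package so the unexported limiter field is reachable, as the tests below do) of a swap performed while the queue is throttled. Because the old limiter's single burst token is already spent, SwapLimiter drains the fresh limiter until its Saturation() catches up, so the swap never hands out an immediate extra eviction.

// Sketch only: illustrates the drain behaviour of SwapLimiter with Burst = 1.
func sketchSwapWhileThrottled() {
	q := NewRateLimitedTimedQueue(
		flowcontrol.NewTokenBucketRateLimiter(0.1, evictionRateLimiterBurst))
	q.limiter.TryAccept() // spend the single burst token; Saturation() is now close to 1.0

	// The new 10 QPS limiter starts at Saturation() == 0, so SwapLimiter keeps
	// calling TryAccept() on it until it is at least as saturated as the old
	// limiter before installing it.
	q.SwapLimiter(10)
}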
24 changes: 24 additions & 0 deletions pkg/controller/node/rate_limited_queue_test.go
@@ -281,3 +281,27 @@ func TestClear(t *testing.T) {
t.Fatalf("Clear should remove all elements from the queue.")
}
}

func TestSwapLimiter(t *testing.T) {
evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter())
fakeAlways := flowcontrol.NewFakeAlwaysRateLimiter()
qps := evictor.limiter.QPS()
if qps != fakeAlways.QPS() {
t.Fatalf("QPS does not match create one: %v instead of %v", qps, fakeAlways.QPS())
}

evictor.SwapLimiter(0)
qps = evictor.limiter.QPS()
fakeNever := flowcontrol.NewFakeNeverRateLimiter()
if qps != fakeNever.QPS() {
t.Fatalf("QPS does not match create one: %v instead of %v", qps, fakeNever.QPS())
}

createdQPS := float32(5.5)
evictor.SwapLimiter(createdQPS)
qps = evictor.limiter.QPS()
if qps != createdQPS {
t.Fatalf("QPS does not match create one: %v instead of %v", qps, createdQPS)
}

}
20 changes: 19 additions & 1 deletion pkg/util/flowcontrol/throttle.go
@@ -35,10 +35,13 @@ type RateLimiter interface {
// Usually we use token bucket rate limiter. In that case,
// 1.0 means no tokens are available; 0.0 means we have a full bucket of tokens to use.
Saturation() float64
// QPS returns QPS of this rate limiter
QPS() float32
}

type tokenBucketRateLimiter struct {
limiter *ratelimit.Bucket
qps float32
}

// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
@@ -48,7 +51,10 @@ type tokenBucketRateLimiter struct {
// The maximum number of tokens in the bucket is capped at 'burst'.
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
limiter := ratelimit.NewBucketWithRate(float64(qps), int64(burst))
-return &tokenBucketRateLimiter{limiter}
+return &tokenBucketRateLimiter{
+limiter: limiter,
+qps: qps,
+}
}

func (t *tokenBucketRateLimiter) TryAccept() bool {
Expand All @@ -69,6 +75,10 @@ func (t *tokenBucketRateLimiter) Accept() {
func (t *tokenBucketRateLimiter) Stop() {
}

func (t *tokenBucketRateLimiter) QPS() float32 {
return t.qps
}

type fakeAlwaysRateLimiter struct{}

func NewFakeAlwaysRateLimiter() RateLimiter {
@@ -87,6 +97,10 @@ func (t *fakeAlwaysRateLimiter) Stop() {}

func (t *fakeAlwaysRateLimiter) Accept() {}

func (t *fakeAlwaysRateLimiter) QPS() float32 {
return 1
}

type fakeNeverRateLimiter struct {
wg sync.WaitGroup
}
@@ -114,3 +128,7 @@ func (t *fakeNeverRateLimiter) Stop() {
func (t *fakeNeverRateLimiter) Accept() {
t.wg.Wait()
}

func (t *fakeNeverRateLimiter) QPS() float32 {
return 1
}
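
A last hedged sketch of the new QPS() accessor as a standalone program (the k8s.io/kubernetes/pkg/util/flowcontrol import path is an assumption, since only the file path appears above): the token bucket reports the qps it was constructed with, while both fake limiters report a fixed 1, which is what TestSwapLimiter compares against.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/flowcontrol"
)

func main() {
	tb := flowcontrol.NewTokenBucketRateLimiter(5.5, 1)
	always := flowcontrol.NewFakeAlwaysRateLimiter()
	never := flowcontrol.NewFakeNeverRateLimiter()

	fmt.Println(tb.QPS())     // 5.5 (the value passed at construction)
	fmt.Println(always.QPS()) // 1
	fmt.Println(never.QPS())  // 1
}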