Skip to content

Commit

Permalink
add ratelimiter flags
Browse files Browse the repository at this point in the history
Signed-off-by: pigletfly <wangbing.adam@gmail.com>
  • Loading branch information
pigletfly committed Feb 25, 2022
1 parent d980eea commit 0fbba97
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 9 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/stretchr/testify v1.7.0
github.com/vektra/mockery/v2 v2.9.4
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
golang.org/x/tools v0.1.6-0.20210820212750-d4cc65f0b2ff
gomodules.xyz/jsonpatch/v2 v2.2.0
google.golang.org/grpc v1.40.0
Expand Down Expand Up @@ -143,7 +144,6 @@ require (
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20211029165221-6e7872819dc8 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2 // indirect
Expand Down
5 changes: 5 additions & 0 deletions pkg/controllers/binding/binding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/overridemanager"
"github.com/karmada-io/karmada/pkg/util/ratelimiter"
)

// ControllerName is the controller name that will be used when reporting events.
Expand Down Expand Up @@ -169,6 +171,9 @@ func (c *ResourceBindingController) SetupWithManager(mgr controllerruntime.Manag
Watches(&source.Kind{Type: &workv1alpha1.Work{}}, handler.EnqueueRequestsFromMapFunc(workFn), workPredicateFn).
Watches(&source.Kind{Type: &policyv1alpha1.OverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())).
Watches(&source.Kind{Type: &policyv1alpha1.ClusterOverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())).
WithOptions(controller.Options{
RateLimiter: ratelimiter.DefaultControllerRateLimiter(),
}).
Complete(c)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/overridemanager"
"github.com/karmada-io/karmada/pkg/util/ratelimiter"
)

// ClusterResourceBindingControllerName is the controller name that will be used when reporting events.
Expand Down Expand Up @@ -158,6 +160,9 @@ func (c *ClusterResourceBindingController) SetupWithManager(mgr controllerruntim
Watches(&source.Kind{Type: &workv1alpha1.Work{}}, handler.EnqueueRequestsFromMapFunc(workFn), workPredicateFn).
Watches(&source.Kind{Type: &policyv1alpha1.OverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())).
Watches(&source.Kind{Type: &policyv1alpha1.ClusterOverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())).
WithOptions(controller.Options{
RateLimiter: ratelimiter.DefaultControllerRateLimiter(),
}).
Complete(c)
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/controllers/execution/execution_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"

Expand All @@ -25,6 +26,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/informermanager/keys"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/ratelimiter"
)

const (
Expand Down Expand Up @@ -100,6 +102,9 @@ func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
For(&workv1alpha1.Work{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
WithEventFilter(c.PredicateFunc).
WithOptions(controller.Options{
RateLimiter: ratelimiter.DefaultControllerRateLimiter(),
}).
Complete(c)
}

Expand Down
15 changes: 9 additions & 6 deletions pkg/controllers/status/cluster_status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ import (
"sync"
"time"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/ratelimiter"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -27,13 +32,9 @@ import (
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/informermanager"
)

const (
Expand Down Expand Up @@ -113,7 +114,9 @@ func (c *ClusterStatusController) Reconcile(ctx context.Context, req controllerr

// SetupWithManager creates a controller and register to controller manager.
func (c *ClusterStatusController) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).WithEventFilter(c.PredicateFunc).Complete(c)
return controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).WithEventFilter(c.PredicateFunc).WithOptions(controller.Options{
RateLimiter: ratelimiter.DefaultControllerRateLimiter(),
}).Complete(c)
}

func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Cluster) (controllerruntime.Result, error) {
Expand Down
6 changes: 5 additions & 1 deletion pkg/controllers/status/workstatus_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/predicate"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/informermanager/keys"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/ratelimiter"
"github.com/karmada-io/karmada/pkg/util/restmapper"
)

Expand Down Expand Up @@ -443,5 +445,7 @@ func (c *WorkStatusController) getSingleClusterManager(cluster *clusterv1alpha1.

// SetupWithManager creates a controller and register to controller manager.
func (c *WorkStatusController) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(c.PredicateFunc).Complete(c)
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(c.PredicateFunc).WithOptions(controller.Options{
RateLimiter: ratelimiter.DefaultControllerRateLimiter(),
}).Complete(c)
}
27 changes: 27 additions & 0 deletions pkg/util/ratelimiter/rate_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package ratelimiter

import (
"flag"
"time"

"golang.org/x/time/rate"
"k8s.io/client-go/util/workqueue"
)

func init() {
flag.DurationVar(&baseDelay, "rate-limiter-base-delay", time.Millisecond*5, "The base delay for rate limiter. Defaults 5ms")
flag.DurationVar(&maxDelay, "rate-limiter-max-delay", time.Second*1000, "The max delay for rate limiter. Defaults 1000s")
flag.IntVar(&qps, "rate-limiter-qps", 10, "The qps for rate limier. Defaults 10")
flag.IntVar(&bucketSize, "rate-limiter-bucket-size", 100, "The bucket size for rate limier. Defaults 100")
}

var baseDelay, maxDelay time.Duration
var qps, bucketSize int

// DefaultControllerRateLimiter provide a default rate limiter for controller, and users can tune it by corresponding flags.
func DefaultControllerRateLimiter() workqueue.RateLimiter {
return workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(baseDelay, maxDelay),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(qps), bucketSize)},
)
}
3 changes: 2 additions & 1 deletion pkg/util/worker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package util

import (
"github.com/karmada-io/karmada/pkg/util/ratelimiter"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -60,7 +61,7 @@ func NewAsyncWorker(name string, keyFunc KeyFunc, reconcileFunc ReconcileFunc) A
return &asyncWorker{
keyFunc: keyFunc,
reconcileFunc: reconcileFunc,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
queue: workqueue.NewNamedRateLimitingQueue(ratelimiter.DefaultControllerRateLimiter(), name),
}
}

Expand Down

0 comments on commit 0fbba97

Please sign in to comment.