Skip to content

Commit

Permalink
add ratelimiter flags
Browse files Browse the repository at this point in the history
Signed-off-by: pigletfly <wangbing.adam@gmail.com>
  • Loading branch information
pigletfly committed Mar 3, 2022
1 parent 956604d commit 9e28d6c
Show file tree
Hide file tree
Showing 17 changed files with 187 additions and 22 deletions.
15 changes: 14 additions & 1 deletion cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/overridemanager"
"github.com/karmada-io/karmada/pkg/util/ratelimiter"
"github.com/karmada-io/karmada/pkg/version"
"github.com/karmada-io/karmada/pkg/version/sharedcommand"
)
Expand Down Expand Up @@ -194,6 +195,7 @@ func startClusterStatusController(ctx controllerscontext.Context) (enabled bool,
ClusterLeaseDuration: opts.ClusterLeaseDuration,
ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
RatelimiterOptions: ctx.Opts.RatelimiterOptions,
}
if err := clusterStatusController.SetupWithManager(mgr); err != nil {
return false, err
Expand Down Expand Up @@ -224,6 +226,7 @@ func startBindingController(ctx controllerscontext.Context) (enabled bool, err e
OverrideManager: ctx.OverrideManager,
InformerManager: ctx.ControlPlaneInformerManager,
ResourceInterpreter: ctx.ResourceInterpreter,
RatelimiterOptions: ctx.Opts.RatelimiterOptions,
}
if err := bindingController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
Expand All @@ -237,6 +240,7 @@ func startBindingController(ctx controllerscontext.Context) (enabled bool, err e
OverrideManager: ctx.OverrideManager,
InformerManager: ctx.ControlPlaneInformerManager,
ResourceInterpreter: ctx.ResourceInterpreter,
RatelimiterOptions: ctx.Opts.RatelimiterOptions,
}
if err := clusterResourceBindingController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
Expand All @@ -253,6 +257,7 @@ func startExecutionController(ctx controllerscontext.Context) (enabled bool, err
PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
InformerManager: informermanager.GetInstance(),
ClusterClientSetFunc: util.NewClusterDynamicClientSet,
RatelimiterOption: ctx.Opts.RatelimiterOptions,
}
if err := executionController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
Expand All @@ -273,6 +278,7 @@ func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, er
ClusterClientSetFunc: util.NewClusterDynamicClientSet,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs,
RatelimiterOptions: ctx.Opts.RatelimiterOptions,
}
workStatusController.RunWorkQueue()
if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil {
Expand Down Expand Up @@ -379,7 +385,12 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
}

objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter)

ratelimiterOptions := ratelimiter.Options{
BaseDelay: opts.RateLimiterBaseDelay,
MaxDelay: opts.RateLimiterMaxDelay,
QPS: opts.RateLimiterQPS,
BucketSize: opts.RateLimiterBucketSize,
}
resourceDetector := &detector.ResourceDetector{
DiscoveryClientSet: discoverClientSet,
Client: mgr.GetClient(),
Expand All @@ -392,6 +403,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
EventRecorder: mgr.GetEventRecorderFor("resource-detector"),
ConcurrentResourceTemplateSyncs: opts.ConcurrentResourceTemplateSyncs,
ConcurrentResourceBindingSyncs: opts.ConcurrentResourceBindingSyncs,
RatelimiterOptions: ratelimiterOptions,
}
if err := mgr.Add(resourceDetector); err != nil {
klog.Fatalf("Failed to setup resource detector: %v", err)
Expand Down Expand Up @@ -427,6 +439,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
ClusterAPIBurst: opts.ClusterAPIBurst,
SkippedPropagatingNamespaces: opts.SkippedPropagatingNamespaces,
ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs,
RatelimiterOptions: ratelimiterOptions,
},
StopChan: stopChan,
DynamicClientSet: dynamicClientSet,
Expand Down
12 changes: 12 additions & 0 deletions cmd/controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ type Options struct {
ConcurrentNamespaceSyncs int
// ConcurrentResourceTemplateSyncs is the number of resource templates that are allowed to sync concurrently.
ConcurrentResourceTemplateSyncs int
// RateLimiterBaseDelay is the base delay for ItemExponentialFailureRateLimiter.
RateLimiterBaseDelay time.Duration
// RateLimiterMaxDelay is the max delay for ItemExponentialFailureRateLimiter.
RateLimiterMaxDelay time.Duration
// RateLimiterQPS is the qps for BucketRateLimiter
RateLimiterQPS int
// RateLimiterBucketSize is the bucket size for BucketRateLimiter
RateLimiterBucketSize int
}

// NewOptions builds an empty options.
Expand Down Expand Up @@ -158,5 +166,9 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers []string) {
flags.IntVar(&o.ConcurrentWorkSyncs, "concurrent-work-syncs", 5, "The number of Works that are allowed to sync concurrently.")
flags.IntVar(&o.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", 1, "The number of Namespaces that are allowed to sync concurrently.")
flags.IntVar(&o.ConcurrentResourceTemplateSyncs, "concurrent-resource-template-syncs", 5, "The number of resource templates that are allowed to sync concurrently.")
flags.DurationVar(&o.RateLimiterBaseDelay, "rate-limiter-base-delay", time.Millisecond*5, "The base delay for rate limiter.")
flags.DurationVar(&o.RateLimiterMaxDelay, "rate-limiter-max-delay", time.Second*1000, "The max delay for rate limiter.")
flags.IntVar(&o.RateLimiterQPS, "rate-limiter-qps", 10, "The qps for rate limier.")
flags.IntVar(&o.RateLimiterBucketSize, "rate-limiter-bucket-size", 100, "The bucket size for rate limier.")
features.FeatureGate.AddFlag(flags)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/stretchr/testify v1.7.0
github.com/vektra/mockery/v2 v2.9.4
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
golang.org/x/tools v0.1.6-0.20210820212750-d4cc65f0b2ff
gomodules.xyz/jsonpatch/v2 v2.2.0
google.golang.org/grpc v1.40.0
Expand Down Expand Up @@ -143,7 +144,6 @@ require (
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20211029165221-6e7872819dc8 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2 // indirect
Expand Down
7 changes: 6 additions & 1 deletion pkg/clusterdiscovery/clusterapi/clusterapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ func (d *ClusterDetector) Start(ctx context.Context) error {
d.stopCh = ctx.Done()

d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
d.Processor = util.NewAsyncWorker("cluster-api cluster detector", ClusterWideKeyFunc, d.Reconcile)
workerOptions := util.Options{
Name: "cluster-api cluster detector",
KeyFunc: ClusterWideKeyFunc,
ReconcileFunc: d.Reconcile,
}
d.Processor = util.NewAsyncWorker(workerOptions)
d.Processor.Run(d.ConcurrentReconciles, d.stopCh)
d.discoveryCluster()

Expand Down
6 changes: 6 additions & 0 deletions pkg/controllers/binding/binding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/overridemanager"
"github.com/karmada-io/karmada/pkg/util/ratelimiter"
)

// ControllerName is the controller name that will be used when reporting events.
Expand All @@ -42,6 +44,7 @@ type ResourceBindingController struct {
RESTMapper meta.RESTMapper
OverrideManager overridemanager.OverrideManager
ResourceInterpreter resourceinterpreter.ResourceInterpreter
RatelimiterOptions ratelimiter.Options
}

// Reconcile performs a full reconciliation for the object referred to by the Request.
Expand Down Expand Up @@ -169,6 +172,9 @@ func (c *ResourceBindingController) SetupWithManager(mgr controllerruntime.Manag
Watches(&source.Kind{Type: &workv1alpha1.Work{}}, handler.EnqueueRequestsFromMapFunc(workFn), workPredicateFn).
Watches(&source.Kind{Type: &policyv1alpha1.OverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())).
Watches(&source.Kind{Type: &policyv1alpha1.ClusterOverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())).
WithOptions(controller.Options{
RateLimiter: ratelimiter.DefaultControllerRateLimiter(c.RatelimiterOptions),
}).
Complete(c)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/overridemanager"
"github.com/karmada-io/karmada/pkg/util/ratelimiter"
)

// ClusterResourceBindingControllerName is the controller name that will be used when reporting events.
Expand All @@ -42,6 +44,7 @@ type ClusterResourceBindingController struct {
RESTMapper meta.RESTMapper
OverrideManager overridemanager.OverrideManager
ResourceInterpreter resourceinterpreter.ResourceInterpreter
RatelimiterOptions ratelimiter.Options
}

// Reconcile performs a full reconciliation for the object referred to by the Request.
Expand Down Expand Up @@ -158,6 +161,9 @@ func (c *ClusterResourceBindingController) SetupWithManager(mgr controllerruntim
Watches(&source.Kind{Type: &workv1alpha1.Work{}}, handler.EnqueueRequestsFromMapFunc(workFn), workPredicateFn).
Watches(&source.Kind{Type: &policyv1alpha1.OverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())).
Watches(&source.Kind{Type: &policyv1alpha1.ClusterOverridePolicy{}}, handler.EnqueueRequestsFromMapFunc(c.newOverridePolicyFunc())).
WithOptions(controller.Options{
RateLimiter: ratelimiter.DefaultControllerRateLimiter(c.RatelimiterOptions),
}).
Complete(c)
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/overridemanager"
"github.com/karmada-io/karmada/pkg/util/ratelimiter"
)

// Options defines all the parameters required by our controllers.
Expand Down Expand Up @@ -49,6 +50,7 @@ type Options struct {
ClusterName string
// ConcurrentWorkSyncs is the number of Works that are allowed to sync concurrently.
ConcurrentWorkSyncs int
RatelimiterOptions ratelimiter.Options
}

// Context defines the context object for controller.
Expand Down
6 changes: 6 additions & 0 deletions pkg/controllers/execution/execution_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"

Expand All @@ -25,6 +26,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/informermanager/keys"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/ratelimiter"
)

const (
Expand All @@ -41,6 +43,7 @@ type Controller struct {
PredicateFunc predicate.Predicate
InformerManager informermanager.MultiClusterInformerManager
ClusterClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
RatelimiterOption ratelimiter.Options
}

// Reconcile performs a full reconciliation for the object referred to by the Request.
Expand Down Expand Up @@ -100,6 +103,9 @@ func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
For(&workv1alpha1.Work{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
WithEventFilter(c.PredicateFunc).
WithOptions(controller.Options{
RateLimiter: ratelimiter.DefaultControllerRateLimiter(c.RatelimiterOption),
}).
Complete(c)
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/controllers/mcs/service_export_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,12 @@ func (c *ServiceExportController) SetupWithManager(mgr controllerruntime.Manager

// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
func (c *ServiceExportController) RunWorkQueue() {
c.worker = util.NewAsyncWorker("service-export", nil, c.syncServiceExportOrEndpointSlice)
workerOptions := util.Options{
Name: "service-export",
KeyFunc: nil,
ReconcileFunc: c.syncServiceExportOrEndpointSlice,
}
c.worker = util.NewAsyncWorker(workerOptions)
c.worker.Run(c.WorkerNumber, c.StopChan)
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/controllers/status/cluster_status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import (
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/ratelimiter"
)

const (
Expand Down Expand Up @@ -82,6 +84,7 @@ type ClusterStatusController struct {
ClusterLeaseControllers sync.Map

ClusterCacheSyncTimeout metav1.Duration
RatelimiterOptions ratelimiter.Options
}

// Reconcile syncs status of the given member cluster.
Expand Down Expand Up @@ -113,7 +116,9 @@ func (c *ClusterStatusController) Reconcile(ctx context.Context, req controllerr

// SetupWithManager creates a controller and register to controller manager.
func (c *ClusterStatusController) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).WithEventFilter(c.PredicateFunc).Complete(c)
return controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).WithEventFilter(c.PredicateFunc).WithOptions(controller.Options{
RateLimiter: ratelimiter.DefaultControllerRateLimiter(c.RatelimiterOptions),
}).Complete(c)
}

func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Cluster) (controllerruntime.Result, error) {
Expand Down
14 changes: 12 additions & 2 deletions pkg/controllers/status/workstatus_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/predicate"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/informermanager/keys"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/ratelimiter"
"github.com/karmada-io/karmada/pkg/util/restmapper"
)

Expand All @@ -47,6 +49,7 @@ type WorkStatusController struct {
PredicateFunc predicate.Predicate
ClusterClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
ClusterCacheSyncTimeout metav1.Duration
RatelimiterOptions ratelimiter.Options
}

// Reconcile performs a full reconciliation for the object referred to by the Request.
Expand Down Expand Up @@ -114,7 +117,12 @@ func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler {

// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
func (c *WorkStatusController) RunWorkQueue() {
c.worker = util.NewAsyncWorker("work-status", generateKey, c.syncWorkStatus)
workerOptions := util.Options{
Name: "work-status",
KeyFunc: generateKey,
ReconcileFunc: c.syncWorkStatus,
}
c.worker = util.NewAsyncWorker(workerOptions)
c.worker.Run(c.ConcurrentWorkStatusSyncs, c.StopChan)
}

Expand Down Expand Up @@ -443,5 +451,7 @@ func (c *WorkStatusController) getSingleClusterManager(cluster *clusterv1alpha1.

// SetupWithManager creates a controller and register to controller manager.
func (c *WorkStatusController) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(c.PredicateFunc).Complete(c)
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(c.PredicateFunc).WithOptions(controller.Options{
RateLimiter: ratelimiter.DefaultControllerRateLimiter(c.RatelimiterOptions),
}).Complete(c)
}
15 changes: 12 additions & 3 deletions pkg/dependenciesdistributor/dependencies_distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,13 @@ func (d *DependenciesDistributor) Start(ctx context.Context) error {
klog.Infof("Starting dependencies distributor.")
d.stopCh = ctx.Done()

bindingWorkerOptions := util.Options{
Name: "resourceBinding reconciler",
KeyFunc: detector.ClusterWideKeyFunc,
ReconcileFunc: d.ReconcileResourceBinding,
}
// setup binding reconcile worker
d.bindingReconcileWorker = util.NewAsyncWorker("resourceBinding reconciler", detector.ClusterWideKeyFunc, d.ReconcileResourceBinding)
d.bindingReconcileWorker = util.NewAsyncWorker(bindingWorkerOptions)
d.bindingReconcileWorker.Run(2, d.stopCh)

// watch and enqueue ResourceBinding changes.
Expand All @@ -92,9 +97,13 @@ func (d *DependenciesDistributor) Start(ctx context.Context) error {
bindingHandler := informermanager.NewHandlerOnEvents(nil, d.OnResourceBindingUpdate, d.OnResourceBindingDelete)
d.InformerManager.ForResource(resourceBindingGVR, bindingHandler)
d.resourceBindingLister = d.InformerManager.Lister(resourceBindingGVR)

resourceWorkerOptions := util.Options{
Name: "resource detector",
KeyFunc: detector.ClusterWideKeyFunc,
ReconcileFunc: d.Reconcile,
}
d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
d.Processor = util.NewAsyncWorker("resource detector", detector.ClusterWideKeyFunc, d.Reconcile)
d.Processor = util.NewAsyncWorker(resourceWorkerOptions)
d.Processor.Run(2, d.stopCh)
go d.discoverResources(30 * time.Second)

Expand Down
Loading

0 comments on commit 9e28d6c

Please sign in to comment.