From 79998c313ed816244fd2ae0d4b9d45334451f36b Mon Sep 17 00:00:00 2001 From: "chang.qiangqiang" Date: Fri, 21 Apr 2023 17:47:38 +0800 Subject: [PATCH] fix(app): Introduced flag to specify leader election behavious Signed-off-by: chang.qiangqiang --- cmd/controller-manager/app/options/options.go | 22 +++++++++++++++++++ .../app/options/options_test.go | 3 +++ cmd/controller-manager/app/server.go | 12 +++------- cmd/scheduler/app/options/options.go | 22 +++++++++++++++++++ cmd/scheduler/app/options/options_test.go | 4 ++++ cmd/scheduler/app/server.go | 14 ++++-------- 6 files changed, 58 insertions(+), 19 deletions(-) diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index ab0fd928471..b1d91c8870a 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -36,6 +36,12 @@ const ( defaultDetectionPeriodOfDependsOntask = 100 * time.Millisecond ) +var ( + defaultElectionLeaseDuration = 15 * time.Second + defaultElectionRenewDeadline = 10 * time.Second + defaultElectionRetryPeriod = 5 * time.Second +) + // ServerOption is the main context object for the controllers. type ServerOption struct { KubeClientOptions kube.ClientOptions @@ -45,6 +51,9 @@ type ServerOption struct { KeyData []byte EnableLeaderElection bool LockObjectNamespace string + LeaseDuration time.Duration + RenewDeadline time.Duration + RetryPeriod time.Duration PrintVersion bool // WorkerThreads is the number of threads syncing job operations // concurrently. Larger number = faster job updating, but more CPU load. @@ -84,6 +93,19 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&s.EnableLeaderElection, "leader-elect", s.EnableLeaderElection, "Start a leader election client and gain leadership before "+ "executing the main loop. Enable this when running replicated vc-controller-manager for high availability.") fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object.") + fs.DurationVar(&s.LeaseDuration, "leader-elect-lease-duration", defaultElectionLeaseDuration, ""+ + "The duration that non-leader candidates will wait after observing a leadership "+ + "renewal until attempting to acquire leadership of a led but unrenewed leader "+ + "slot. This is effectively the maximum duration that a leader can be stopped "+ + "before it is replaced by another candidate. This is only applicable if leader "+ + "election is enabled.") + fs.DurationVar(&s.RenewDeadline, "leader-elect-renew-deadline", defaultElectionRenewDeadline, ""+ + "The interval between attempts by the acting master to renew a leadership slot "+ + "before it stops leading. This must be less than or equal to the lease duration. "+ + "This is only applicable if leader election is enabled.") + fs.DurationVar(&s.RetryPeriod, "leader-elect-retry-period", defaultElectionRetryPeriod, ""+ + "The duration the clients should wait between attempting acquisition and renewal "+ + "of a leadership. This is only applicable if leader election is enabled.") fs.Float32Var(&s.KubeClientOptions.QPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver") fs.IntVar(&s.KubeClientOptions.Burst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver") fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit") diff --git a/cmd/controller-manager/app/options/options_test.go b/cmd/controller-manager/app/options/options_test.go index 120718bfeb0..795ee1d49ff 100644 --- a/cmd/controller-manager/app/options/options_test.go +++ b/cmd/controller-manager/app/options/options_test.go @@ -46,6 +46,9 @@ func TestAddFlags(t *testing.T) { QPS: defaultQPS, Burst: 200, }, + LeaseDuration: defaultElectionLeaseDuration, + RenewDeadline: defaultElectionRenewDeadline, + RetryPeriod: defaultElectionRetryPeriod, PrintVersion: false, WorkerThreads: defaultWorkers, SchedulerNames: []string{"volcano", "volcano2"}, diff --git a/cmd/controller-manager/app/server.go b/cmd/controller-manager/app/server.go index eeca173fce2..1999979fc82 100644 --- a/cmd/controller-manager/app/server.go +++ b/cmd/controller-manager/app/server.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "os" - "time" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/uuid" @@ -42,11 +41,6 @@ import ( "volcano.sh/volcano/pkg/kube" ) -const ( - leaseDuration = 15 * time.Second - renewDeadline = 10 * time.Second - retryPeriod = 5 * time.Second -) // Run the controller. func Run(opt *options.ServerOption) error { @@ -102,9 +96,9 @@ func Run(opt *options.ServerOption) error { leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{ Lock: rl, - LeaseDuration: leaseDuration, - RenewDeadline: renewDeadline, - RetryPeriod: retryPeriod, + LeaseDuration: opt.LeaseDuration, + RenewDeadline: opt.RenewDeadline, + RetryPeriod: opt.RetryPeriod, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: run, OnStoppedLeading: func() { diff --git a/cmd/scheduler/app/options/options.go b/cmd/scheduler/app/options/options.go index 5f134aba212..2d3d8bc3811 100644 --- a/cmd/scheduler/app/options/options.go +++ b/cmd/scheduler/app/options/options.go @@ -43,6 +43,12 @@ const ( defaultPercentageOfNodesToFind = 100 ) +var ( + defaultElectionLeaseDuration = 15 * time.Second + defaultElectionRenewDeadline = 10 * time.Second + defaultElectionRetryPeriod = 5 * time.Second +) + // ServerOption is the main context object for the controller manager. type ServerOption struct { KubeClientOptions kube.ClientOptions @@ -55,6 +61,9 @@ type ServerOption struct { SchedulePeriod time.Duration EnableLeaderElection bool LockObjectNamespace string + LeaseDuration time.Duration + RenewDeadline time.Duration + RetryPeriod time.Duration DefaultQueue string PrintVersion bool EnableMetrics bool @@ -104,6 +113,19 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { "executing the main loop. Enable this when running replicated vc-scheduler for high availability") fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit") fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object that is used for leader election") + fs.DurationVar(&s.LeaseDuration, "leader-elect-lease-duration", defaultElectionLeaseDuration, ""+ + "The duration that non-leader candidates will wait after observing a leadership "+ + "renewal until attempting to acquire leadership of a led but unrenewed leader "+ + "slot. This is effectively the maximum duration that a leader can be stopped "+ + "before it is replaced by another candidate. This is only applicable if leader "+ + "election is enabled.") + fs.DurationVar(&s.RenewDeadline, "leader-elect-renew-deadline", defaultElectionRenewDeadline, ""+ + "The interval between attempts by the acting master to renew a leadership slot "+ + "before it stops leading. This must be less than or equal to the lease duration. "+ + "This is only applicable if leader election is enabled.") + fs.DurationVar(&s.RetryPeriod, "leader-elect-retry-period", defaultElectionRetryPeriod, ""+ + "The duration the clients should wait between attempting acquisition and renewal "+ + "of a leadership. This is only applicable if leader election is enabled.") fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.") fs.StringVar(&s.HealthzBindAddress, "healthz-address", defaultHealthzAddress, "The address to listen on for the health check server.") fs.BoolVar(&s.EnablePriorityClass, "priority-class", true, diff --git a/cmd/scheduler/app/options/options_test.go b/cmd/scheduler/app/options/options_test.go index 9d7e76b527c..7835baf9586 100644 --- a/cmd/scheduler/app/options/options_test.go +++ b/cmd/scheduler/app/options/options_test.go @@ -35,6 +35,7 @@ func TestAddFlags(t *testing.T) { "--schedule-period=5m", "--priority-class=false", "--cache-dumper=false", + "--leader-elect-retry-period=2s", } fs.Parse(args) @@ -50,6 +51,9 @@ func TestAddFlags(t *testing.T) { QPS: defaultQPS, Burst: defaultBurst, }, + LeaseDuration: defaultElectionLeaseDuration, + RenewDeadline: defaultElectionRenewDeadline, + RetryPeriod: 2 * time.Second, PluginsDir: defaultPluginsDir, HealthzBindAddress: ":11251", MinNodesToFind: defaultMinNodesToFind, diff --git a/cmd/scheduler/app/server.go b/cmd/scheduler/app/server.go index 721202077f6..eb76293e685 100644 --- a/cmd/scheduler/app/server.go +++ b/cmd/scheduler/app/server.go @@ -21,11 +21,11 @@ import ( "fmt" "net/http" "os" - "time" "github.com/prometheus/client_golang/prometheus/promhttp" "volcano.sh/apis/pkg/apis/helpers" + "volcano.sh/volcano/cmd/scheduler/app/options" "volcano.sh/volcano/pkg/kube" "volcano.sh/volcano/pkg/scheduler" @@ -49,12 +49,6 @@ import ( "k8s.io/client-go/tools/record" ) -const ( - leaseDuration = 15 * time.Second - renewDeadline = 10 * time.Second - retryPeriod = 5 * time.Second -) - // Run the volcano scheduler. func Run(opt *options.ServerOption) error { if opt.PrintVersion { @@ -140,9 +134,9 @@ func Run(opt *options.ServerOption) error { leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ Lock: rl, - LeaseDuration: leaseDuration, - RenewDeadline: renewDeadline, - RetryPeriod: retryPeriod, + LeaseDuration: opt.LeaseDuration, + RenewDeadline: opt.RenewDeadline, + RetryPeriod: opt.RetryPeriod, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: run, OnStoppedLeading: func() {