Skip to content

Commit

Permalink
fix(app): Introduced flag to specify leader election behavious
Browse files Browse the repository at this point in the history
Signed-off-by: chang.qiangqiang <chang.qiangqiang@immomo.com>
  • Loading branch information
CharlesQQ committed Apr 21, 2023
1 parent d4458ff commit 79998c3
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 19 deletions.
22 changes: 22 additions & 0 deletions cmd/controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ const (
defaultDetectionPeriodOfDependsOntask = 100 * time.Millisecond
)

var (
defaultElectionLeaseDuration = 15 * time.Second
defaultElectionRenewDeadline = 10 * time.Second
defaultElectionRetryPeriod = 5 * time.Second
)

// ServerOption is the main context object for the controllers.
type ServerOption struct {
KubeClientOptions kube.ClientOptions
Expand All @@ -45,6 +51,9 @@ type ServerOption struct {
KeyData []byte
EnableLeaderElection bool
LockObjectNamespace string
LeaseDuration time.Duration
RenewDeadline time.Duration
RetryPeriod time.Duration
PrintVersion bool
// WorkerThreads is the number of threads syncing job operations
// concurrently. Larger number = faster job updating, but more CPU load.
Expand Down Expand Up @@ -84,6 +93,19 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&s.EnableLeaderElection, "leader-elect", s.EnableLeaderElection, "Start a leader election client and gain leadership before "+
"executing the main loop. Enable this when running replicated vc-controller-manager for high availability.")
fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object.")
fs.DurationVar(&s.LeaseDuration, "leader-elect-lease-duration", defaultElectionLeaseDuration, ""+
"The duration that non-leader candidates will wait after observing a leadership "+
"renewal until attempting to acquire leadership of a led but unrenewed leader "+
"slot. This is effectively the maximum duration that a leader can be stopped "+
"before it is replaced by another candidate. This is only applicable if leader "+
"election is enabled.")
fs.DurationVar(&s.RenewDeadline, "leader-elect-renew-deadline", defaultElectionRenewDeadline, ""+
"The interval between attempts by the acting master to renew a leadership slot "+
"before it stops leading. This must be less than or equal to the lease duration. "+
"This is only applicable if leader election is enabled.")
fs.DurationVar(&s.RetryPeriod, "leader-elect-retry-period", defaultElectionRetryPeriod, ""+
"The duration the clients should wait between attempting acquisition and renewal "+
"of a leadership. This is only applicable if leader election is enabled.")
fs.Float32Var(&s.KubeClientOptions.QPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeClientOptions.Burst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver")
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
Expand Down
3 changes: 3 additions & 0 deletions cmd/controller-manager/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func TestAddFlags(t *testing.T) {
QPS: defaultQPS,
Burst: 200,
},
LeaseDuration: defaultElectionLeaseDuration,
RenewDeadline: defaultElectionRenewDeadline,
RetryPeriod: defaultElectionRetryPeriod,
PrintVersion: false,
WorkerThreads: defaultWorkers,
SchedulerNames: []string{"volcano", "volcano2"},
Expand Down
12 changes: 3 additions & 9 deletions cmd/controller-manager/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"
"os"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/uuid"
Expand All @@ -42,11 +41,6 @@ import (
"volcano.sh/volcano/pkg/kube"
)

const (
leaseDuration = 15 * time.Second
renewDeadline = 10 * time.Second
retryPeriod = 5 * time.Second
)

// Run the controller.
func Run(opt *options.ServerOption) error {
Expand Down Expand Up @@ -102,9 +96,9 @@ func Run(opt *options.ServerOption) error {

leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
LeaseDuration: opt.LeaseDuration,
RenewDeadline: opt.RenewDeadline,
RetryPeriod: opt.RetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
Expand Down
22 changes: 22 additions & 0 deletions cmd/scheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ const (
defaultPercentageOfNodesToFind = 100
)

var (
defaultElectionLeaseDuration = 15 * time.Second
defaultElectionRenewDeadline = 10 * time.Second
defaultElectionRetryPeriod = 5 * time.Second
)

// ServerOption is the main context object for the controller manager.
type ServerOption struct {
KubeClientOptions kube.ClientOptions
Expand All @@ -55,6 +61,9 @@ type ServerOption struct {
SchedulePeriod time.Duration
EnableLeaderElection bool
LockObjectNamespace string
LeaseDuration time.Duration
RenewDeadline time.Duration
RetryPeriod time.Duration
DefaultQueue string
PrintVersion bool
EnableMetrics bool
Expand Down Expand Up @@ -104,6 +113,19 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
"executing the main loop. Enable this when running replicated vc-scheduler for high availability")
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object that is used for leader election")
fs.DurationVar(&s.LeaseDuration, "leader-elect-lease-duration", defaultElectionLeaseDuration, ""+
"The duration that non-leader candidates will wait after observing a leadership "+
"renewal until attempting to acquire leadership of a led but unrenewed leader "+
"slot. This is effectively the maximum duration that a leader can be stopped "+
"before it is replaced by another candidate. This is only applicable if leader "+
"election is enabled.")
fs.DurationVar(&s.RenewDeadline, "leader-elect-renew-deadline", defaultElectionRenewDeadline, ""+
"The interval between attempts by the acting master to renew a leadership slot "+
"before it stops leading. This must be less than or equal to the lease duration. "+
"This is only applicable if leader election is enabled.")
fs.DurationVar(&s.RetryPeriod, "leader-elect-retry-period", defaultElectionRetryPeriod, ""+
"The duration the clients should wait between attempting acquisition and renewal "+
"of a leadership. This is only applicable if leader election is enabled.")
fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.")
fs.StringVar(&s.HealthzBindAddress, "healthz-address", defaultHealthzAddress, "The address to listen on for the health check server.")
fs.BoolVar(&s.EnablePriorityClass, "priority-class", true,
Expand Down
4 changes: 4 additions & 0 deletions cmd/scheduler/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestAddFlags(t *testing.T) {
"--schedule-period=5m",
"--priority-class=false",
"--cache-dumper=false",
"--leader-elect-retry-period=2s",
}
fs.Parse(args)

Expand All @@ -50,6 +51,9 @@ func TestAddFlags(t *testing.T) {
QPS: defaultQPS,
Burst: defaultBurst,
},
LeaseDuration: defaultElectionLeaseDuration,
RenewDeadline: defaultElectionRenewDeadline,
RetryPeriod: 2 * time.Second,
PluginsDir: defaultPluginsDir,
HealthzBindAddress: ":11251",
MinNodesToFind: defaultMinNodesToFind,
Expand Down
14 changes: 4 additions & 10 deletions cmd/scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import (
"fmt"
"net/http"
"os"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"

"volcano.sh/apis/pkg/apis/helpers"

"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/kube"
"volcano.sh/volcano/pkg/scheduler"
Expand All @@ -49,12 +49,6 @@ import (
"k8s.io/client-go/tools/record"
)

const (
leaseDuration = 15 * time.Second
renewDeadline = 10 * time.Second
retryPeriod = 5 * time.Second
)

// Run the volcano scheduler.
func Run(opt *options.ServerOption) error {
if opt.PrintVersion {
Expand Down Expand Up @@ -140,9 +134,9 @@ func Run(opt *options.ServerOption) error {

leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
LeaseDuration: opt.LeaseDuration,
RenewDeadline: opt.RenewDeadline,
RetryPeriod: opt.RetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
Expand Down

0 comments on commit 79998c3

Please sign in to comment.