Skip to content

Commit

Permalink
fix(app): Introduced flag to specify leader election behavious
Browse files Browse the repository at this point in the history
Signed-off-by: chang.qiangqiang <chang.qiangqiang@immomo.com>
  • Loading branch information
CharlesQQ committed Mar 26, 2024
1 parent 339314e commit 3c66531
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 81 deletions.
29 changes: 13 additions & 16 deletions cmd/controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ package options

import (
"fmt"
"os"

"github.com/spf13/pflag"
"k8s.io/component-base/config"
"os"

"volcano.sh/volcano/pkg/kube"
)
Expand All @@ -38,16 +38,16 @@ const (

// ServerOption is the main context object for the controllers.
type ServerOption struct {
KubeClientOptions kube.ClientOptions
CertFile string
KeyFile string
CaCertFile string
CertData []byte
KeyData []byte
CaCertData []byte
EnableLeaderElection bool
LockObjectNamespace string
PrintVersion bool
KubeClientOptions kube.ClientOptions
CertFile string
KeyFile string
CaCertFile string
CertData []byte
KeyData []byte
CaCertData []byte
// leaderElection defines the configuration of leader election.
LeaderElection config.LeaderElectionConfiguration
PrintVersion bool
// WorkerThreads is the number of threads syncing job operations
// concurrently. Larger number = faster job updating, but more CPU load.
WorkerThreads uint32
Expand Down Expand Up @@ -84,9 +84,6 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
"File containing the default x509 Certificate for HTTPS. (CA cert, if any, concatenated "+
"after server cert).")
fs.StringVar(&s.KeyFile, "tls-private-key-file", s.KeyFile, "File containing the default x509 private key matching --tls-cert-file.")
fs.BoolVar(&s.EnableLeaderElection, "leader-elect", true, "Start a leader election client and gain leadership before "+
"executing the main loop. Enable this when running replicated vc-controller-manager for high availability; it is enabled by default")
fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", defaultLockObjectNamespace, "Define the namespace of the lock object; it is volcano-system by default")
fs.Float32Var(&s.KubeClientOptions.QPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeClientOptions.Burst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver")
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
Expand All @@ -102,7 +99,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {

// CheckOptionOrDie checks the LockObjectNamespace.
func (s *ServerOption) CheckOptionOrDie() error {
if s.EnableLeaderElection && s.LockObjectNamespace == "" {
if s.LeaderElection.LeaderElect && s.LeaderElection.ResourceLock == "" {
return fmt.Errorf("lock-object-namespace must not be nil when LeaderElection is enabled")
}
return nil
Expand Down
23 changes: 20 additions & 3 deletions cmd/controller-manager/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ package options
import (
"reflect"
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/component-base/config"

commonutil "volcano.sh/volcano/pkg/util"

"github.com/spf13/pflag"

Expand All @@ -28,13 +35,17 @@ import (
func TestAddFlags(t *testing.T) {
fs := pflag.NewFlagSet("addflagstest", pflag.ContinueOnError)
s := NewServerOption()
commonutil.BindLeaderElectionFlags(&s.LeaderElection, fs)
s.AddFlags(fs)

args := []string{
"--master=127.0.0.1",
"--kube-api-burst=200",
"--scheduler-name=volcano",
"--scheduler-name=volcano2",
"--leader-elect-lease-duration=60s",
"--leader-elect-renew-deadline=20s",
"--leader-elect-retry-period=10s",
}
fs.Parse(args)

Expand All @@ -52,9 +63,15 @@ func TestAddFlags(t *testing.T) {
MaxRequeueNum: defaultMaxRequeueNum,
HealthzBindAddress: ":11251",
InheritOwnerAnnotations: true,
EnableLeaderElection: true,
LockObjectNamespace: defaultLockObjectNamespace,
WorkerThreadsForPG: 5,
LeaderElection: config.LeaderElectionConfiguration{
LeaderElect: true,
LeaseDuration: metav1.Duration{60 * time.Second},
RenewDeadline: metav1.Duration{20 * time.Second},
RetryPeriod: metav1.Duration{10 * time.Second},
ResourceLock: resourcelock.LeasesResourceLock,
ResourceNamespace: defaultLockObjectNamespace,
},
WorkerThreadsForPG: 5,
}

if !reflect.DeepEqual(expected, s) {
Expand Down
14 changes: 4 additions & 10 deletions cmd/controller-manager/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"
"os"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/uuid"
Expand All @@ -36,18 +35,13 @@ import (

"volcano.sh/apis/pkg/apis/helpers"
vcclientset "volcano.sh/apis/pkg/client/clientset/versioned"

"volcano.sh/volcano/cmd/controller-manager/app/options"
"volcano.sh/volcano/pkg/controllers/framework"
"volcano.sh/volcano/pkg/kube"
"volcano.sh/volcano/pkg/signals"
)

const (
leaseDuration = 15 * time.Second
renewDeadline = 10 * time.Second
retryPeriod = 5 * time.Second
)

// Run the controller.
func Run(opt *options.ServerOption) error {
config, err := kube.BuildConfig(opt.KubeClientOptions)
Expand Down Expand Up @@ -102,9 +96,9 @@ func Run(opt *options.ServerOption) error {

leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
LeaseDuration: opt.LeaseDuration,
RenewDeadline: opt.RenewDeadline,
RetryPeriod: opt.RetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
Expand Down
7 changes: 6 additions & 1 deletion cmd/controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"runtime"
"time"

commonutil "volcano.sh/volcano/pkg/util"

"github.com/spf13/pflag"
_ "go.uber.org/automaxprocs"

Expand All @@ -46,8 +48,11 @@ func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
klog.InitFlags(nil)

fs := pflag.CommandLine
s := options.NewServerOption()
s.AddFlags(pflag.CommandLine)

s.AddFlags(fs)
commonutil.BindLeaderElectionFlags(&s.LeaderElection, fs)

cliflag.InitFlags()

Expand Down
57 changes: 27 additions & 30 deletions cmd/scheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/spf13/pflag"
"k8s.io/component-base/config"

"volcano.sh/volcano/pkg/kube"
)
Expand All @@ -47,24 +48,24 @@ const (

// ServerOption is the main context object for the controller manager.
type ServerOption struct {
KubeClientOptions kube.ClientOptions
CertFile string
KeyFile string
CaCertFile string
CertData []byte
KeyData []byte
CaCertData []byte
SchedulerNames []string
SchedulerConf string
SchedulePeriod time.Duration
EnableLeaderElection bool
LockObjectNamespace string
DefaultQueue string
PrintVersion bool
EnableMetrics bool
ListenAddress string
EnablePriorityClass bool
EnableCSIStorage bool
KubeClientOptions kube.ClientOptions
CertFile string
KeyFile string
CaCertFile string
CertData []byte
KeyData []byte
CaCertData []byte
SchedulerNames []string
SchedulerConf string
SchedulePeriod time.Duration
// leaderElection defines the configuration of leader election.
LeaderElection config.LeaderElectionConfiguration
DefaultQueue string
PrintVersion bool
EnableMetrics bool
ListenAddress string
EnablePriorityClass bool
EnableCSIStorage bool
// vc-scheduler will load (not activate) custom plugins which are in this directory
PluginsDir string
EnableHealthz bool
Expand Down Expand Up @@ -112,11 +113,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.SchedulerConf, "scheduler-conf", "", "The absolute path of scheduler configuration file")
fs.DurationVar(&s.SchedulePeriod, "schedule-period", defaultSchedulerPeriod, "The period between each scheduling cycle")
fs.StringVar(&s.DefaultQueue, "default-queue", defaultQueue, "The default queue name of the job")
fs.BoolVar(&s.EnableLeaderElection, "leader-elect", true,
"Start a leader election client and gain leadership before "+
"executing the main loop. Enable this when running replicated vc-scheduler for high availability; it is enabled by default")
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", defaultLockObjectNamespace, "Define the namespace of the lock object that is used for leader election; it is volcano-system by default")
fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.")
fs.StringVar(&s.HealthzBindAddress, "healthz-address", defaultHealthzAddress, "The address to listen on for the health check server.")
fs.BoolVar(&s.EnablePriorityClass, "priority-class", true,
Expand Down Expand Up @@ -147,7 +144,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {

// CheckOptionOrDie check lock-object-namespace when LeaderElection is enabled.
func (s *ServerOption) CheckOptionOrDie() error {
if s.EnableLeaderElection && s.LockObjectNamespace == "" {
if s.LeaderElection.LeaderElect && s.LeaderElection.ResourceLock == "" {
return fmt.Errorf("lock-object-namespace must not be nil when LeaderElection is enabled")
}

Expand Down Expand Up @@ -195,10 +192,10 @@ func (s *ServerOption) ParseCAFiles(decryptFunc DecryptFunc) error {
return nil
}

// Default new and registry a default one
func Default() *ServerOption {
s := NewServerOption()
s.AddFlags(pflag.CommandLine)
s.RegisterOptions()
return s
}
//// Default new and registry a default one
//func Default() *ServerOption {
// s := NewServerOption()
// s.AddFlags(pflag.CommandLine)
// s.RegisterOptions()
// return s
//}
24 changes: 20 additions & 4 deletions cmd/scheduler/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ import (
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/component-base/config"

commonutil "volcano.sh/volcano/pkg/util"

"github.com/spf13/pflag"

"volcano.sh/volcano/pkg/kube"
Expand All @@ -29,21 +35,33 @@ import (
func TestAddFlags(t *testing.T) {
fs := pflag.NewFlagSet("addflagstest", pflag.ContinueOnError)
s := NewServerOption()
commonutil.BindLeaderElectionFlags(&s.LeaderElection, fs)
s.AddFlags(fs)

args := []string{
"--schedule-period=5m",
"--priority-class=false",
"--cache-dumper=false",
"--leader-elect-lease-duration=60s",
"--leader-elect-renew-deadline=20s",
"--leader-elect-retry-period=10s",
}
fs.Parse(args)

// This is a snapshot of expected options parsed by args.
expected := &ServerOption{
SchedulerNames: []string{defaultSchedulerName},
SchedulePeriod: 5 * time.Minute,
DefaultQueue: defaultQueue,
ListenAddress: defaultListenAddress,
LeaderElection: config.LeaderElectionConfiguration{
LeaderElect: true,
LeaseDuration: metav1.Duration{60 * time.Second},
RenewDeadline: metav1.Duration{20 * time.Second},
RetryPeriod: metav1.Duration{10 * time.Second},
ResourceLock: resourcelock.LeasesResourceLock,
ResourceNamespace: defaultLockObjectNamespace,
},
DefaultQueue: defaultQueue,
ListenAddress: defaultListenAddress,
KubeClientOptions: kube.ClientOptions{
Master: "",
KubeConfig: "",
Expand All @@ -55,8 +73,6 @@ func TestAddFlags(t *testing.T) {
MinNodesToFind: defaultMinNodesToFind,
MinPercentageOfNodesToFind: defaultMinPercentageOfNodesToFind,
PercentageOfNodesToFind: defaultPercentageOfNodesToFind,
EnableLeaderElection: true,
LockObjectNamespace: defaultLockObjectNamespace,
NodeWorkerThreads: defaultNodeWorkers,
CacheDumpFileDir: "/tmp",
}
Expand Down
20 changes: 6 additions & 14 deletions cmd/scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ import (
"fmt"
"net/http"
"os"
"time"

"volcano.sh/apis/pkg/apis/helpers"

"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/kube"
"volcano.sh/volcano/pkg/scheduler"
Expand Down Expand Up @@ -55,12 +53,6 @@ import (
_ "k8s.io/component-base/metrics/prometheus/restclient"
)

const (
leaseDuration = 15 * time.Second
renewDeadline = 10 * time.Second
retryPeriod = 5 * time.Second
)

// Run the volcano scheduler.
func Run(opt *options.ServerOption) error {
if opt.PrintVersion {
Expand Down Expand Up @@ -104,7 +96,7 @@ func Run(opt *options.ServerOption) error {
<-ctx.Done()
}

if !opt.EnableLeaderElection {
if !opt.LeaderElection.LeaderElect {
run(ctx)
return fmt.Errorf("finished without leader elect")
}
Expand All @@ -116,7 +108,7 @@ func Run(opt *options.ServerOption) error {

// Prepare event clients.
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: leaderElectionClient.CoreV1().Events(opt.LockObjectNamespace)})
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: leaderElectionClient.CoreV1().Events(opt.LeaderElection.ResourceNamespace)})
eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: commonutil.GenerateComponentName(opt.SchedulerNames)})

hostname, err := os.Hostname()
Expand All @@ -127,7 +119,7 @@ func Run(opt *options.ServerOption) error {
id := hostname + "_" + string(uuid.NewUUID())

rl, err := resourcelock.New(resourcelock.LeasesResourceLock,
opt.LockObjectNamespace,
opt.LeaderElection.ResourceNamespace,
commonutil.GenerateComponentName(opt.SchedulerNames),
leaderElectionClient.CoreV1(),
leaderElectionClient.CoordinationV1(),
Expand All @@ -141,9 +133,9 @@ func Run(opt *options.ServerOption) error {

leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
LeaseDuration: opt.LeaderElection.LeaseDuration.Duration,
RenewDeadline: opt.LeaderElection.RenewDeadline.Duration,
RetryPeriod: opt.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
Expand Down
Loading

0 comments on commit 3c66531

Please sign in to comment.