Skip to content

Commit

Permalink
retrofit the scheduler with the leader election client.
Browse files Browse the repository at this point in the history
Signed-off-by: Mike Danese <mikedanese@google.com>
  • Loading branch information
mikedanese committed Jan 13, 2016
1 parent a47c170 commit f71657d
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 5 deletions.
6 changes: 5 additions & 1 deletion docs/admin/kube-scheduler.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ kube-scheduler
--kube-api-burst=100: Burst to use while talking with kubernetes apiserver
--kube-api-qps=50: QPS to use while talking with kubernetes apiserver
--kubeconfig="": Path to kubeconfig file with authorization and master location information.
--leader-elect[=false]: Start a leader election client and gain leadership before executing scheduler loop. Enable this when running replicated schedulers.
--leader-elect-lease-duration=15s: The duration that non-leader candidates will wait after observing a leadership renewal until attempting to acquire leadership of a led but unrenewed leader slot. This is effectively the maximum duration that a leader can be stopped before it is replaced by another candidate. This is only applicable if leader election is enabled.
--leader-elect-renew-deadline=10s: The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to the lease duration. This is only applicable if leader election is enabled.
--leader-elect-retry-period=2s: The duration the clients should wait between attempting acquisition and renewal of a leadership. This is only applicable if leader election is enabled.
--log-flush-frequency=5s: Maximum number of seconds between log flushes
--master="": The address of the Kubernetes API server (overrides any value in kubeconfig)
--policy-config-file="": File with scheduler policy configuration
Expand All @@ -70,7 +74,7 @@ kube-scheduler
--scheduler-name="default-scheduler": Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's annotation with key 'scheduler.alpha.kubernetes.io/name'
```

###### Auto generated by spf13/cobra on 14-Dec-2015
###### Auto generated by spf13/cobra on 12-Jan-2016


<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
Expand Down
4 changes: 4 additions & 0 deletions hack/verify-flags/known-flags.txt
Original file line number Diff line number Diff line change
Expand Up @@ -356,3 +356,7 @@ www-prefix
clientset-name
clientset-only
clientset-path
leader-elect
leader-elect-lease-duration
leader-elect-renew-deadline
leader-elect-retry-period
58 changes: 56 additions & 2 deletions pkg/client/leaderelection/leaderelection.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,26 @@ import (
"reflect"
"time"

"github.com/golang/glog"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"

"github.com/golang/glog"
"github.com/spf13/pflag"
)

const (
// JitterFactor is a unitless multiplier.
// NOTE(review): its use is outside this hunk; presumably it scales the
// jitter applied to the retry loop — confirm against the call site.
JitterFactor = 1.2

// LeaderElectionRecordAnnotationKey is an annotation key.
// NOTE(review): presumably the key under which the leader election record
// is stored on the lock object — confirm against the code that reads it.
LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader"

// Default timing values returned by DefaultLeaderElectionCLIConfig and
// therefore used as the defaults of the --leader-elect-* duration flags.
DefaultLeaseDuration = 15 * time.Second
DefaultRenewDeadline = 10 * time.Second
DefaultRetryPeriod = 2 * time.Second
)

// NewLeaderElector creates a LeaderElector from a LeaderElectionConfig
Expand Down Expand Up @@ -173,6 +178,16 @@ func (le *LeaderElector) Run() {
close(stop)
}

// RunOrDie builds a LeaderElector from lec and runs it. If NewLeaderElector
// rejects the configuration, RunOrDie panics with the returned error instead
// of handing it back to the caller.
func RunOrDie(lec LeaderElectionConfig) {
	elector, err := NewLeaderElector(lec)
	if err != nil {
		// An invalid configuration is treated as unrecoverable.
		panic(err)
	}
	elector.Run()
}

// GetLeader returns the identity of the last observed leader or returns the empty string if
// no leader has yet been observed.
func (le *LeaderElector) GetLeader() string {
Expand Down Expand Up @@ -315,3 +330,42 @@ func (l *LeaderElector) maybeReportTransition() {
go l.config.Callbacks.OnNewLeader(l.reportedLeader)
}
}

// DefaultLeaderElectionCLIConfig returns a LeaderElectionCLIConfig with leader
// election turned off and the lease duration, renew deadline and retry period
// set to DefaultLeaseDuration, DefaultRenewDeadline and DefaultRetryPeriod.
func DefaultLeaderElectionCLIConfig() LeaderElectionCLIConfig {
	var cfg LeaderElectionCLIConfig
	cfg.LeaderElect = false
	cfg.LeaseDuration = DefaultLeaseDuration
	cfg.RenewDeadline = DefaultRenewDeadline
	cfg.RetryPeriod = DefaultRetryPeriod
	return cfg
}

// LeaderElectionCLIConfig holds the leader election settings that a component
// exposes on its command line. Embed it in a component's configuration object
// and call BindFlags so that every component offers the same flags.
type LeaderElectionCLIConfig struct {
	// LeaderElect is bound to --leader-elect and switches leader election on.
	LeaderElect bool

	// LeaseDuration is bound to --leader-elect-lease-duration.
	LeaseDuration time.Duration

	// RenewDeadline is bound to --leader-elect-renew-deadline.
	RenewDeadline time.Duration

	// RetryPeriod is bound to --leader-elect-retry-period.
	RetryPeriod time.Duration
}

// BindFlags registers the common leader election flags (--leader-elect,
// --leader-elect-lease-duration, --leader-elect-renew-deadline and
// --leader-elect-retry-period) on fs. Each flag writes into the matching
// field of l, and the value currently held in that field is used as the
// flag's default.
func (l *LeaderElectionCLIConfig) BindFlags(fs *pflag.FlagSet) {
	fs.BoolVar(&l.LeaderElect, "leader-elect", l.LeaderElect, ""+
		"Start a leader election client and gain leadership before "+
		"executing scheduler loop. Enable this when running replicated "+
		"schedulers.")
	// Every fragment except the last must end in a space so the concatenated
	// usage text does not run words together ("leadership renewal").
	fs.DurationVar(&l.LeaseDuration, "leader-elect-lease-duration", l.LeaseDuration, ""+
		"The duration that non-leader candidates will wait after observing a leadership "+
		"renewal until attempting to acquire leadership of a led but unrenewed leader "+
		"slot. This is effectively the maximum duration that a leader can be stopped "+
		"before it is replaced by another candidate. This is only applicable if leader "+
		"election is enabled.")
	fs.DurationVar(&l.RenewDeadline, "leader-elect-renew-deadline", l.RenewDeadline, ""+
		"The interval between attempts by the acting master to renew a leadership slot "+
		"before it stops leading. This must be less than or equal to the lease duration. "+
		"This is only applicable if leader election is enabled.")
	fs.DurationVar(&l.RetryPeriod, "leader-elect-retry-period", l.RetryPeriod, ""+
		"The duration the clients should wait between attempting acquisition and renewal "+
		"of a leadership. This is only applicable if leader election is enabled.")
}
4 changes: 4 additions & 0 deletions plugin/cmd/kube-scheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"

Expand All @@ -41,6 +42,7 @@ type SchedulerServer struct {
KubeAPIQPS float32
KubeAPIBurst int
SchedulerName string
LeaderElection leaderelection.LeaderElectionCLIConfig
}

// NewSchedulerServer creates a new SchedulerServer with default parameters
Expand All @@ -54,6 +56,7 @@ func NewSchedulerServer() *SchedulerServer {
KubeAPIQPS: 50.0,
KubeAPIBurst: 100,
SchedulerName: api.DefaultSchedulerName,
LeaderElection: leaderelection.DefaultLeaderElectionCLIConfig(),
}
return &s
}
Expand All @@ -72,4 +75,5 @@ func (s *SchedulerServer) AddFlags(fs *pflag.FlagSet) {
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")
fs.StringVar(&s.SchedulerName, "scheduler-name", s.SchedulerName, "Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's annotation with key 'scheduler.alpha.kubernetes.io/name'")
s.LeaderElection.BindFlags(fs)
}
40 changes: 38 additions & 2 deletions plugin/cmd/kube-scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"strconv"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
Expand Down Expand Up @@ -110,9 +111,44 @@ func Run(s *options.SchedulerServer) error {
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))

sched := scheduler.New(config)
sched.Run()

select {}
run := func(_ <-chan struct{}) {
sched.Run()
select {}
}

if !s.LeaderElection.LeaderElect {
run(nil)
glog.Fatal("this statement is unreachable")
panic("unreachable")
}

id, err := os.Hostname()
if err != nil {
return err
}

leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
EndpointsMeta: api.ObjectMeta{
Namespace: "kube-system",
Name: "kube-scheduler",
},
Client: kubeClient,
Identity: id,
EventRecorder: config.Recorder,
LeaseDuration: s.LeaderElection.LeaseDuration,
RenewDeadline: s.LeaderElection.RenewDeadline,
RetryPeriod: s.LeaderElection.RetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("lost master")
},
},
})

glog.Fatal("this statement is unreachable")
panic("unreachable")
}

func createConfig(s *options.SchedulerServer, configFactory *factory.ConfigFactory) (*scheduler.Config, error) {
Expand Down

0 comments on commit f71657d

Please sign in to comment.