Skip to content

Commit

Permalink
Merge pull request #81263 from draveness/feature/update-scheduling-qu…
Browse files Browse the repository at this point in the history
…eue-with-options

feat: update scheduling queue with options
  • Loading branch information
k8s-ci-robot authored Oct 9, 2019
2 parents 8577711 + 9646afb commit b8b7c37
Show file tree
Hide file tree
Showing 16 changed files with 211 additions and 24 deletions.
16 changes: 12 additions & 4 deletions cmd/kube-scheduler/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ pluginConfig:

defaultSource := "DefaultProvider"
defaultBindTimeoutSeconds := int64(600)
defaultPodInitialBackoffSeconds := int64(1)
defaultPodMaxBackoffSeconds := int64(10)

testcases := []struct {
name string
Expand Down Expand Up @@ -275,8 +277,10 @@ pluginConfig:
Burst: 100,
ContentType: "application/vnd.kubernetes.protobuf",
},
BindTimeoutSeconds: &defaultBindTimeoutSeconds,
Plugins: nil,
BindTimeoutSeconds: &defaultBindTimeoutSeconds,
PodInitialBackoffSeconds: &defaultPodInitialBackoffSeconds,
PodMaxBackoffSeconds: &defaultPodMaxBackoffSeconds,
Plugins: nil,
},
},
{
Expand Down Expand Up @@ -355,7 +359,9 @@ pluginConfig:
Burst: 100,
ContentType: "application/vnd.kubernetes.protobuf",
},
BindTimeoutSeconds: &defaultBindTimeoutSeconds,
BindTimeoutSeconds: &defaultBindTimeoutSeconds,
PodInitialBackoffSeconds: &defaultPodInitialBackoffSeconds,
PodMaxBackoffSeconds: &defaultPodMaxBackoffSeconds,
},
},
{
Expand Down Expand Up @@ -416,7 +422,9 @@ pluginConfig:
Burst: 100,
ContentType: "application/vnd.kubernetes.protobuf",
},
BindTimeoutSeconds: &defaultBindTimeoutSeconds,
BindTimeoutSeconds: &defaultBindTimeoutSeconds,
PodInitialBackoffSeconds: &defaultPodInitialBackoffSeconds,
PodMaxBackoffSeconds: &defaultPodMaxBackoffSeconds,
Plugins: &kubeschedulerconfig.Plugins{
Reserve: &kubeschedulerconfig.PluginSet{
Enabled: []kubeschedulerconfig.Plugin{
Expand Down
2 changes: 2 additions & 0 deletions cmd/kube-scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, regis
scheduler.WithFrameworkRegistry(registry),
scheduler.WithFrameworkPlugins(cc.ComponentConfig.Plugins),
scheduler.WithFrameworkPluginConfig(cc.ComponentConfig.PluginConfig),
scheduler.WithPodMaxBackoffSeconds(*cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(*cc.ComponentConfig.PodInitialBackoffSeconds),
)
if err != nil {
return err
Expand Down
10 changes: 10 additions & 0 deletions pkg/scheduler/apis/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ type KubeSchedulerConfiguration struct {
// If this value is nil, the default value will be used.
BindTimeoutSeconds *int64

// PodInitialBackoffSeconds is the initial backoff for unschedulable pods.
// If specified, it must be greater than 0. If this value is null, the default value (1s)
// will be used.
PodInitialBackoffSeconds *int64

// PodMaxBackoffSeconds is the max backoff for unschedulable pods.
// If specified, it must be greater than podInitialBackoffSeconds. If this value is null,
// the default value (10s) will be used.
PodMaxBackoffSeconds *int64

// Plugins specify the set of plugins that should be enabled or disabled. Enabled plugins are the
// ones that should be enabled in addition to the default plugins. Disabled plugins are any of the
// default plugins that should be disabled.
Expand Down
10 changes: 10 additions & 0 deletions pkg/scheduler/apis/config/v1alpha1/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,14 @@ func SetDefaults_KubeSchedulerConfiguration(obj *kubeschedulerconfigv1alpha1.Kub
defaultBindTimeoutSeconds := int64(600)
obj.BindTimeoutSeconds = &defaultBindTimeoutSeconds
}

if obj.PodInitialBackoffSeconds == nil {
defaultPodInitialBackoffSeconds := int64(1)
obj.PodInitialBackoffSeconds = &defaultPodInitialBackoffSeconds
}

if obj.PodMaxBackoffSeconds == nil {
defaultPodMaxBackoffSeconds := int64(10)
obj.PodMaxBackoffSeconds = &defaultPodMaxBackoffSeconds
}
}
4 changes: 4 additions & 0 deletions pkg/scheduler/apis/config/v1alpha1/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions pkg/scheduler/apis/config/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ func ValidateKubeSchedulerConfiguration(cc *config.KubeSchedulerConfiguration) f
allErrs = append(allErrs, field.Invalid(field.NewPath("percentageOfNodesToScore"),
cc.PercentageOfNodesToScore, "not in valid range 0-100"))
}
if cc.PodInitialBackoffSeconds == nil {
allErrs = append(allErrs, field.Required(field.NewPath("podInitialBackoffSeconds"), ""))
} else if *cc.PodInitialBackoffSeconds <= 0 {
allErrs = append(allErrs, field.Invalid(field.NewPath("podInitialBackoffSeconds"),
cc.PodInitialBackoffSeconds, "must be greater than 0"))
}
if cc.PodMaxBackoffSeconds == nil {
allErrs = append(allErrs, field.Required(field.NewPath("podMaxBackoffSeconds"), ""))
} else if cc.PodInitialBackoffSeconds != nil && *cc.PodMaxBackoffSeconds < *cc.PodInitialBackoffSeconds {
allErrs = append(allErrs, field.Invalid(field.NewPath("podMaxBackoffSeconds"),
cc.PodMaxBackoffSeconds, "must be greater than or equal to PodInitialBackoffSeconds"))
}
return allErrs
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/apis/config/validation/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (

func TestValidateKubeSchedulerConfiguration(t *testing.T) {
testTimeout := int64(0)
podInitialBackoffSeconds := int64(1)
podMaxBackoffSeconds := int64(1)
validConfig := &config.KubeSchedulerConfiguration{
SchedulerName: "me",
HealthzBindAddress: "0.0.0.0:10254",
Expand Down Expand Up @@ -57,6 +59,8 @@ func TestValidateKubeSchedulerConfiguration(t *testing.T) {
ResourceName: "name",
},
},
PodInitialBackoffSeconds: &podInitialBackoffSeconds,
PodMaxBackoffSeconds: &podMaxBackoffSeconds,
BindTimeoutSeconds: &testTimeout,
PercentageOfNodesToScore: 35,
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/scheduler/apis/config/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 14 additions & 1 deletion pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ type Configurator struct {

bindTimeoutSeconds int64

podInitialBackoffSeconds int64

podMaxBackoffSeconds int64

enableNonPreempting bool

// framework configuration arguments.
Expand Down Expand Up @@ -207,6 +211,8 @@ type ConfigFactoryArgs struct {
DisablePreemption bool
PercentageOfNodesToScore int32
BindTimeoutSeconds int64
PodInitialBackoffSeconds int64
PodMaxBackoffSeconds int64
StopCh <-chan struct{}
Registry framework.Registry
Plugins *config.Plugins
Expand Down Expand Up @@ -253,6 +259,8 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
disablePreemption: args.DisablePreemption,
percentageOfNodesToScore: args.PercentageOfNodesToScore,
bindTimeoutSeconds: args.BindTimeoutSeconds,
podInitialBackoffSeconds: args.PodInitialBackoffSeconds,
podMaxBackoffSeconds: args.PodMaxBackoffSeconds,
enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(features.NonPreemptingPriority),
registry: args.Registry,
plugins: args.Plugins,
Expand Down Expand Up @@ -413,7 +421,12 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
klog.Fatalf("error initializing the scheduling framework: %v", err)
}

podQueue := internalqueue.NewSchedulingQueue(c.StopEverything, framework)
podQueue := internalqueue.NewSchedulingQueue(
c.StopEverything,
framework,
internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
)

// Setup cache debugger.
debugger := cachedebugger.New(
Expand Down
10 changes: 7 additions & 3 deletions pkg/scheduler/factory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ import (
)

const (
disablePodPreemption = false
bindTimeoutSeconds = 600
disablePodPreemption = false
bindTimeoutSeconds = 600
podInitialBackoffDurationSeconds = 1
podMaxBackoffDurationSeconds = 10
)

func TestCreate(t *testing.T) {
Expand Down Expand Up @@ -254,7 +256,7 @@ func TestDefaultErrorFunc(t *testing.T) {
defer close(stopCh)

timestamp := time.Now()
queue := internalqueue.NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp), nil)
queue := internalqueue.NewPriorityQueue(nil, nil, internalqueue.WithClock(clock.NewFakeClock(timestamp)))
schedulerCache := internalcache.New(30*time.Second, stopCh)
errFunc := MakeDefaultErrorFunc(client, queue, schedulerCache, stopCh)

Expand Down Expand Up @@ -494,6 +496,8 @@ func newConfigFactoryWithFrameworkRegistry(
disablePodPreemption,
schedulerapi.DefaultPercentageOfNodesToScore,
bindTimeoutSeconds,
podMaxBackoffDurationSeconds,
podInitialBackoffDurationSeconds,
stopCh,
registry,
nil,
Expand Down
77 changes: 64 additions & 13 deletions pkg/scheduler/internal/queue/scheduling_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,24 @@ import (
"k8s.io/kubernetes/pkg/scheduler/util"
)

var (
const (
// If the pod stays in unschedulableQ longer than the unschedulableQTimeInterval,
// the pod will be moved from unschedulableQ to activeQ.
unschedulableQTimeInterval = 60 * time.Second

queueClosed = "scheduling queue is closed"
)

// If the pod stays in unschedulableQ longer than the unschedulableQTimeInterval,
// the pod will be moved from unschedulableQ to activeQ.
const unschedulableQTimeInterval = 60 * time.Second
const (
// DefaultPodInitialBackoffDuration is the default value for the initial backoff duration
// for unschedulable pods. To change the default podInitialBackoffDurationSeconds used by the
// scheduler, update the ComponentConfig value in defaults.go
DefaultPodInitialBackoffDuration time.Duration = 1 * time.Second
// DefaultPodMaxBackoffDuration is the default value for the max backoff duration
// for unschedulable pods. To change the default podMaxBackoffDurationSeconds used by the
// scheduler, update the ComponentConfig value in defaults.go
DefaultPodMaxBackoffDuration time.Duration = 10 * time.Second
)

// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
Expand Down Expand Up @@ -90,8 +101,8 @@ type SchedulingQueue interface {
}

// NewSchedulingQueue initializes a priority queue as a new scheduling queue.
func NewSchedulingQueue(stop <-chan struct{}, fwk framework.Framework) SchedulingQueue {
return NewPriorityQueue(stop, fwk)
func NewSchedulingQueue(stop <-chan struct{}, fwk framework.Framework, opts ...Option) SchedulingQueue {
return NewPriorityQueue(stop, fwk, opts...)
}

// NominatedNodeName returns nominated node name of a Pod.
Expand Down Expand Up @@ -140,6 +151,42 @@ type PriorityQueue struct {
closed bool
}

type priorityQueueOptions struct {
clock util.Clock
podInitialBackoffDuration time.Duration
podMaxBackoffDuration time.Duration
}

// Option configures a PriorityQueue
type Option func(*priorityQueueOptions)

// WithClock sets clock for PriorityQueue, the default clock is util.RealClock.
func WithClock(clock util.Clock) Option {
return func(o *priorityQueueOptions) {
o.clock = clock
}
}

// WithPodInitialBackoffDuration sets pod initial backoff duration for PriorityQueue,
func WithPodInitialBackoffDuration(duration time.Duration) Option {
return func(o *priorityQueueOptions) {
o.podInitialBackoffDuration = duration
}
}

// WithPodMaxBackoffDuration sets pod max backoff duration for PriorityQueue,
func WithPodMaxBackoffDuration(duration time.Duration) Option {
return func(o *priorityQueueOptions) {
o.podMaxBackoffDuration = duration
}
}

var defaultPriorityQueueOptions = priorityQueueOptions{
clock: util.RealClock{},
podInitialBackoffDuration: DefaultPodInitialBackoffDuration,
podMaxBackoffDuration: DefaultPodMaxBackoffDuration,
}

// Making sure that PriorityQueue implements SchedulingQueue.
var _ = SchedulingQueue(&PriorityQueue{})

Expand All @@ -162,12 +209,16 @@ func activeQComp(podInfo1, podInfo2 interface{}) bool {
}

// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(stop <-chan struct{}, fwk framework.Framework) *PriorityQueue {
return NewPriorityQueueWithClock(stop, util.RealClock{}, fwk)
}
func NewPriorityQueue(
stop <-chan struct{},
fwk framework.Framework,
opts ...Option,
) *PriorityQueue {
options := defaultPriorityQueueOptions
for _, opt := range opts {
opt(&options)
}

// NewPriorityQueueWithClock creates a PriorityQueue which uses the passed clock for time.
func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock, fwk framework.Framework) *PriorityQueue {
comp := activeQComp
if fwk != nil {
if queueSortFunc := fwk.QueueSortFunc(); queueSortFunc != nil {
Expand All @@ -181,9 +232,9 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock, fwk frame
}

pq := &PriorityQueue{
clock: clock,
clock: options.clock,
stop: stop,
podBackoff: NewPodBackoffMap(1*time.Second, 10*time.Second),
podBackoff: NewPodBackoffMap(options.podInitialBackoffDuration, options.podMaxBackoffDuration),
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
nominatedPods: newNominatedPodMap(),
Expand Down
23 changes: 20 additions & 3 deletions pkg/scheduler/internal/queue/scheduling_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
// Pods in and before current scheduling cycle will be put back to activeQueue
// if we were trying to schedule them when we received move request.
func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
q := NewPriorityQueueWithClock(nil, clock.NewFakeClock(time.Now()), nil)
q := NewPriorityQueue(nil, nil, WithClock(clock.NewFakeClock(time.Now())))
totalNum := 10
expectedPods := make([]v1.Pod, 0, totalNum)
for i := 0; i < totalNum; i++ {
Expand Down Expand Up @@ -628,6 +628,23 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
}
}

func TestPriorityQueue_NewWithOptions(t *testing.T) {
q := NewPriorityQueue(
nil,
nil,
WithPodInitialBackoffDuration(2*time.Second),
WithPodMaxBackoffDuration(20*time.Second),
)

if q.podBackoff.initialDuration != 2*time.Second {
t.Errorf("Unexpected pod backoff initial duration. Expected: %v, got: %v", 2*time.Second, q.podBackoff.initialDuration)
}

if q.podBackoff.maxDuration != 20*time.Second {
t.Errorf("Unexpected pod backoff max duration. Expected: %v, got: %v", 2*time.Second, q.podBackoff.maxDuration)
}
}

func TestUnschedulablePodsMap(t *testing.T) {
var pods = []*v1.Pod{
{
Expand Down Expand Up @@ -1208,7 +1225,7 @@ func TestPodTimestamp(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
queue := NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp), nil)
queue := NewPriorityQueue(nil, nil, WithClock(clock.NewFakeClock(timestamp)))
var podInfoList []*framework.PodInfo

for i, op := range test.operations {
Expand Down Expand Up @@ -1375,7 +1392,7 @@ scheduler_pending_pods{queue="unschedulable"} 0
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
resetMetrics()
queue := NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp), nil)
queue := NewPriorityQueue(nil, nil, WithClock(clock.NewFakeClock(timestamp)))
for i, op := range test.operations {
for _, pInfo := range test.operands[i] {
op(queue, pInfo)
Expand Down
Loading

0 comments on commit b8b7c37

Please sign in to comment.