Skip to content

Commit

Permalink
Refactor scheduler to expose predicates to cluster autoscaler
Browse files Browse the repository at this point in the history
  • Loading branch information
fgrzadkowski committed May 24, 2016
1 parent 51e3084 commit 55a1c82
Showing 1 changed file with 62 additions and 37 deletions.
99 changes: 62 additions & 37 deletions plugin/pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,72 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 0-100", f.HardPodAffinitySymmetricWeight)
}

predicateFuncs, err := f.GetPredicates(predicateKeys)
if err != nil {
return nil, err
}

priorityConfigs, err := f.GetPriorityFunctionConfigs(priorityKeys)
if err != nil {
return nil, err
}

f.Run()

r := rand.New(rand.NewSource(time.Now().UnixNano()))

algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, priorityConfigs, extenders, r)

podBackoff := podBackoff{
perPodBackoff: map[types.NamespacedName]*backoffEntry{},
clock: realClock{},

defaultDuration: 1 * time.Second,
maxDuration: 60 * time.Second,
}

return &scheduler.Config{
SchedulerCache: f.schedulerCache,
// The scheduler only needs to consider schedulable nodes.
NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
Algorithm: algo,
Binder: &binder{f.Client},
PodConditionUpdater: &podConditionUpdater{f.Client},
NextPod: func() *api.Pod {
return f.getNextPod()
},
Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
StopEverything: f.StopEverything,
}, nil
}

func (f *ConfigFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error) {
pluginArgs, err := f.getPluginArgs()
if err != nil {
return nil, err
}

return getPriorityFunctionConfigs(priorityKeys, *pluginArgs)
}

func (f *ConfigFactory) GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) {
pluginArgs, err := f.getPluginArgs()
if err != nil {
return nil, err
}

return getFitPredicateFunctions(predicateKeys, *pluginArgs)
}

func (f *ConfigFactory) getPluginArgs() (*PluginFactoryArgs, error) {
failureDomainArgs := strings.Split(f.FailureDomains, ",")
for _, failureDomain := range failureDomainArgs {
if errs := utilvalidation.IsQualifiedName(failureDomain); len(errs) != 0 {
return nil, fmt.Errorf("invalid failure domain: %q: %s", failureDomain, strings.Join(errs, ";"))
}
}

pluginArgs := PluginFactoryArgs{
return &PluginFactoryArgs{
PodLister: f.PodLister,
ServiceLister: f.ServiceLister,
ControllerLister: f.ControllerLister,
Expand All @@ -324,17 +382,10 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
PVCInfo: f.PVCLister,
HardPodAffinitySymmetricWeight: f.HardPodAffinitySymmetricWeight,
FailureDomains: sets.NewString(failureDomainArgs...).List(),
}
predicateFuncs, err := getFitPredicateFunctions(predicateKeys, pluginArgs)
if err != nil {
return nil, err
}

priorityConfigs, err := getPriorityFunctionConfigs(priorityKeys, pluginArgs)
if err != nil {
return nil, err
}
}, nil
}

func (f *ConfigFactory) Run() {
// Watch and queue pods that need scheduling.
cache.NewReflector(f.createUnassignedNonTerminatedPodLW(), &api.Pod{}, f.PodQueue, 0).RunUntil(f.StopEverything)

Expand Down Expand Up @@ -363,32 +414,6 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
// Cache this locally.
cache.NewReflector(f.createReplicaSetLW(), &extensions.ReplicaSet{}, f.ReplicaSetLister.Store, 0).RunUntil(f.StopEverything)

r := rand.New(rand.NewSource(time.Now().UnixNano()))

algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, priorityConfigs, extenders, r)

podBackoff := podBackoff{
perPodBackoff: map[types.NamespacedName]*backoffEntry{},
clock: realClock{},

defaultDuration: 1 * time.Second,
maxDuration: 60 * time.Second,
}

return &scheduler.Config{
SchedulerCache: f.schedulerCache,
// The scheduler only needs to consider schedulable nodes.
NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
Algorithm: algo,
Binder: &binder{f.Client},
PodConditionUpdater: &podConditionUpdater{f.Client},
NextPod: func() *api.Pod {
return f.getNextPod()
},
Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
StopEverything: f.StopEverything,
}, nil
}

func (f *ConfigFactory) getNextPod() *api.Pod {
Expand Down

0 comments on commit 55a1c82

Please sign in to comment.