diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index c97f933884b0e..dcffd015a3209 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -32,11 +32,21 @@ func defaultPredicates() util.StringSet { // Fit is defined based on the absence of port conflicts. factory.RegisterFitPredicate("PodFitsPorts", algorithm.PodFitsPorts), // Fit is determined by resource availability. - factory.RegisterFitPredicate("PodFitsResources", algorithm.NewResourceFitPredicate(factory.MinionLister)), + factory.RegisterFitPredicateFactory( + "PodFitsResources", + func(args factory.PluginFactoryArgs) algorithm.FitPredicate { + return algorithm.NewResourceFitPredicate(args.NodeInfo) + }, + ), // Fit is determined by non-conflicting disk volumes. factory.RegisterFitPredicate("NoDiskConflict", algorithm.NoDiskConflict), // Fit is determined by node selector query. - factory.RegisterFitPredicate("MatchNodeSelector", algorithm.NewSelectorMatchPredicate(factory.MinionLister)), + factory.RegisterFitPredicateFactory( + "MatchNodeSelector", + func(args factory.PluginFactoryArgs) algorithm.FitPredicate { + return algorithm.NewSelectorMatchPredicate(args.NodeInfo) + }, + ), // Fit is determined by the presence of the Host parameter and a string match factory.RegisterFitPredicate("HostName", algorithm.PodFitsHost), ) @@ -47,7 +57,15 @@ func defaultPriorities() util.StringSet { // Prioritize nodes by least requested utilization. factory.RegisterPriorityFunction("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1), // spreads pods by minimizing the number of pods (belonging to the same service) on the same minion. - factory.RegisterPriorityFunction("ServiceSpreadingPriority", algorithm.NewServiceSpreadPriority(factory.ServiceLister), 1), + factory.RegisterPriorityConfigFactory( + "ServiceSpreadingPriority", + func(args factory.PluginFactoryArgs) algorithm.PriorityConfig { + return algorithm.PriorityConfig{ + Function: algorithm.NewServiceSpreadPriority(args.ServiceLister), + Weight: 1, + } + }, + ), // EqualPriority is a prioritizer function that gives an equal weight of one to all minions factory.RegisterPriorityFunction("EqualPriority", algorithm.EqualPriority, 0), ) diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 3ba0544dc2f60..96612771e3015 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -36,12 +36,6 @@ import ( "github.com/golang/glog" ) -var ( - PodLister = &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)} - MinionLister = &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)} - ServiceLister = &cache.StoreToServiceLister{cache.NewStore(cache.MetaNamespaceKeyFunc)} -) - // ConfigFactory knows how to fill out a scheduler config with its support functions. type ConfigFactory struct { Client *client.Client @@ -50,7 +44,7 @@ type ConfigFactory struct { // a means to list all scheduled pods PodLister *cache.StoreToPodLister // a means to list all minions - MinionLister *cache.StoreToNodeLister + NodeLister *cache.StoreToNodeLister // a means to list all services ServiceLister *cache.StoreToServiceLister } @@ -60,9 +54,9 @@ func NewConfigFactory(client *client.Client) *ConfigFactory { return &ConfigFactory{ Client: client, PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc), - PodLister: PodLister, - MinionLister: MinionLister, - ServiceLister: ServiceLister, + PodLister: &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}, + NodeLister: &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}, + ServiceLister: &cache.StoreToServiceLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}, } } @@ -104,12 +98,17 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler // Creates a scheduler from a set of registered fit predicate keys and priority keys. func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSet) (*scheduler.Config, error) { glog.V(2).Infof("creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys) - predicateFuncs, err := getFitPredicateFunctions(predicateKeys) + pluginArgs := PluginFactoryArgs{ + PodLister: f.PodLister, + ServiceLister: f.ServiceLister, + NodeLister: f.NodeLister, + } + predicateFuncs, err := getFitPredicateFunctions(predicateKeys, pluginArgs) if err != nil { return nil, err } - priorityConfigs, err := getPriorityFunctionConfigs(priorityKeys) + priorityConfigs, err := getPriorityFunctionConfigs(priorityKeys, pluginArgs) if err != nil { return nil, err } @@ -126,9 +125,9 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe if false { // Disable this code until minions support watches. Note when this code is enabled, // we need to make sure minion ListWatcher has proper FieldSelector. - cache.NewReflector(f.createMinionLW(), &api.Node{}, f.MinionLister.Store, 0).Run() + cache.NewReflector(f.createMinionLW(), &api.Node{}, f.NodeLister.Store, 0).Run() } else { - cache.NewPoller(f.pollMinions, 10*time.Second, f.MinionLister.Store).Run() + cache.NewPoller(f.pollMinions, 10*time.Second, f.NodeLister.Store).Run() } // Watch and cache all service objects. Scheduler needs to find all pods @@ -149,7 +148,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe } return &scheduler.Config{ - MinionLister: f.MinionLister, + MinionLister: f.NodeLister, Algorithm: algo, Binder: &binder{f.Client}, NextPod: func() *api.Pod { diff --git a/plugin/pkg/scheduler/factory/plugins.go b/plugin/pkg/scheduler/factory/plugins.go index e53c24f0b18cb..4b0461a75c7d3 100644 --- a/plugin/pkg/scheduler/factory/plugins.go +++ b/plugin/pkg/scheduler/factory/plugins.go @@ -28,12 +28,26 @@ import ( "github.com/golang/glog" ) +// PluginFactoryArgs are passed to all plugin factory functions. +type PluginFactoryArgs struct { + algorithm.PodLister + algorithm.ServiceLister + NodeLister algorithm.MinionLister + NodeInfo algorithm.NodeInfo +} + +// A FitPredicateFactory produces a FitPredicate from the given args. +type FitPredicateFactory func(PluginFactoryArgs) algorithm.FitPredicate + +// A PriorityFunctionFactory produces a PriorityConfig from the given args. +type PriorityConfigFactory func(PluginFactoryArgs) algorithm.PriorityConfig + var ( schedulerFactoryMutex sync.Mutex // maps that hold registered algorithm types - fitPredicateMap = make(map[string]algorithm.FitPredicate) - priorityFunctionMap = make(map[string]algorithm.PriorityConfig) + fitPredicateMap = make(map[string]FitPredicateFactory) + priorityFunctionMap = make(map[string]PriorityConfigFactory) algorithmProviderMap = make(map[string]AlgorithmProviderConfig) ) @@ -46,20 +60,26 @@ type AlgorithmProviderConfig struct { PriorityFunctionKeys util.StringSet } -// Registers a fit predicate with the algorithm registry. Returns the name, -// with which the predicate was registered. +// RegisterFitPredicate registers a fit predicate with the algorithm +// registry. Returns the name with which the predicate was registered. func RegisterFitPredicate(name string, predicate algorithm.FitPredicate) string { + return RegisterFitPredicateFactory(name, func(PluginFactoryArgs) algorithm.FitPredicate { return predicate }) +} + +// RegisterFitPredicateFactory registers a fit predicate factory with the +// algorithm registry. Returns the name with which the predicate was registered. +func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFactory) string { schedulerFactoryMutex.Lock() defer schedulerFactoryMutex.Unlock() validateAlgorithmNameOrDie(name) - fitPredicateMap[name] = predicate + fitPredicateMap[name] = predicateFactory return name } // Registers a custom fit predicate with the algorithm registry. // Returns the name, with which the predicate was registered. func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy) string { - var predicate algorithm.FitPredicate + var predicateFactory FitPredicateFactory var ok bool validatePredicateOrDie(policy) @@ -67,20 +87,33 @@ func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy) string { // generate the predicate function, if a custom type is requested if policy.Argument != nil { if policy.Argument.ServiceAffinity != nil { - predicate = algorithm.NewServiceAffinityPredicate(PodLister, ServiceLister, MinionLister, policy.Argument.ServiceAffinity.Labels) + predicateFactory = func(args PluginFactoryArgs) algorithm.FitPredicate { + return algorithm.NewServiceAffinityPredicate( + args.PodLister, + args.ServiceLister, + args.NodeInfo, + policy.Argument.ServiceAffinity.Labels, + ) + } } else if policy.Argument.LabelsPresence != nil { - predicate = algorithm.NewNodeLabelPredicate(MinionLister, policy.Argument.LabelsPresence.Labels, policy.Argument.LabelsPresence.Presence) + predicateFactory = func(args PluginFactoryArgs) algorithm.FitPredicate { + return algorithm.NewNodeLabelPredicate( + args.NodeInfo, + policy.Argument.LabelsPresence.Labels, + policy.Argument.LabelsPresence.Presence, + ) + } } - } else if predicate, ok = fitPredicateMap[policy.Name]; ok { + } else if predicateFactory, ok = fitPredicateMap[policy.Name]; ok { // checking to see if a pre-defined predicate is requested glog.V(2).Infof("Predicate type %s already registered, reusing.", policy.Name) } - if predicate == nil { + if predicateFactory == nil { glog.Fatalf("Invalid configuration: Predicate type not found for %s", policy.Name) } - return RegisterFitPredicate(policy.Name, predicate) + return RegisterFitPredicateFactory(policy.Name, predicateFactory) } // This check is useful for testing providers. @@ -94,37 +127,59 @@ func IsFitPredicateRegistered(name string) bool { // Registers a priority function with the algorithm registry. Returns the name, // with which the function was registered. func RegisterPriorityFunction(name string, function algorithm.PriorityFunction, weight int) string { + return RegisterPriorityConfigFactory(name, func(PluginFactoryArgs) algorithm.PriorityConfig { + return algorithm.PriorityConfig{Function: function, Weight: weight} + }) +} + +func RegisterPriorityConfigFactory(name string, pcf PriorityConfigFactory) string { schedulerFactoryMutex.Lock() defer schedulerFactoryMutex.Unlock() validateAlgorithmNameOrDie(name) - priorityFunctionMap[name] = algorithm.PriorityConfig{Function: function, Weight: weight} + priorityFunctionMap[name] = pcf return name } // Registers a custom priority function with the algorithm registry. // Returns the name, with which the priority function was registered. func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string { - var priority algorithm.PriorityFunction + var pcf PriorityConfigFactory validatePriorityOrDie(policy) // generate the priority function, if a custom priority is requested if policy.Argument != nil { if policy.Argument.ServiceAntiAffinity != nil { - priority = algorithm.NewServiceAntiAffinityPriority(ServiceLister, policy.Argument.ServiceAntiAffinity.Label) + pcf = func(args PluginFactoryArgs) algorithm.PriorityConfig { + return algorithm.PriorityConfig{ + Function: algorithm.NewServiceAntiAffinityPriority( + args.ServiceLister, + policy.Argument.ServiceAntiAffinity.Label, + ), + Weight: policy.Weight, + } + } } else if policy.Argument.LabelPreference != nil { - priority = algorithm.NewNodeLabelPriority(policy.Argument.LabelPreference.Label, policy.Argument.LabelPreference.Presence) + pcf = func(args PluginFactoryArgs) algorithm.PriorityConfig { + return algorithm.PriorityConfig{ + Function: algorithm.NewNodeLabelPriority( + policy.Argument.LabelPreference.Label, + policy.Argument.LabelPreference.Presence, + ), + Weight: policy.Weight, + } + } } - } else if priorityConfig, ok := priorityFunctionMap[policy.Name]; ok { + } else if _, ok := priorityFunctionMap[policy.Name]; ok { glog.V(2).Infof("Priority type %s already registered, reusing.", policy.Name) - priority = priorityConfig.Function + return policy.Name } - if priority == nil { + if pcf == nil { glog.Fatalf("Invalid configuration: Priority type not found for %s", policy.Name) } - return RegisterPriorityFunction(policy.Name, priority, policy.Weight) + return RegisterPriorityConfigFactory(policy.Name, pcf) } // This check is useful for testing providers. @@ -135,19 +190,6 @@ func IsPriorityFunctionRegistered(name string) bool { return ok } -// Sets the weight of an already registered priority function. -func SetPriorityFunctionWeight(name string, weight int) { - schedulerFactoryMutex.Lock() - defer schedulerFactoryMutex.Unlock() - config, ok := priorityFunctionMap[name] - if !ok { - glog.Errorf("Invalid priority name %s specified - no corresponding function found", name) - return - } - config.Weight = weight - priorityFunctionMap[name] = config -} - // Registers a new algorithm provider with the algorithm registry. This should // be called from the init function in a provider plugin. func RegisterAlgorithmProvider(name string, predicateKeys, priorityKeys util.StringSet) string { @@ -175,32 +217,32 @@ func GetAlgorithmProvider(name string) (*AlgorithmProviderConfig, error) { return &provider, nil } -func getFitPredicateFunctions(names util.StringSet) (map[string]algorithm.FitPredicate, error) { +func getFitPredicateFunctions(names util.StringSet, args PluginFactoryArgs) (map[string]algorithm.FitPredicate, error) { schedulerFactoryMutex.Lock() defer schedulerFactoryMutex.Unlock() predicates := map[string]algorithm.FitPredicate{} for _, name := range names.List() { - function, ok := fitPredicateMap[name] + factory, ok := fitPredicateMap[name] if !ok { return nil, fmt.Errorf("Invalid predicate name %q specified - no corresponding function found", name) } - predicates[name] = function + predicates[name] = factory(args) } return predicates, nil } -func getPriorityFunctionConfigs(names util.StringSet) ([]algorithm.PriorityConfig, error) { +func getPriorityFunctionConfigs(names util.StringSet, args PluginFactoryArgs) ([]algorithm.PriorityConfig, error) { schedulerFactoryMutex.Lock() defer schedulerFactoryMutex.Unlock() configs := []algorithm.PriorityConfig{} for _, name := range names.List() { - config, ok := priorityFunctionMap[name] + factory, ok := priorityFunctionMap[name] if !ok { return nil, fmt.Errorf("Invalid priority name %s specified - no corresponding function found", name) } - configs = append(configs, config) + configs = append(configs, factory(args)) } return configs, nil }