Skip to content

Commit

Permalink
Merge pull request kubernetes#2825 from mikedanese/scheduler-factory-…
Browse files Browse the repository at this point in the history
…plugins

Scheduler plugin configuration
  • Loading branch information
bgrant0607 committed Dec 18, 2014
2 parents fd7b4d4 + 4850bdb commit 19f0b8b
Show file tree
Hide file tree
Showing 9 changed files with 328 additions and 101 deletions.
2 changes: 1 addition & 1 deletion cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func startComponents(manifestURL string) (apiServerURL string) {

// Scheduler
schedulerConfigFactory := factory.NewConfigFactory(cl)
schedulerConfig, err := schedulerConfigFactory.Create(nil, nil)
schedulerConfig, err := schedulerConfigFactory.Create()
if err != nil {
glog.Fatal("Couldn't create scheduler config: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/standalone/standalone.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"

"github.com/golang/glog"
Expand Down Expand Up @@ -105,7 +106,7 @@ func RunApiServer(cl *client.Client, etcdClient tools.EtcdClient, addr string, p
func RunScheduler(cl *client.Client) {
// Scheduler
schedulerConfigFactory := factory.NewConfigFactory(cl)
schedulerConfig, err := schedulerConfigFactory.Create(nil, nil)
schedulerConfig, err := schedulerConfigFactory.Create()
if err != nil {
glog.Fatal("Couldn't create scheduler config: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion plugin/cmd/kube-scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
"github.com/golang/glog"
)
Expand Down Expand Up @@ -61,7 +62,7 @@ func main() {
go http.ListenAndServe(net.JoinHostPort(address.String(), strconv.Itoa(*port)), nil)

configFactory := factory.NewConfigFactory(kubeClient)
config, err := configFactory.Create(nil, nil)
config, err := configFactory.Create()
if err != nil {
glog.Fatalf("Failed to create scheduler configuration: %v", err)
}
Expand Down
52 changes: 52 additions & 0 deletions plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// This is the default algorithm provider for the scheduler.
package defaults

import (
algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
)

func init() {
factory.RegisterAlgorithmProvider(factory.DefaultProvider, defaultPredicates(), defaultPriorities())
}

func defaultPredicates() util.StringSet {
return util.NewStringSet(
// Fit is defined based on the absence of port conflicts.
factory.RegisterFitPredicate("PodFitsPorts", algorithm.PodFitsPorts),
// Fit is determined by resource availability
factory.RegisterFitPredicate("PodFitsResources", algorithm.NewResourceFitPredicate(factory.MinionLister)),
// Fit is determined by non-conflicting disk volumes
factory.RegisterFitPredicate("NoDiskConflict", algorithm.NoDiskConflict),
// Fit is determined by node selector query
factory.RegisterFitPredicate("MatchNodeSelector", algorithm.NewSelectorMatchPredicate(factory.MinionLister)),
)
}

func defaultPriorities() util.StringSet {
return util.NewStringSet(
// Prioritize nodes by least requested utilization.
factory.RegisterPriorityFunction("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1),
// spreads pods by minimizing the number of pods on the same minion with the same labels.
factory.RegisterPriorityFunction("SpreadingPriority", algorithm.CalculateSpreadPriority, 1),
// EqualPriority is a prioritizer function that gives an equal weight of one to all minions
factory.RegisterPriorityFunction("EqualPriority", algorithm.EqualPriority, 0),
)
}
22 changes: 22 additions & 0 deletions plugin/pkg/scheduler/algorithmprovider/plugins.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// This package is used to register algorithm provider plugins.
package algorithmprovider

import (
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider/defaults"
)
65 changes: 65 additions & 0 deletions plugin/pkg/scheduler/algorithmprovider/plugins_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package algorithmprovider

import (
"testing"

"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
)

var (
algorithmProviderNames = []string{
factory.DefaultProvider,
}
)

func TestDefaultConfigExists(t *testing.T) {
p, err := factory.GetAlgorithmProvider(factory.DefaultProvider)
if err != nil {
t.Errorf("error retrivieving default provider: %v", err)
}
if p == nil {
t.Error("algorithm provider config should not be nil")
}
if len(p.FitPredicateKeys) == 0 {
t.Error("default algorithm provider shouldn't have 0 fit predicates")
}
}

func TestAlgorithmProviders(t *testing.T) {
for _, pn := range algorithmProviderNames {
p, err := factory.GetAlgorithmProvider(pn)
if err != nil {
t.Errorf("error retrivieving '%s' provider: %v", pn, err)
break
}
if len(p.PriorityFunctionKeys) == 0 {
t.Error("%s algorithm provider shouldn't have 0 priority functions", pn)
}
for _, pf := range p.PriorityFunctionKeys.List() {
if !factory.IsPriorityFunctionRegistered(pf) {
t.Errorf("priority function %s is not registerd but is used in the %s algorithm provider", pf, pn)
}
}
for _, fp := range p.FitPredicateKeys.List() {
if !factory.IsFitPredicateRegistered(fp) {
t.Errorf("fit predicate %s is not registerd but is used in the %s algorithm provider", fp, pn)
}
}
}
}
133 changes: 36 additions & 97 deletions plugin/pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ import (
"github.com/golang/glog"
)

var (
PodLister = &storeToPodLister{cache.NewStore()}
MinionLister = &storeToNodeLister{cache.NewStore()}
)

// ConfigFactory knows how to fill out a scheduler config with its support functions.
type ConfigFactory struct {
Client *client.Client
Expand All @@ -46,151 +51,85 @@ type ConfigFactory struct {
PodLister *storeToPodLister
// a means to list all minions
MinionLister *storeToNodeLister
// map of strings to predicate functions to be used
// to filter the minions for scheduling pods
PredicateMap map[string]algorithm.FitPredicate
// map of strings to priority config to be used
// to prioritize the filtered minions for scheduling pods
PriorityMap map[string]algorithm.PriorityConfig
}

// NewConfigFactory initializes the factory.
func NewConfigFactory(client *client.Client) *ConfigFactory {
// initialize the factory struct
factory := &ConfigFactory{
return &ConfigFactory{
Client: client,
PodQueue: cache.NewFIFO(),
PodLister: &storeToPodLister{cache.NewStore()},
MinionLister: &storeToNodeLister{cache.NewStore()},
PredicateMap: make(map[string]algorithm.FitPredicate),
PriorityMap: make(map[string]algorithm.PriorityConfig),
PodLister: PodLister,
MinionLister: MinionLister,
}
}

factory.addDefaultPredicates()
factory.addDefaultPriorities()

return factory
// Create creates a scheduler with the default algorithm provider.
func (f *ConfigFactory) Create() (*scheduler.Config, error) {
return f.CreateFromProvider(DefaultProvider)
}

// Create creates a scheduler and all support functions.
func (factory *ConfigFactory) Create(predicateKeys, priorityKeys []string) (*scheduler.Config, error) {
if predicateKeys == nil {
glog.V(2).Infof("Custom predicates list not provided, using default predicates")
predicateKeys = []string{"PodFitsPorts", "PodFitsResources", "NoDiskConflict", "MatchNodeSelector"}
}
predicateFuncs, err := factory.getPredicateFunctions(predicateKeys)
// CreateFromProvider creates a scheduler from the name of a registered algorithm provider.
func (f *ConfigFactory) CreateFromProvider(providerName string) (*scheduler.Config, error) {
glog.V(2).Infof("creating scheduler from algorithm provider '%v'", providerName)
provider, err := GetAlgorithmProvider(providerName)
if err != nil {
return nil, err
}

if priorityKeys == nil {
glog.V(2).Infof("Custom priority list not provided, using default priority: LeastRequestedPriority")
priorityKeys = []string{"LeastRequestedPriority"}
return f.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys)
}

// CreateFromKeys creates a scheduler from a set of registered fit predicate keys and priority keys.
func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSet) (*scheduler.Config, error) {
glog.V(2).Infof("creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys)
predicateFuncs, err := getFitPredicateFunctions(predicateKeys)
if err != nil {
return nil, err
}
priorityConfigs, err := factory.getPriorityConfigs(priorityKeys)

priorityConfigs, err := getPriorityFunctionConfigs(priorityKeys)
if err != nil {
return nil, err
}

// Watch and queue pods that need scheduling.
cache.NewReflector(factory.createUnassignedPodLW(), &api.Pod{}, factory.PodQueue).Run()
cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue).Run()

// Watch and cache all running pods. Scheduler needs to find all pods
// so it knows where it's safe to place a pod. Cache this locally.
cache.NewReflector(factory.createAssignedPodLW(), &api.Pod{}, factory.PodLister.Store).Run()
cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, f.PodLister.Store).Run()

// Watch minions.
// Minions may be listed frequently, so provide a local up-to-date cache.
if false {
// Disable this code until minions support watches.
cache.NewReflector(factory.createMinionLW(), &api.Node{}, factory.MinionLister.Store).Run()
cache.NewReflector(f.createMinionLW(), &api.Node{}, f.MinionLister.Store).Run()
} else {
cache.NewPoller(factory.pollMinions, 10*time.Second, factory.MinionLister.Store).Run()
cache.NewPoller(f.pollMinions, 10*time.Second, f.MinionLister.Store).Run()
}

r := rand.New(rand.NewSource(time.Now().UnixNano()))

algo := algorithm.NewGenericScheduler(predicateFuncs, priorityConfigs, factory.PodLister, r)
algo := algorithm.NewGenericScheduler(predicateFuncs, priorityConfigs, f.PodLister, r)

podBackoff := podBackoff{
perPodBackoff: map[string]*backoffEntry{},
clock: realClock{},
}

return &scheduler.Config{
MinionLister: factory.MinionLister,
MinionLister: f.MinionLister,
Algorithm: algo,
Binder: &binder{factory.Client},
Binder: &binder{f.Client},
NextPod: func() *api.Pod {
pod := factory.PodQueue.Pop().(*api.Pod)
pod := f.PodQueue.Pop().(*api.Pod)
glog.V(2).Infof("glog.v2 --> About to try and schedule pod %v", pod.Name)
return pod
},
Error: factory.makeDefaultErrorFunc(&podBackoff, factory.PodQueue),
Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
}, nil
}

func (factory *ConfigFactory) getPredicateFunctions(keys []string) ([]algorithm.FitPredicate, error) {
predicates := []algorithm.FitPredicate{}
for _, key := range keys {
function, ok := factory.PredicateMap[key]
if !ok {
return nil, fmt.Errorf("Invalid predicate key %s specified - no corresponding function found", key)
}
predicates = append(predicates, function)
}
return predicates, nil
}

func (factory *ConfigFactory) getPriorityConfigs(keys []string) ([]algorithm.PriorityConfig, error) {
configs := []algorithm.PriorityConfig{}
for _, key := range keys {
config, ok := factory.PriorityMap[key]
if !ok {
return nil, fmt.Errorf("Invalid priority key %s specified - no corresponding function found", key)
}
configs = append(configs, config)
}
return configs, nil
}

func (factory *ConfigFactory) addDefaultPredicates() {
// Fit is defined based on the absence of port conflicts.
factory.AddPredicate("PodFitsPorts", algorithm.PodFitsPorts)
// Fit is determined by resource availability
factory.AddPredicate("PodFitsResources", algorithm.NewResourceFitPredicate(factory.MinionLister))
// Fit is determined by non-conflicting disk volumes
factory.AddPredicate("NoDiskConflict", algorithm.NoDiskConflict)
// Fit is determined by node selector query
factory.AddPredicate("MatchNodeSelector", algorithm.NewSelectorMatchPredicate(factory.MinionLister))
}

func (factory *ConfigFactory) AddPredicate(key string, function algorithm.FitPredicate) {
factory.PredicateMap[key] = function
}

func (factory *ConfigFactory) addDefaultPriorities() {
// Prioritize nodes by least requested utilization.
factory.AddPriority("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1)
// spreads pods by minimizing the number of pods on the same minion with the same labels.
factory.AddPriority("SpreadingPriority", algorithm.CalculateSpreadPriority, 1)
// EqualPriority is a prioritizer function that gives an equal weight of one to all minions
factory.AddPriority("EqualPriority", algorithm.EqualPriority, 0)
}

func (factory *ConfigFactory) AddPriority(key string, function algorithm.PriorityFunction, weight int) {
factory.PriorityMap[key] = algorithm.PriorityConfig{Function: function, Weight: weight}
}

func (factory *ConfigFactory) SetWeight(key string, weight int) {
config, ok := factory.PriorityMap[key]
if !ok {
glog.Errorf("Invalid priority key %s specified - no corresponding function found", key)
return
}
config.Weight = weight
}

type listWatch struct {
client *client.Client
fieldSelector labels.Selector
Expand Down
2 changes: 1 addition & 1 deletion plugin/pkg/scheduler/factory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestCreate(t *testing.T) {
defer server.Close()
client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
factory := NewConfigFactory(client)
factory.Create(nil, nil)
factory.Create()
}

func TestCreateLists(t *testing.T) {
Expand Down
Loading

0 comments on commit 19f0b8b

Please sign in to comment.