Skip to content

Commit

Permalink
Implementing PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Abhishek Gupta committed Jan 13, 2015
1 parent 3f722a3 commit 9e75a05
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 98 deletions.
18 changes: 10 additions & 8 deletions pkg/scheduler/listers.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func (f FakePodLister) List(s labels.Selector) (selected []api.Pod, err error) {
type ServiceLister interface {
// Lists all the services
ListServices() (api.ServiceList, error)
// Gets the service for the given pod
GetPodService(api.Pod) (api.Service, error)
// Gets the services for the given pod
GetPodServices(api.Pod) ([]api.Service, error)
}

// FakeServiceLister implements ServiceLister on []api.Service for test purposes.
Expand All @@ -71,10 +71,8 @@ func (f FakeServiceLister) ListServices() (api.ServiceList, error) {
return api.ServiceList{Items: f}, nil
}

// GetPodService gets the service that has the selector that can match the labels on the given pod
// We are assuming a single service per pod.
// In case of multiple services per pod, the first service found is returned
func (f FakeServiceLister) GetPodService(pod api.Pod) (service api.Service, err error) {
// GetPodServices gets the services that have the selector that match the labels on the given pod
func (f FakeServiceLister) GetPodServices(pod api.Pod) (services []api.Service, err error) {
var selector labels.Selector

for _, service := range f {
Expand All @@ -84,8 +82,12 @@ func (f FakeServiceLister) GetPodService(pod api.Pod) (service api.Service, err
}
selector = labels.Set(service.Spec.Selector).AsSelector()
if selector.Matches(labels.Set(pod.Labels)) {
return service, nil
services = append(services, service)
}
}
return service, fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
if len(services) == 0 {
err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}

return
}
27 changes: 20 additions & 7 deletions pkg/scheduler/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,12 @@ func NewNodeLabelPredicate(info NodeInfo, labels []string, presence bool) FitPre
return labelChecker.CheckNodeLabelPresence
}

// CheckNodeLabelPresence checks whether a particular label exists on a minion or not, regardless of its value
// CheckNodeLabelPresence checks whether all of the specified labels exists on a minion or not, regardless of their value
// If "presence" is false, then returns false if any of the requested labels matches any of the minion's labels,
// otherwise returns true.
// If "presence" is true, then returns false if any of the requested labels does not match any of the minion's labels,
// otherwise returns true.
//
// Consider the cases where the minions are placed in regions/zones/racks and these are identified by labels
// In some cases, it is required that only minions that are part of ANY of the defined regions/zones/racks be selected
//
Expand All @@ -195,8 +200,9 @@ func (n *NodeLabelChecker) CheckNodeLabelPresence(pod api.Pod, existingPods []ap
if err != nil {
return false, err
}
minionLabels := labels.Set(minion.Labels)
for _, label := range n.labels {
exists = labels.Set(minion.Labels).Has(label)
exists = minionLabels.Has(label)
if (exists && !n.presence) || (!exists && n.presence) {
return false, nil
}
Expand All @@ -221,15 +227,20 @@ func NewServiceAffinityPredicate(podLister PodLister, serviceLister ServiceListe
return affinity.CheckServiceAffinity
}

// CheckServiceAffinity ensures that only the minions that match the specified labels are considered for scheduling.
// The set of labels to be considered are provided to the struct (ServiceAffinity).
// The pod is checked for the labels and any missing labels are then checked in the minion
// that hosts the service pods (peers) for the given pod.
func (s *ServiceAffinity) CheckServiceAffinity(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
var affinitySelector labels.Selector

// check if the pod being scheduled has the affinity labels specified
// check if the pod being scheduled has the affinity labels specified in its NodeSelector
affinityLabels := map[string]string{}
nodeSelector := labels.Set(pod.Spec.NodeSelector)
labelsExist := true
for _, l := range s.labels {
if labels.Set(pod.Labels).Has(l) {
affinityLabels[l] = labels.Set(pod.Labels).Get(l)
if nodeSelector.Has(l) {
affinityLabels[l] = nodeSelector.Get(l)
} else {
// the current pod does not specify all the labels, look in the existing service pods
labelsExist = false
Expand All @@ -238,9 +249,11 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod api.Pod, existingPods []api.P

// skip looking at other pods in the service if the current pod defines all the required affinity labels
if !labelsExist {
service, err := s.serviceLister.GetPodService(pod)
services, err := s.serviceLister.GetPodServices(pod)
if err == nil {
selector := labels.SelectorFromSet(service.Spec.Selector)
// just use the first service and get the other pods within the service
// TODO: a separate predicate can be created that tries to handle all services for the pod
selector := labels.SelectorFromSet(services[0].Spec.Selector)
servicePods, err := s.podLister.ListPods(selector)
if err != nil {
return false, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/predicates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,14 +502,14 @@ func TestServiceAffinity(t *testing.T) {
test: "nothing scheduled",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"region": "r1"}}},
pod: api.Pod{Spec: api.PodSpec{NodeSelector: map[string]string{"region": "r1"}}},
node: "machine1",
fits: true,
labels: []string{"region"},
test: "pod with region label match",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"region": "r2"}}},
pod: api.Pod{Spec: api.PodSpec{NodeSelector: map[string]string{"region": "r2"}}},
node: "machine1",
fits: false,
labels: []string{"region"},
Expand Down
7 changes: 3 additions & 4 deletions pkg/scheduler/priorities.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,16 @@ func NewNodeLabelPriority(label string, presence bool) PriorityFunction {
return labelPrioritizer.CalculateNodeLabelPriority
}

// CalculateNodeLabelPriority checks whether a particular label exists on a minion or not, regardless of its value
// Consider the cases where the minions are places in regions/zones/racks and these are identified by labels
// In some cases, it is required that only minions that are part of ANY of the defined regions/zones/racks be selected
// CalculateNodeLabelPriority checks whether a particular label exists on a minion or not, regardless of its value.
// If presence is true, prioritizes minions that have the specified label, regardless of value.
// If presence is false, prioritizes minions that do not have the specified label.
func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) {
var score int
minions, err := minionLister.List()
if err != nil {
return nil, err
}

// find the zones that the minions belong to
labeledMinions := map[string]bool{}
for _, minion := range minions.Items {
exists := labels.Set(minion.Labels).Has(n.label)
Expand Down
46 changes: 23 additions & 23 deletions pkg/scheduler/spreading.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ func (s *ServiceSpread) CalculateSpreadPriority(pod api.Pod, podLister PodLister
var pods []api.Pod
var err error

service, err := s.serviceLister.GetPodService(pod)
services, err := s.serviceLister.GetPodServices(pod)
if err == nil {
selector := labels.SelectorFromSet(service.Spec.Selector)
// just use the first service and get the other pods within the service
// TODO: a separate predicate can be created that tries to handle all services for the pod
selector := labels.SelectorFromSet(services[0].Spec.Selector)
pods, err = podLister.ListPods(selector)
if err != nil {
return nil, err
Expand Down Expand Up @@ -94,13 +96,13 @@ func NewServiceAntiAffinityPriority(serviceLister ServiceLister, label string) P
}

func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) {
var service api.Service
var pods []api.Pod
var err error

service, err = s.serviceLister.GetPodService(pod)
services, err := s.serviceLister.GetPodServices(pod)
if err == nil {
selector := labels.SelectorFromSet(service.Spec.Selector)
// just use the first service and get the other pods within the service
// TODO: a separate predicate can be created that tries to handle all services for the pod
selector := labels.SelectorFromSet(services[0].Spec.Selector)
pods, err = podLister.ListPods(selector)
if err != nil {
return nil, err
Expand All @@ -112,43 +114,41 @@ func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod api.Pod, podList
return nil, err
}

// find the zones that the minions belong to
openMinions := []string{}
zonedMinions := map[string]string{}
// separate out the minions that have the label from the ones that don't
otherMinions := []string{}
labeledMinions := map[string]string{}
for _, minion := range minions.Items {
if labels.Set(minion.Labels).Has(s.label) {
zone := labels.Set(minion.Labels).Get(s.label)
zonedMinions[minion.Name] = zone
label := labels.Set(minion.Labels).Get(s.label)
labeledMinions[minion.Name] = label
} else {
openMinions = append(openMinions, minion.Name)
otherMinions = append(otherMinions, minion.Name)
}
}

podCounts := map[string]int{}
numServicePods := len(pods)
if numServicePods > 0 {
for _, pod := range pods {
zone, exists := zonedMinions[pod.Status.Host]
if !exists {
continue
}
podCounts[zone]++
for _, pod := range pods {
zone, exists := labeledMinions[pod.Status.Host]
if !exists {
continue
}
podCounts[zone]++
}

numServicePods := len(pods)
result := []HostPriority{}
//score int - scale of 0-10
// 0 being the lowest priority and 10 being the highest
for minion := range zonedMinions {
for minion := range labeledMinions {
// initializing to the default/max minion score of 10
fScore := float32(10)
if numServicePods > 0 {
fScore = 10 * (float32(numServicePods-podCounts[zonedMinions[minion]]) / float32(numServicePods))
fScore = 10 * (float32(numServicePods-podCounts[labeledMinions[minion]]) / float32(numServicePods))
}
result = append(result, HostPriority{host: minion, score: int(fScore)})
}
// add the open minions with a score of 0
for _, minion := range openMinions {
for _, minion := range otherMinions {
result = append(result, HostPriority{host: minion, score: 0})
}

Expand Down
24 changes: 17 additions & 7 deletions plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,34 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
)

const Provider string = "AffinityProvider"
const AffinityProvider string = "AffinityProvider"

func init() {
factory.RegisterAlgorithmProvider(Provider, defaultPredicates(), defaultPriorities())
factory.RegisterAlgorithmProvider(AffinityProvider, affinityPredicates(), affinityPriorities())
}

func defaultPredicates() util.StringSet {
func affinityPredicates() util.StringSet {
return util.NewStringSet(
// Fit is defined based on whether the minion has the specified label values as the pod being scheduled
// Alternately, if the pod does not specify any/all labels, the other pods in the service are looked at
"HostName",
"MatchNodeSelector",
"PodFitsPorts",
"PodFitsResources",
"NoDiskConflict",
// Ensures that all pods within the same service are hosted on minions within the same region as defined by the "region" label
factory.RegisterFitPredicate("ServiceAffinity", algorithm.NewServiceAffinityPredicate(factory.PodLister, factory.ServiceLister, factory.MinionLister, []string{"region"})),
// Fit is defined based on the presence/absence of the "region" label on a minion, regardless of value.
factory.RegisterFitPredicate("NodeLabelPredicate", algorithm.NewNodeLabelPredicate(factory.MinionLister, []string{"region"}, true)),
)
}

func defaultPriorities() util.StringSet {
func affinityPriorities() util.StringSet {
return util.NewStringSet(
"LeastRequestedPriority",
"ServiceSpreadingPriority",
// spreads pods belonging to the same service across minions in different zones
// region and zone can be nested infrastructure topology levels and defined by labels on minions
factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewServiceAntiAffinityPriority(factory.ServiceLister, "zone"), 1),
factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewServiceAntiAffinityPriority(factory.ServiceLister, "zone"), 2),
// Prioritize nodes based on the presence/absence of a label on a minion, regardless of value.
factory.RegisterPriorityFunction("NodeLabelPriority", algorithm.NewNodeLabelPriority("zone", true), 1),
)
}

This file was deleted.

1 change: 1 addition & 0 deletions plugin/pkg/scheduler/algorithmprovider/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ limitations under the License.
package algorithmprovider

import (
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider/affinity"
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider/defaults"
)
2 changes: 2 additions & 0 deletions plugin/pkg/scheduler/algorithmprovider/plugins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ package algorithmprovider
import (
"testing"

"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider/affinity"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
)

var (
algorithmProviderNames = []string{
factory.DefaultProvider,
affinity.AffinityProvider,
}
)

Expand Down
11 changes: 8 additions & 3 deletions plugin/pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,9 @@ func (s *storeToServiceLister) ListServices() (services api.ServiceList, err err
return services, nil
}

func (s *storeToServiceLister) GetPodService(pod api.Pod) (service api.Service, err error) {
func (s *storeToServiceLister) GetPodServices(pod api.Pod) (services []api.Service, err error) {
var selector labels.Selector
var service api.Service

for _, m := range s.List() {
service = *m.(*api.Service)
Expand All @@ -249,10 +250,14 @@ func (s *storeToServiceLister) GetPodService(pod api.Pod) (service api.Service,
}
selector = labels.Set(service.Spec.Selector).AsSelector()
if selector.Matches(labels.Set(pod.Labels)) {
return service, nil
services = append(services, service)
}
}
return service, fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
if len(services) == 0 {
err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}

return
}

// Len returns the number of items in the node list.
Expand Down

0 comments on commit 9e75a05

Please sign in to comment.