Skip to content

Commit

Permalink
Merge pull request kubernetes#2906 from abhgupta/abhgupta-dev
Browse files Browse the repository at this point in the history
Enhancements to scheduler priority functions
  • Loading branch information
davidopp committed Jan 15, 2015
2 parents 84ce5c4 + dbac18a commit 2675cfa
Show file tree
Hide file tree
Showing 13 changed files with 919 additions and 49 deletions.
32 changes: 28 additions & 4 deletions pkg/client/cache/listers.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,41 @@ func (s *StoreToNodeLister) GetNodeInfo(id string) (*api.Node, error) {
return nil, fmt.Errorf("minion '%v' is not in cache", id)
}

// StoreToServiceLister makes a Store have the List method of the client.ServiceInterface
// StoreToServiceLister makes a Store that has the List method of the client.ServiceInterface
// The Store must contain (only) Services.
type StoreToServiceLister struct {
Store
}

func (s *StoreToServiceLister) List() (svcs api.ServiceList, err error) {
func (s *StoreToServiceLister) List() (services api.ServiceList, err error) {
for _, m := range s.Store.List() {
svcs.Items = append(svcs.Items, *(m.(*api.Service)))
services.Items = append(services.Items, *(m.(*api.Service)))
}
return svcs, nil
return services, nil
}

// TODO: Move this back to scheduler as a helper function that takes a Store,
// rather than a method of StoreToServiceLister.
func (s *StoreToServiceLister) GetPodServices(pod api.Pod) (services []api.Service, err error) {
var selector labels.Selector
var service api.Service

for _, m := range s.Store.List() {
service = *m.(*api.Service)
// consider only services that are in the same namespace as the pod
if service.Namespace != pod.Namespace {
continue
}
selector = labels.Set(service.Spec.Selector).AsSelector()
if selector.Matches(labels.Set(pod.Labels)) {
services = append(services, service)
}
}
if len(services) == 0 {
err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}

return
}

// TODO: add StoreToEndpointsLister for use in kube-proxy.
39 changes: 39 additions & 0 deletions pkg/scheduler/listers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package scheduler

import (
"fmt"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
)
Expand Down Expand Up @@ -52,3 +54,40 @@ func (f FakePodLister) List(s labels.Selector) (selected []api.Pod, err error) {
}
return selected, nil
}

// ServiceLister interface represents anything that can produce a list of services; the list is consumed by a scheduler.
type ServiceLister interface {
// Lists all the services
List() (api.ServiceList, error)
// Gets the services for the given pod
GetPodServices(api.Pod) ([]api.Service, error)
}

// FakeServiceLister implements ServiceLister on []api.Service for test purposes.
type FakeServiceLister []api.Service

// FakeServiceLister returns api.ServiceList, the list of all services.
func (f FakeServiceLister) List() (api.ServiceList, error) {
return api.ServiceList{Items: f}, nil
}

// GetPodServices gets the services that have the selector that match the labels on the given pod
func (f FakeServiceLister) GetPodServices(pod api.Pod) (services []api.Service, err error) {
var selector labels.Selector

for _, service := range f {
// consider only services that are in the same namespace as the pod
if service.Namespace != pod.Namespace {
continue
}
selector = labels.Set(service.Spec.Selector).AsSelector()
if selector.Matches(labels.Set(pod.Labels)) {
services = append(services, service)
}
}
if len(services) == 0 {
err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}

return
}
131 changes: 131 additions & 0 deletions pkg/scheduler/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,137 @@ func PodFitsHost(pod api.Pod, existingPods []api.Pod, node string) (bool, error)
return pod.Spec.Host == node, nil
}

type NodeLabelChecker struct {
info NodeInfo
labels []string
presence bool
}

func NewNodeLabelPredicate(info NodeInfo, labels []string, presence bool) FitPredicate {
labelChecker := &NodeLabelChecker{
info: info,
labels: labels,
presence: presence,
}
return labelChecker.CheckNodeLabelPresence
}

// CheckNodeLabelPresence checks whether all of the specified labels exists on a minion or not, regardless of their value
// If "presence" is false, then returns false if any of the requested labels matches any of the minion's labels,
// otherwise returns true.
// If "presence" is true, then returns false if any of the requested labels does not match any of the minion's labels,
// otherwise returns true.
//
// Consider the cases where the minions are placed in regions/zones/racks and these are identified by labels
// In some cases, it is required that only minions that are part of ANY of the defined regions/zones/racks be selected
//
// Alternately, eliminating minions that have a certain label, regardless of value, is also useful
// A minion may have a label with "retiring" as key and the date as the value
// and it may be desirable to avoid scheduling new pods on this minion
func (n *NodeLabelChecker) CheckNodeLabelPresence(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
var exists bool
minion, err := n.info.GetNodeInfo(node)
if err != nil {
return false, err
}
minionLabels := labels.Set(minion.Labels)
for _, label := range n.labels {
exists = minionLabels.Has(label)
if (exists && !n.presence) || (!exists && n.presence) {
return false, nil
}
}
return true, nil
}

type ServiceAffinity struct {
podLister PodLister
serviceLister ServiceLister
nodeInfo NodeInfo
labels []string
}

func NewServiceAffinityPredicate(podLister PodLister, serviceLister ServiceLister, nodeInfo NodeInfo, labels []string) FitPredicate {
affinity := &ServiceAffinity{
podLister: podLister,
serviceLister: serviceLister,
nodeInfo: nodeInfo,
labels: labels,
}
return affinity.CheckServiceAffinity
}

// CheckServiceAffinity ensures that only the minions that match the specified labels are considered for scheduling.
// The set of labels to be considered are provided to the struct (ServiceAffinity).
// The pod is checked for the labels and any missing labels are then checked in the minion
// that hosts the service pods (peers) for the given pod.
//
// We add an implicit selector requiring some particular value V for label L to a pod, if:
// - L is listed in the ServiceAffinity object that is passed into the function
// - the pod does not have any NodeSelector for L
// - some other pod from the same service is already scheduled onto a minion that has value V for label L
func (s *ServiceAffinity) CheckServiceAffinity(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
var affinitySelector labels.Selector

// check if the pod being scheduled has the affinity labels specified in its NodeSelector
affinityLabels := map[string]string{}
nodeSelector := labels.Set(pod.Spec.NodeSelector)
labelsExist := true
for _, l := range s.labels {
if nodeSelector.Has(l) {
affinityLabels[l] = nodeSelector.Get(l)
} else {
// the current pod does not specify all the labels, look in the existing service pods
labelsExist = false
}
}

// skip looking at other pods in the service if the current pod defines all the required affinity labels
if !labelsExist {
services, err := s.serviceLister.GetPodServices(pod)
if err == nil {
// just use the first service and get the other pods within the service
// TODO: a separate predicate can be created that tries to handle all services for the pod
selector := labels.SelectorFromSet(services[0].Spec.Selector)
servicePods, err := s.podLister.List(selector)
if err != nil {
return false, err
}
if len(servicePods) > 0 {
// consider any service pod and fetch the minion its hosted on
otherMinion, err := s.nodeInfo.GetNodeInfo(servicePods[0].Status.Host)
if err != nil {
return false, err
}
for _, l := range s.labels {
// If the pod being scheduled has the label value specified, do not override it
if _, exists := affinityLabels[l]; exists {
continue
}
if labels.Set(otherMinion.Labels).Has(l) {
affinityLabels[l] = labels.Set(otherMinion.Labels).Get(l)
}
}
}
}
}

// if there are no existing pods in the service, consider all minions
if len(affinityLabels) == 0 {
affinitySelector = labels.Everything()
} else {
affinitySelector = labels.Set(affinityLabels).AsSelector()
}

minion, err := s.nodeInfo.GetNodeInfo(node)
if err != nil {
return false, err
}

// check if the minion matches the selector
return affinitySelector.Matches(labels.Set(minion.Labels)), nil
}

func PodFitsPorts(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
existingPorts := getUsedPorts(existingPods...)
wantPorts := getUsedPorts(pod)
Expand Down
Loading

0 comments on commit 2675cfa

Please sign in to comment.