Skip to content

Commit

Permalink
Merge pull request kubernetes#83389 from ahmad-diaa/move-PodPreemptor…
Browse files Browse the repository at this point in the history
…-to-sched

Move PodPreemptor to Scheduler
  • Loading branch information
k8s-ci-robot authored Oct 12, 2019
2 parents 0599ca2 + 6bbc607 commit 08ab271
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 51 deletions.
39 changes: 0 additions & 39 deletions pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,6 @@ type Config struct {

Algorithm core.ScheduleAlgorithm
GetBinder func(pod *v1.Pod) Binder
// PodPreemptor is used to evict pods and update 'NominatedNode' field of
// the preemptor pod.
PodPreemptor PodPreemptor
// Framework runs scheduler plugins at configured extension points.
Framework framework.Framework

Expand Down Expand Up @@ -119,15 +116,6 @@ type Config struct {
PluginConfig []config.PluginConfig
}

// PodPreemptor has methods needed to delete a pod and to update 'NominatedPod'
// field of the preemptor pod.
type PodPreemptor interface {
GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
DeletePod(pod *v1.Pod) error
SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error
RemoveNominatedNodeName(pod *v1.Pod) error
}

// Configurator defines I/O, caching, and other functionality needed to
// construct a new scheduler.
type Configurator struct {
Expand Down Expand Up @@ -471,7 +459,6 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
SchedulerCache: c.schedulerCache,
Algorithm: algo,
GetBinder: getBinderFunc(c.client, extenders),
PodPreemptor: &podPreemptor{c.client},
Framework: framework,
WaitForCacheSync: func() bool {
return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
Expand Down Expand Up @@ -719,29 +706,3 @@ func (b *binder) Bind(binding *v1.Binding) error {
klog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding)
}

type podPreemptor struct {
Client clientset.Interface
}

func (p *podPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
return p.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
}

func (p *podPreemptor) DeletePod(pod *v1.Pod) error {
return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
}

func (p *podPreemptor) SetNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
podCopy := pod.DeepCopy()
podCopy.Status.NominatedNodeName = nominatedNodeName
_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy)
return err
}

func (p *podPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
if len(pod.Status.NominatedNodeName) == 0 {
return nil
}
return p.SetNominatedNodeName(pod, "")
}
50 changes: 44 additions & 6 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,21 @@ const (

// podConditionUpdater updates the condition of a pod based on the passed
// PodCondition
// TODO (ahmad-diaa): Remove type and replace it with scheduler methods
type podConditionUpdater interface {
update(pod *v1.Pod, podCondition *v1.PodCondition) error
}

// PodPreemptor has methods needed to delete a pod and to update 'NominatedPod'
// field of the preemptor pod.
// TODO (ahmad-diaa): Remove type and replace it with scheduler methods
type podPreemptor interface {
getUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
deletePod(pod *v1.Pod) error
setNominatedNodeName(pod *v1.Pod, nominatedNode string) error
removeNominatedNodeName(pod *v1.Pod) error
}

// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
Expand All @@ -74,7 +85,7 @@ type Scheduler struct {
podConditionUpdater podConditionUpdater
// PodPreemptor is used to evict pods and update 'NominatedNode' field of
// the preemptor pod.
PodPreemptor factory.PodPreemptor
podPreemptor podPreemptor
// Framework runs scheduler plugins at configured extension points.
Framework framework.Framework

Expand Down Expand Up @@ -344,6 +355,8 @@ func New(client clientset.Interface,
// Create the scheduler.
sched := NewFromConfig(config)
sched.podConditionUpdater = &podConditionUpdaterImpl{client}
sched.podPreemptor = &podPreemptorImpl{client}

AddAllEventHandlers(sched, options.schedulerName, informerFactory, podInformer)
return sched, nil
}
Expand Down Expand Up @@ -391,7 +404,6 @@ func NewFromConfig(config *factory.Config) *Scheduler {
SchedulerCache: config.SchedulerCache,
Algorithm: config.Algorithm,
GetBinder: config.GetBinder,
PodPreemptor: config.PodPreemptor,
Framework: config.Framework,
NextPod: config.NextPod,
WaitForCacheSync: config.WaitForCacheSync,
Expand Down Expand Up @@ -434,7 +446,7 @@ func (sched *Scheduler) recordSchedulingFailure(podInfo *framework.PodInfo, err
// If it succeeds, it adds the name of the node where preemption has happened to the pod spec.
// It returns the node name and an error if any.
func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) {
preemptor, err := sched.PodPreemptor.GetUpdatedPod(preemptor)
preemptor, err := sched.podPreemptor.getUpdatedPod(preemptor)
if err != nil {
klog.Errorf("Error getting the updated preemptor pod object: %v", err)
return "", err
Expand All @@ -454,15 +466,15 @@ func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Frame
sched.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)

// Make a call to update nominated node name of the pod on the API server.
err = sched.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
err = sched.podPreemptor.setNominatedNodeName(preemptor, nodeName)
if err != nil {
klog.Errorf("Error in preemption process. Cannot set 'NominatedPod' on pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
sched.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
return "", err
}

for _, victim := range victims {
if err := sched.PodPreemptor.DeletePod(victim); err != nil {
if err := sched.podPreemptor.deletePod(victim); err != nil {
klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
return "", err
}
Expand All @@ -481,7 +493,7 @@ func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Frame
// function of generic_scheduler.go returns the pod itself for removal of
// the 'NominatedPod' field.
for _, p := range nominatedPodsToClear {
rErr := sched.PodPreemptor.RemoveNominatedNodeName(p)
rErr := sched.podPreemptor.removeNominatedNodeName(p)
if rErr != nil {
klog.Errorf("Cannot remove 'NominatedPod' field of pod: %v", rErr)
// We do not return as this error is not critical.
Expand Down Expand Up @@ -756,6 +768,32 @@ func (p *podConditionUpdaterImpl) update(pod *v1.Pod, condition *v1.PodCondition
return nil
}

type podPreemptorImpl struct {
Client clientset.Interface
}

func (p *podPreemptorImpl) getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
return p.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
}

func (p *podPreemptorImpl) deletePod(pod *v1.Pod) error {
return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
}

func (p *podPreemptorImpl) setNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
podCopy := pod.DeepCopy()
podCopy.Status.NominatedNodeName = nominatedNodeName
_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy)
return err
}

func (p *podPreemptorImpl) removeNominatedNodeName(pod *v1.Pod) error {
if len(pod.Status.NominatedNodeName) == 0 {
return nil
}
return p.setNominatedNodeName(pod, "")
}

// nodeResourceString returns a string representation of node resources.
func nodeResourceString(n *v1.Node) string {
if n == nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,19 @@ func (fc fakePodConditionUpdater) update(pod *v1.Pod, podCondition *v1.PodCondit

type fakePodPreemptor struct{}

func (fp fakePodPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
func (fp fakePodPreemptor) getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
return pod, nil
}

func (fp fakePodPreemptor) DeletePod(pod *v1.Pod) error {
func (fp fakePodPreemptor) deletePod(pod *v1.Pod) error {
return nil
}

func (fp fakePodPreemptor) SetNominatedNodeName(pod *v1.Pod, nomNodeName string) error {
func (fp fakePodPreemptor) setNominatedNodeName(pod *v1.Pod, nomNodeName string) error {
return nil
}

func (fp fakePodPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
func (fp fakePodPreemptor) removeNominatedNodeName(pod *v1.Pod) error {
return nil
}

Expand Down Expand Up @@ -674,7 +674,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
},
Recorder: &events.FakeRecorder{},
podConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
podPreemptor: fakePodPreemptor{},
Framework: emptyFramework,
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
}
Expand Down Expand Up @@ -728,7 +728,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
},
Recorder: &events.FakeRecorder{},
podConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
podPreemptor: fakePodPreemptor{},
StopEverything: stop,
Framework: fwk,
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
Expand Down

0 comments on commit 08ab271

Please sign in to comment.