Skip to content

Commit

Permalink
Merge pull request kubernetes#127444 from dom4ha/fine-grained-qhints
Browse files Browse the repository at this point in the history
Fine grain QueueHints for NodeAffinity plugin
  • Loading branch information
k8s-ci-robot authored Sep 27, 2024
2 parents 5ebd0da + c7db4bb commit 960e398
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 11 deletions.
29 changes: 20 additions & 9 deletions pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (pl *NodeAffinity) EventsToRegister(_ context.Context) ([]framework.Cluster
// isSchedulableAfterNodeChange is invoked whenever a node changed. It checks whether
// that change made a previously unschedulable pod schedulable.
func (pl *NodeAffinity) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
_, modifiedNode, err := util.As[*v1.Node](oldObj, newObj)
originalNode, modifiedNode, err := util.As[*v1.Node](oldObj, newObj)
if err != nil {
return framework.Queue, err
}
Expand All @@ -119,16 +119,27 @@ func (pl *NodeAffinity) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1
if err != nil {
return framework.Queue, err
}
if isMatched {
logger.V(4).Info("node was created or updated, and matches with the pod's NodeAffinity", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
if !isMatched {
logger.V(5).Info("node was created or updated, but the pod's NodeAffinity doesn't match", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
return framework.QueueSkip, nil
}
// Since the node was added and it matches the pod's affinity criteria, we can unblock it.
if originalNode == nil {
logger.V(5).Info("node was created, and matches with the pod's NodeAffinity", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
return framework.Queue, nil
}

// TODO: also check if the original node meets the pod's requestments once preCheck is completely removed.
// See: https://github.com/kubernetes/kubernetes/issues/110175

logger.V(4).Info("node was created or updated, but it doesn't make this pod schedulable", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
return framework.QueueSkip, nil
// At this point we know the operation is update so we can narrow down the criteria to unmatch -> match changes only
// (necessary affinity label was added to the node in this case).
wasMatched, err := requiredNodeAffinity.Match(originalNode)
if err != nil {
return framework.Queue, err
}
if wasMatched {
logger.V(5).Info("node updated, but the pod's NodeAffinity hasn't changed", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
return framework.QueueSkip, nil
}
logger.V(5).Info("node was updated and the pod's NodeAffinity changed to matched", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
return framework.Queue, nil
}

// PreFilter builds and writes cycle state used by Filter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1297,6 +1297,13 @@ func Test_isSchedulableAfterNodeChange(t *testing.T) {
newObj: st.MakeNode().Label("foo", "bar").Obj(),
expectedHint: framework.Queue,
},
"skip-unrelated-change-that-keeps-pod-schedulable": {
args: &config.NodeAffinityArgs{},
pod: podWithNodeAffinity.Obj(),
oldObj: st.MakeNode().Label("foo", "bar").Obj(),
newObj: st.MakeNode().Capacity(nil).Label("foo", "bar").Obj(),
expectedHint: framework.QueueSkip,
},
"skip-queue-on-add-scheduler-enforced-node-affinity": {
args: &config.NodeAffinityArgs{
AddedAffinity: &v1.NodeAffinity{
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/framework/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ const (

type ClusterEventWithHint struct {
Event ClusterEvent
// QueueingHintFn is executed for the plugin rejected by this plugin when the above Event happens,
// QueueingHintFn is executed for the Pod rejected by this plugin when the above Event happens,
// and filters out events to reduce useless retry of Pod's scheduling.
// It's an optional field. If not set,
// the scheduling of Pods will be always retried with backoff when this Event happens.
Expand Down
43 changes: 42 additions & 1 deletion test/integration/scheduler/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,48 @@ func TestCoreResourceEnqueue(t *testing.T) {
// It causes pod1 to be requeued.
// It causes pod2 not to be requeued.
if _, err := testCtx.ClientSet.CoreV1().Nodes().Update(testCtx.Ctx, st.MakeNode().Name("fake-node1").Label("group", "b").Obj(), metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to remove taints off the node: %w", err)
return fmt.Errorf("failed to update the pod: %w", err)
}
return nil
},
wantRequeuedPods: sets.New("pod1"),
},
{
name: "Pod rejected by the NodeAffinity plugin is not requeued when an updated Node haven't changed the 'match' verdict",
initialNodes: []*v1.Node{
st.MakeNode().Name("node1").Label("group", "a").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(),
st.MakeNode().Name("node2").Label("group", "b").Obj()},
pods: []*v1.Pod{
// - The initial pod would be accepted by the NodeAffinity plugin for node1, but will be blocked by the NodeResources plugin.
// - The pod will be blocked by the NodeAffinity plugin for node2, therefore we know NodeAffinity will be queried for qhint for both testing nodes.
st.MakePod().Name("pod1").NodeAffinityIn("group", []string{"a"}, st.NodeSelectorTypeMatchExpressions).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
// Trigger a NodeUpdate event to add new label.
// It won't cause pod to be requeued, because there was a match already before the update, meaning this plugin wasn't blocking the scheduling.
if _, err := testCtx.ClientSet.CoreV1().Nodes().Update(testCtx.Ctx, st.MakeNode().Name("node1").Label("group", "a").Label("node", "fake-node").Obj(), metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update the pod: %w", err)
}
return nil
},
wantRequeuedPods: sets.Set[string]{},
enableSchedulingQueueHint: []bool{true},
},
{
name: "Pod rejected by the NodeAffinity plugin is requeued when a Node is added",
initialNodes: []*v1.Node{st.MakeNode().Name("fake-node1").Label("group", "a").Obj()},
pods: []*v1.Pod{
// - Pod1 will be rejected by the NodeAffinity plugin.
st.MakePod().Name("pod1").NodeAffinityIn("group", []string{"b"}, st.NodeSelectorTypeMatchExpressions).Container("image").Obj(),
// - Pod2 will be rejected by the NodeAffinity plugin.
st.MakePod().Name("pod2").NodeAffinityIn("group", []string{"c"}, st.NodeSelectorTypeMatchExpressions).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
// Trigger a NodeAdd event with the awaited label.
// It causes pod1 to be requeued.
// It causes pod2 not to be requeued.
if _, err := testCtx.ClientSet.CoreV1().Nodes().Create(testCtx.Ctx, st.MakeNode().Name("fake-node2").Label("group", "b").Obj(), metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to update the pod: %w", err)
}
return nil
},
Expand Down

0 comments on commit 960e398

Please sign in to comment.