Use Patch instead of SSA for Pod Disruption condition
mimowo committed Oct 20, 2023
1 parent 4ad1d32 commit 951a96d
Showing 8 changed files with 289 additions and 69 deletions.
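Across the files below the change follows one pattern: drop the server-side-apply configuration and its per-controller field manager (DisruptionController, TaintManager, PodGC, KubeScheduler), mutate a deep copy of the pod status instead, skip the request when `apipod.UpdatePodCondition` reports no change, and send the result back as a status patch or update. A minimal side-by-side sketch of the two approaches, assuming a generic clientset `c` and an arbitrary `pod`; the function names and the "SomeController" field manager are placeholders, not part of the commit:

```go
package example // illustrative only; not a file in this commit

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	apipod "k8s.io/kubernetes/pkg/api/v1/pod"
	utilpod "k8s.io/kubernetes/pkg/util/pod"
)

// Before: server-side apply of the status, which needs a dedicated
// field manager per controller and always sends a request.
func addConditionSSA(ctx context.Context, c clientset.Interface, pod *v1.Pod) error {
	podApply := corev1apply.Pod(pod.Name, pod.Namespace).WithStatus(corev1apply.PodStatus())
	podApply.Status.WithConditions(corev1apply.PodCondition().
		WithType(v1.DisruptionTarget).
		WithStatus(v1.ConditionTrue).
		WithLastTransitionTime(metav1.Now()))
	_, err := c.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply,
		metav1.ApplyOptions{FieldManager: "SomeController", Force: true})
	return err
}

// After: mutate a copy of the status; UpdatePodCondition reports whether
// anything actually changed, so a no-op skips the API call entirely.
func addConditionPatch(ctx context.Context, c clientset.Interface, pod *v1.Pod) error {
	newStatus := pod.Status.DeepCopy()
	if !apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
		Type:   v1.DisruptionTarget,
		Status: v1.ConditionTrue,
	}) {
		return nil // condition already present with this status
	}
	_, _, _, err := utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus)
	return err
}
```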
23 changes: 9 additions & 14 deletions pkg/controller/disruption/disruption.go
@@ -34,7 +34,6 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
"k8s.io/client-go/discovery"
appsv1informers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
@@ -74,9 +73,6 @@ const (
// Once the timeout is reached, this controller attempts to set the status
// of the condition to False.
stalePodDisruptionTimeout = 2 * time.Minute

// field manager used to disable the pod failure condition
fieldManager = "DisruptionController"
)

type updater func(context.Context, *policy.PodDisruptionBudget) error
@@ -770,16 +766,15 @@ func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key
return nil
}

podApply := corev1apply.Pod(pod.Name, pod.Namespace).
WithStatus(corev1apply.PodStatus()).
WithResourceVersion(pod.ResourceVersion)
podApply.Status.WithConditions(corev1apply.PodCondition().
WithType(v1.DisruptionTarget).
WithStatus(v1.ConditionFalse).
WithLastTransitionTime(metav1.Now()),
)

if _, err := dc.kubeClient.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
newPod := pod.DeepCopy()
updated := apipod.UpdatePodCondition(&newPod.Status, &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionFalse,
})
if !updated {
return nil
}
if _, err := dc.kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, newPod, metav1.UpdateOptions{}); err != nil {
return err
}
logger.V(2).Info("Reset stale DisruptionTarget condition to False", "pod", klog.KObj(pod))
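Unlike the other call sites in this commit, the disruption controller sends a full `UpdateStatus` with a deep copy of the cached pod, so the request carries the pod's `resourceVersion` and a stale informer cache surfaces as a conflict rather than being papered over. Below is a hedged sketch of how such an update could be wrapped in `retry.RetryOnConflict` with a fresh read per attempt; the wrapper and function name are illustrative and not part of the commit, which instead returns the error and presumably lets the controller requeue.

```go
package example // illustrative only; not part of this commit

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/util/retry"
	apipod "k8s.io/kubernetes/pkg/api/v1/pod"
)

// Sketch: resetting the DisruptionTarget condition via UpdateStatus inside a
// conflict-retry loop. Each attempt re-reads the pod so the update carries a
// current resourceVersion.
func resetDisruptionTarget(ctx context.Context, c clientset.Interface, namespace, name string) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		pod, err := c.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		newPod := pod.DeepCopy()
		if !apipod.UpdatePodCondition(&newPod.Status, &v1.PodCondition{
			Type:   v1.DisruptionTarget,
			Status: v1.ConditionFalse,
		}) {
			return nil // already False; nothing to send
		}
		_, err = c.CoreV1().Pods(namespace).UpdateStatus(ctx, newPod, metav1.UpdateOptions{})
		return err
	})
}
```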
27 changes: 13 additions & 14 deletions pkg/controller/nodelifecycle/scheduler/taint_manager.go
@@ -31,16 +31,17 @@ import (
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/util/feature"
corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/core/helper"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
utilpod "k8s.io/kubernetes/pkg/util/pod"

"k8s.io/klog/v2"
)
@@ -55,9 +56,6 @@ const (
UpdateWorkerSize = 8
podUpdateChannelSize = 1
retries = 5

// fieldManager used to add pod disruption condition when evicting pods due to NoExecute taint
fieldManager = "TaintManager"
)

type nodeUpdateItem struct {
@@ -127,16 +125,17 @@ func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, name,
if err != nil {
return err
}
podApply := corev1apply.Pod(pod.Name, pod.Namespace).WithStatus(corev1apply.PodStatus())
podApply.Status.WithConditions(corev1apply.PodCondition().
WithType(v1.DisruptionTarget).
WithStatus(v1.ConditionTrue).
WithReason("DeletionByTaintManager").
WithMessage("Taint manager: deleting due to NoExecute taint").
WithLastTransitionTime(metav1.Now()),
)
if _, err := c.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
return err
newStatus := pod.Status.DeepCopy()
updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: "DeletionByTaintManager",
Message: "Taint manager: deleting due to NoExecute taint",
})
if updated {
if _, _, _, err := utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
return err
}
}
}
return c.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
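The taint manager (and PodGC below) funnel the condition through `utilpod.PatchPodStatus` from `pkg/util/pod`. Judging from its use in these hunks, it takes the old and new status, computes the difference, and patches the pod's status subresource, returning the patched pod, the raw patch bytes, and a flag for whether anything changed; that reading of the return values is an assumption, as is the wrapper below.

```go
package example // illustrative only; not part of this commit

import (
	"context"

	v1 "k8s.io/api/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"
	apipod "k8s.io/kubernetes/pkg/api/v1/pod"
	utilpod "k8s.io/kubernetes/pkg/util/pod"
)

// Sketch: adding a DisruptionTarget condition via the shared status-patch
// helper. The interpretation of the middle return values (patch bytes,
// unchanged flag) is an assumption based on how the hunks above discard them.
func markDisruptionTarget(ctx context.Context, c clientset.Interface, pod *v1.Pod, reason, message string) error {
	newStatus := pod.Status.DeepCopy()
	if !apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
		Type:    v1.DisruptionTarget,
		Status:  v1.ConditionTrue,
		Reason:  reason,
		Message: message,
	}) {
		return nil
	}
	_, patchBytes, unchanged, err := utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus)
	if err != nil {
		return err
	}
	klog.V(4).InfoS("Patched pod status", "pod", klog.KObj(pod), "patch", string(patchBytes), "unchanged", unchanged)
	return nil
}
```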
33 changes: 13 additions & 20 deletions pkg/controller/podgc/gc_controller.go
@@ -30,17 +30,18 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/podgc/metrics"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/eviction"
nodeutil "k8s.io/kubernetes/pkg/util/node"
utilpod "k8s.io/kubernetes/pkg/util/pod"
"k8s.io/kubernetes/pkg/util/taints"
)

@@ -50,9 +51,6 @@ const (
// quarantineTime defines how long Orphaned GC waits for nodes to show up
// in an informer before issuing a GET call to check if they are truly gone
quarantineTime = 40 * time.Second

// field manager used to add pod failure condition and change the pod phase
fieldManager = "PodGC"
)

type PodGCController struct {
@@ -249,12 +247,12 @@ func (gcc *PodGCController) gcOrphaned(ctx context.Context, pods []*v1.Pod, node
continue
}
logger.V(2).Info("Found orphaned Pod assigned to the Node, deleting", "pod", klog.KObj(pod), "node", klog.KRef("", pod.Spec.NodeName))
condition := corev1apply.PodCondition().
WithType(v1.DisruptionTarget).
WithStatus(v1.ConditionTrue).
WithReason("DeletionByPodGC").
WithMessage("PodGC: node no longer exists").
WithLastTransitionTime(metav1.Now())
condition := &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: "DeletionByPodGC",
Message: "PodGC: node no longer exists",
}
if err := gcc.markFailedAndDeletePodWithCondition(ctx, pod, condition); err != nil {
utilruntime.HandleError(err)
metrics.DeletingPodsErrorTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonOrphaned).Inc()
@@ -341,7 +339,7 @@ func (gcc *PodGCController) markFailedAndDeletePod(ctx context.Context, pod *v1.
return gcc.markFailedAndDeletePodWithCondition(ctx, pod, nil)
}

func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Context, pod *v1.Pod, condition *corev1apply.PodConditionApplyConfiguration) error {
func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Context, pod *v1.Pod, condition *v1.PodCondition) error {
logger := klog.FromContext(ctx)
logger.Info("PodGC is force deleting Pod", "pod", klog.KObj(pod))
// Patch the pod to make sure it is transitioned to the Failed phase before deletion.
@@ -354,17 +352,12 @@ func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Cont
// is orphaned, in which case the pod would remain in the Running phase
// forever as there is no kubelet running to change the phase.
if pod.Status.Phase != v1.PodSucceeded && pod.Status.Phase != v1.PodFailed {
podApply := corev1apply.Pod(pod.Name, pod.Namespace).WithStatus(corev1apply.PodStatus())
// we don't need to extract the pod apply configuration and can send
// only phase and the DisruptionTarget condition as PodGC would not
// own other fields. If the DisruptionTarget condition is owned by
// PodGC it means that it is in the Failed phase, so sending the
// condition will not be re-attempted.
podApply.Status.WithPhase(v1.PodFailed)
newStatus := pod.Status.DeepCopy()
newStatus.Phase = v1.PodFailed
if condition != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
podApply.Status.WithConditions(condition)
apipod.UpdatePodCondition(newStatus, condition)
}
if _, err := gcc.kubeClient.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
if _, _, _, err := utilpod.PatchPodStatus(ctx, gcc.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
return err
}
}
126 changes: 126 additions & 0 deletions pkg/controller/podgc/gc_controller_test.go
@@ -18,14 +18,17 @@ package podgc

import (
"context"
"encoding/json"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
@@ -43,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/eviction"
testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/pointer"
)

func alwaysReady() bool { return true }
@@ -671,6 +675,128 @@ func TestGCTerminating(t *testing.T) {
testDeletingPodsMetrics(t, 7, metrics.PodGCReasonTerminatingOutOfService)
}

func TestGCInspectingPatchedPodBeforeDeletion(t *testing.T) {
testCases := []struct {
name string
pod *v1.Pod
expectedPatchedPod *v1.Pod
expectedDeleteAction *clienttesting.DeleteActionImpl
}{
{
name: "orphaned pod should have DisruptionTarget condition added before deletion",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "testPod",
},
Spec: v1.PodSpec{
NodeName: "deletedNode",
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
},
},
},
expectedPatchedPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "testPod",
},
Spec: v1.PodSpec{
NodeName: "deletedNode",
},
Status: v1.PodStatus{
Phase: v1.PodFailed,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: "DeletionByPodGC",
Message: "PodGC: node no longer exists",
},
},
},
},
expectedDeleteAction: &clienttesting.DeleteActionImpl{
Name: "testPod",
DeleteOptions: metav1.DeleteOptions{GracePeriodSeconds: pointer.Int64(0)},
},
},
}

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, true)()

pods := []*v1.Pod{test.pod}

client := setupNewSimpleClient(nil, pods)
gcc, podInformer, _ := NewFromClient(ctx, client, -1)
gcc.quarantineTime = time.Duration(-1)
podInformer.Informer().GetStore().Add(test.pod)
gcc.gc(ctx)

actions := client.Actions()

var patchAction clienttesting.PatchAction
var deleteAction clienttesting.DeleteAction

for _, action := range actions {
if action.GetVerb() == "patch" {
patchAction = action.(clienttesting.PatchAction)
}

if action.GetVerb() == "delete" {
deleteAction = action.(clienttesting.DeleteAction)
}
}

if patchAction != nil && test.expectedPatchedPod == nil {
t.Fatalf("Pod was pactched but expectedPatchedPod is nil")
}
if test.expectedPatchedPod != nil {
patchedPodBytes := patchAction.GetPatch()
originalPod, err := json.Marshal(test.pod)
if err != nil {
t.Fatalf("Failed to marshal original pod %#v: %v", originalPod, err)
}
updated, err := strategicpatch.StrategicMergePatch(originalPod, patchedPodBytes, v1.Pod{})
if err != nil {
t.Fatalf("Failed to apply strategic merge patch %q on pod %#v: %v", patchedPodBytes, originalPod, err)
}

updatedPod := &v1.Pod{}
if err := json.Unmarshal(updated, updatedPod); err != nil {
t.Fatalf("Failed to unmarshal updated pod %q: %v", updated, err)
}

if diff := cmp.Diff(test.expectedPatchedPod, updatedPod, cmpopts.IgnoreFields(v1.Pod{}, "TypeMeta"), cmpopts.IgnoreFields(v1.PodCondition{}, "LastTransitionTime")); diff != "" {
t.Fatalf("Unexpected diff on pod (-want,+got):\n%s", diff)
}
}

if deleteAction != nil && test.expectedDeleteAction == nil {
t.Fatalf("Pod was deleted but expectedDeleteAction is nil")
}
if test.expectedDeleteAction != nil {
if diff := cmp.Diff(*test.expectedDeleteAction, deleteAction, cmpopts.IgnoreFields(clienttesting.DeleteActionImpl{}, "ActionImpl")); diff != "" {
t.Fatalf("Unexpected diff on deleteAction (-want,+got):\n%s", diff)
}
}
})
}
}

func verifyDeletedAndPatchedPods(t *testing.T, client *fake.Clientset, wantDeletedPodNames, wantPatchedPodNames sets.String) {
t.Helper()
deletedPodNames := getDeletedPodNames(client)
35 changes: 15 additions & 20 deletions pkg/scheduler/framework/preemption/preemption.go
@@ -26,28 +26,22 @@ import (

v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/util/feature"
corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
policylisters "k8s.io/client-go/listers/policy/v1"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/klog/v2"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
)

const (
// fieldManager used to add pod disruption condition to the victim pods
fieldManager = "KubeScheduler"
)

// Candidate represents a nominated node on which the preemptor can be scheduled,
// along with the list of victims that should be evicted for the preemptor to fit the node.
type Candidate interface {
@@ -362,19 +356,20 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
klog.V(2).InfoS("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(pod), "waitingPod", klog.KObj(victim), "node", c.Name())
} else {
if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
victimPodApply := corev1apply.Pod(victim.Name, victim.Namespace).WithStatus(corev1apply.PodStatus())
victimPodApply.Status.WithConditions(corev1apply.PodCondition().
WithType(v1.DisruptionTarget).
WithStatus(v1.ConditionTrue).
WithReason(v1.PodReasonPreemptionByScheduler).
WithMessage(fmt.Sprintf("%s: preempting to accommodate a higher priority pod", pod.Spec.SchedulerName)).
WithLastTransitionTime(metav1.Now()),
)

if _, err := cs.CoreV1().Pods(victim.Namespace).ApplyStatus(ctx, victimPodApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
errCh.SendErrorWithCancel(err, cancel)
return
condition := &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: v1.PodReasonPreemptionByScheduler,
Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", pod.Spec.SchedulerName),
}
newStatus := victim.Status.DeepCopy()
updated := apipod.UpdatePodCondition(newStatus, condition)
if updated {
if err := util.PatchPodStatus(ctx, cs, victim, newStatus); err != nil {
logger.Error(err, "Could not add DisruptionTarget condition due to preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
errCh.SendErrorWithCancel(err, cancel)
return
}
}
}
if err := util.DeletePod(ctx, cs, victim); err != nil {
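The scheduler side uses its own helper, `util.PatchPodStatus` from `pkg/scheduler/util`, which takes the victim pod and the desired status directly and returns only an error, whereas the controllers above use `pkg/util/pod.PatchPodStatus` with the namespace, name, and UID spelled out. A sketch of that call shape with the condition values from the hunk above; the wrapper function is illustrative. The base status must come from the victim, since that is the pod whose status subresource gets patched.

```go
package example // illustrative only; not part of this commit

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	apipod "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/scheduler/util"
)

// Sketch: adding the preemption DisruptionTarget condition to a victim pod.
// The message references the preemptor's scheduler name, as in the hunk above.
func addPreemptionCondition(ctx context.Context, cs clientset.Interface, preemptor, victim *v1.Pod) error {
	newStatus := victim.Status.DeepCopy()
	if !apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
		Type:    v1.DisruptionTarget,
		Status:  v1.ConditionTrue,
		Reason:  v1.PodReasonPreemptionByScheduler,
		Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", preemptor.Spec.SchedulerName),
	}) {
		return nil
	}
	return util.PatchPodStatus(ctx, cs, victim, newStatus)
}
```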
(diffs for the remaining 3 changed files not shown)
