Skip to content

Commit

Permalink
fix stateful set pod recreation and event spam (#123809)
Browse files Browse the repository at this point in the history
* fix pods tracking and internal error checking in statefulset tests

* fix stateful set pod recreation and event spam

- do not emit events when pod reaches terminal phase
- do not try to recreate pod until the old pod has been removed from
  etcd storage

* fix conflict race in statefulset rest update

statefulset controller does less requests per sync now and thus can
reconcile status faster, thus resulting in a higher chance for conflicts
  • Loading branch information
atiratree authored Apr 18, 2024
1 parent 99735cc commit 85d55b6
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 118 deletions.
4 changes: 3 additions & 1 deletion pkg/controller/statefulset/stateful_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func NewStatefulSetController(
recorder),
NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
history.NewHistory(kubeClient, revInformer.Lister()),
recorder,
),
pvcListerSynced: pvcInformer.Informer().HasSynced,
revListerSynced: revInformer.Informer().HasSynced,
Expand Down Expand Up @@ -235,6 +234,9 @@ func (ssc *StatefulSetController) updatePod(logger klog.Logger, old, cur interfa
return
}
logger.V(4).Info("Pod objectMeta updated", "pod", klog.KObj(curPod), "oldObjectMeta", oldPod.ObjectMeta, "newObjectMeta", curPod.ObjectMeta)
if oldPod.Status.Phase != curPod.Status.Phase {
logger.V(4).Info("StatefulSet Pod phase changed", "pod", klog.KObj(curPod), "statefulSet", klog.KObj(set), "podPhase", curPod.Status.Phase)
}
ssc.enqueueStatefulSet(set)
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
// the Pod status which in turn will trigger a requeue of the owning replica set thus
Expand Down
43 changes: 10 additions & 33 deletions pkg/controller/statefulset/stateful_set_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/history"
"k8s.io/kubernetes/pkg/features"
Expand Down Expand Up @@ -61,16 +60,14 @@ type StatefulSetControlInterface interface {
func NewDefaultStatefulSetControl(
podControl *StatefulPodControl,
statusUpdater StatefulSetStatusUpdaterInterface,
controllerHistory history.Interface,
recorder record.EventRecorder) StatefulSetControlInterface {
return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory, recorder}
controllerHistory history.Interface) StatefulSetControlInterface {
return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory}
}

type defaultStatefulSetControl struct {
podControl *StatefulPodControl
statusUpdater StatefulSetStatusUpdaterInterface
controllerHistory history.Interface
recorder record.EventRecorder
}

// UpdateStatefulSet executes the core logic loop for a stateful set, applying the predictable and
Expand Down Expand Up @@ -367,45 +364,25 @@ func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, current
func (ssc *defaultStatefulSetControl) processReplica(
ctx context.Context,
set *apps.StatefulSet,
currentRevision *apps.ControllerRevision,
updateRevision *apps.ControllerRevision,
currentSet *apps.StatefulSet,
updateSet *apps.StatefulSet,
monotonic bool,
replicas []*v1.Pod,
i int) (bool, error) {
logger := klog.FromContext(ctx)
// Delete and recreate pods which finished running.
//

// Note that pods with phase Succeeded will also trigger this event. This is
// because final pod phase of evicted or otherwise forcibly stopped pods
// (e.g. terminated on node reboot) is determined by the exit code of the
// container, not by the reason for pod termination. We should restart the pod
// regardless of the exit code.
if isFailed(replicas[i]) || isSucceeded(replicas[i]) {
if isFailed(replicas[i]) {
ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
"StatefulSet %s/%s is recreating failed Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
} else {
ssc.recorder.Eventf(set, v1.EventTypeNormal, "RecreatingTerminatedPod",
"StatefulSet %s/%s is recreating terminated Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
}
if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
return true, err
if replicas[i].DeletionTimestamp == nil {
if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
return true, err
}
}
replicaOrd := i + getStartOrdinal(set)
replicas[i] = newVersionedStatefulSetPod(
currentSet,
updateSet,
currentRevision.Name,
updateRevision.Name,
replicaOrd)
// New pod should be generated on the next sync after the current pod is removed from etcd.
return true, nil
}
// If we find a Pod that has not been created we create the Pod
if !isCreated(replicas[i]) {
Expand Down Expand Up @@ -637,7 +614,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(

// First, process each living replica. Exit if we run into an error or something blocking in monotonic mode.
processReplicaFn := func(i int) (bool, error) {
return ssc.processReplica(ctx, set, currentRevision, updateRevision, currentSet, updateSet, monotonic, replicas, i)
return ssc.processReplica(ctx, set, updateSet, monotonic, replicas, i)
}
if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil {
updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
Expand Down
180 changes: 110 additions & 70 deletions pkg/controller/statefulset/stateful_set_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ func setupController(client clientset.Interface) (*fakeObjectManager, *fakeState
om := newFakeObjectManager(informerFactory)
spc := NewStatefulPodControlFromManager(om, &noopRecorder{})
ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
recorder := &noopRecorder{}
ssc := NewDefaultStatefulSetControl(spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder)
ssc := NewDefaultStatefulSetControl(spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()))

// The informer is not started. The tests here manipulate the local cache (indexers) directly, and there is no waiting
// for client state to sync. In fact, because the client is not updated during tests, informer updates will break tests
Expand Down Expand Up @@ -171,10 +170,11 @@ func TestStatefulSetControl(t *testing.T) {
{ReplacesPods, largeSetFn},
{RecreatesFailedPod, simpleSetFn},
{RecreatesSucceededPod, simpleSetFn},
{RecreatesFailedPodWithDeleteFailure, simpleSetFn},
{RecreatesSucceededPodWithDeleteFailure, simpleSetFn},
{CreatePodFailure, simpleSetFn},
{UpdatePodFailure, simpleSetFn},
{UpdateSetStatusFailure, simpleSetFn},
{PodRecreateDeleteFailure, simpleSetFn},
{NewRevisionDeletePodFailure, simpleSetFn},
{RecreatesPVCForPendingPod, simpleSetFn},
}
Expand Down Expand Up @@ -398,9 +398,10 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc)
}
}

func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, phase v1.PodPhase) {
func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, terminalPhase v1.PodPhase, testDeletePodFailure bool) {
client := fake.NewSimpleClientset()
om, _, ssc := setupController(client)
expectedNumOfDeleteRequests := 0
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Error(err)
Expand All @@ -415,43 +416,114 @@ func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc,
if err := invariants(set, om); err != nil {
t.Error(err)
}
pods, err = om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
if om.deletePodTracker.requests != expectedNumOfDeleteRequests {
t.Errorf("Found unexpected number of delete calls, got %v, expected 1", om.deletePodTracker.requests)
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err)
}
pods[0].Status.Phase = phase
om.podsIndexer.Update(pods[0])

terminalPodOrdinal := -1
for i, pod := range pods {
// Set at least Pending phase to acknowledge the creation of pods
newPhase := v1.PodPending
if i == 0 {
// Set terminal phase for the first pod
newPhase = terminalPhase
terminalPodOrdinal = getOrdinal(pod)
}
pod.Status.Phase = newPhase
if err = om.podsIndexer.Update(pod); err != nil {
t.Error(err)
}
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err)
}
if testDeletePodFailure {
// Expect pod deletion failure
om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
expectedNumOfDeleteRequests++
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
if om.deletePodTracker.requests != expectedNumOfDeleteRequests {
t.Errorf("Found unexpected number of delete calls, got %v, expected %v", om.deletePodTracker.requests, expectedNumOfDeleteRequests)
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err)
}
}

// Expect pod deletion
expectedNumOfDeleteRequests++
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
pods, err = om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
if om.deletePodTracker.requests != expectedNumOfDeleteRequests {
t.Errorf("Found unexpected number of delete calls, got %v, expected %v", om.deletePodTracker.requests, expectedNumOfDeleteRequests)
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err)
}

// Expect no additional delete calls and expect pod creation
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
if om.deletePodTracker.requests != expectedNumOfDeleteRequests {
t.Errorf("Found unexpected number of delete calls, got %v, expected %v", om.deletePodTracker.requests, expectedNumOfDeleteRequests)
}
if pods, err = om.podsLister.Pods(set.Namespace).List(selector); err != nil {
t.Error(err)
}
if isCreated(pods[0]) {
recreatedPod := findPodByOrdinal(pods, terminalPodOrdinal)
// new recreated pod should have empty phase
if recreatedPod == nil || isCreated(recreatedPod) {
t.Error("StatefulSet did not recreate failed Pod")
}
expectedNumberOfCreateRequests := 2
if monotonic := !allowsBurst(set); !monotonic {
expectedNumberOfCreateRequests = int(*set.Spec.Replicas + 1)
}
if om.createPodTracker.requests != expectedNumberOfCreateRequests {
t.Errorf("Found unexpected number of create calls, got %v, expected %v", om.deletePodTracker.requests, expectedNumberOfCreateRequests)
}

}

func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
recreatesPod(t, set, invariants, v1.PodFailed)
recreatesPod(t, set, invariants, v1.PodFailed, false)
}

func RecreatesSucceededPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
recreatesPod(t, set, invariants, v1.PodSucceeded)
recreatesPod(t, set, invariants, v1.PodSucceeded, false)
}

func RecreatesFailedPodWithDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
recreatesPod(t, set, invariants, v1.PodFailed, true)
}

func RecreatesSucceededPodWithDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
recreatesPod(t, set, invariants, v1.PodSucceeded, true)
}

func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
om, _, ssc := setupController(client)
om.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)

if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil && isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError found %s", err)
if err := scaleUpStatefulSetControl(set, ssc, om, invariants); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
}
// Update so set.Status is set for the next scaleUpStatefulSetControl call.
var err error
Expand Down Expand Up @@ -514,8 +586,8 @@ func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF
om.podsIndexer.Update(pods[0])

// now it should fail
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil && isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError found %s", err)
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
}
}

Expand All @@ -524,8 +596,8 @@ func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants inva
om, ssu, ssc := setupController(client)
ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2)

if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil && isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError found %s", err)
if err := scaleUpStatefulSetControl(set, ssc, om, invariants); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
}
// Update so set.Status is set for the next scaleUpStatefulSetControl call.
var err error
Expand All @@ -551,52 +623,6 @@ func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants inva
}
}

func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
om, _, ssc := setupController(client)

selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Error(err)
}
pods, err := om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
pods, err = om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
pods[0].Status.Phase = v1.PodFailed
om.podsIndexer.Update(pods[0])
om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil && isOrHasInternalError(err) {
t.Errorf("StatefulSet failed to %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
pods, err = om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
if isCreated(pods[0]) {
t.Error("StatefulSet did not recreate failed Pod")
}
}

func NewRevisionDeletePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
om, _, ssc := setupController(client)
Expand Down Expand Up @@ -792,7 +818,7 @@ func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
}
*set.Spec.Replicas = 0
om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
if err := scaleDownStatefulSetControl(set, ssc, om, invariants); err != nil && isOrHasInternalError(err) {
if err := scaleDownStatefulSetControl(set, ssc, om, invariants); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl failed to throw error on delete %s", err)
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
Expand Down Expand Up @@ -834,8 +860,7 @@ func TestStatefulSetControl_getSetRevisions(t *testing.T) {
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := NewStatefulPodControlFromManager(newFakeObjectManager(informerFactory), &noopRecorder{})
ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
recorder := &noopRecorder{}
ssc := defaultStatefulSetControl{spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder}
ssc := defaultStatefulSetControl{spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions())}

stop := make(chan struct{})
defer close(stop)
Expand Down Expand Up @@ -2501,6 +2526,11 @@ func (om *fakeObjectManager) GetPod(namespace, podName string) (*v1.Pod, error)
}

func (om *fakeObjectManager) UpdatePod(pod *v1.Pod) error {
defer om.updatePodTracker.inc()
if om.updatePodTracker.errorReady() {
defer om.updatePodTracker.reset()
return om.updatePodTracker.getErr()
}
return om.podsIndexer.Update(pod)
}

Expand Down Expand Up @@ -3356,6 +3386,16 @@ func newRevisionOrDie(set *apps.StatefulSet, revision int64) *apps.ControllerRev
}

func isOrHasInternalError(err error) bool {
agg, ok := err.(utilerrors.Aggregate)
return !ok && !apierrors.IsInternalError(err) || ok && len(agg.Errors()) > 0 && !apierrors.IsInternalError(agg.Errors()[0])
if err == nil {
return false
}
var agg utilerrors.Aggregate
if errors.As(err, &agg) {
for _, e := range agg.Errors() {
if apierrors.IsInternalError(e) {
return true
}
}
}
return apierrors.IsInternalError(err)
}
Loading

0 comments on commit 85d55b6

Please sign in to comment.