diff --git a/cmd/kube-controller-manager/app/apps.go b/cmd/kube-controller-manager/app/apps.go index 99114cc243182..3b074a6acdc29 100644 --- a/cmd/kube-controller-manager/app/apps.go +++ b/cmd/kube-controller-manager/app/apps.go @@ -45,7 +45,7 @@ func startDaemonSetController(ctx context.Context, controllerContext ControllerC if err != nil { return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err) } - go dsc.Run(int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Done()) + go dsc.Run(ctx, int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs)) return nil, true, nil } @@ -56,7 +56,7 @@ func startStatefulSetController(ctx context.Context, controllerContext Controlle controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(), controllerContext.InformerFactory.Apps().V1().ControllerRevisions(), controllerContext.ClientBuilder.ClientOrDie("statefulset-controller"), - ).Run(int(controllerContext.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs), ctx.Done()) + ).Run(ctx, int(controllerContext.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs)) return nil, true, nil } @@ -66,7 +66,7 @@ func startReplicaSetController(ctx context.Context, controllerContext Controller controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"), replicaset.BurstReplicas, - ).Run(int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Done()) + ).Run(ctx, int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs)) return nil, true, nil } @@ -80,6 +80,6 @@ func startDeploymentController(ctx context.Context, controllerContext Controller if err != nil { return nil, true, fmt.Errorf("error creating Deployment controller: %v", err) } - go dc.Run(int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Done()) + go dc.Run(ctx, int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs)) return nil, true, nil } diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index cd217818c4ae1..9b8d4ea429e57 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -399,7 +399,7 @@ func startReplicationController(ctx context.Context, controllerContext Controlle controllerContext.InformerFactory.Core().V1().ReplicationControllers(), controllerContext.ClientBuilder.ClientOrDie("replication-controller"), replicationcontroller.BurstReplicas, - ).Run(int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Done()) + ).Run(ctx, int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs)) return nil, true, nil } diff --git a/pkg/controller/controller_ref_manager.go b/pkg/controller/controller_ref_manager.go index b21281dcf1e0b..3fc94626805bb 100644 --- a/pkg/controller/controller_ref_manager.go +++ b/pkg/controller/controller_ref_manager.go @@ -17,6 +17,7 @@ limitations under the License. package controller import ( + "context" "encoding/json" "fmt" "sync" @@ -38,13 +39,13 @@ type BaseControllerRefManager struct { canAdoptErr error canAdoptOnce sync.Once - CanAdoptFunc func() error + CanAdoptFunc func(ctx context.Context) error } -func (m *BaseControllerRefManager) CanAdopt() error { +func (m *BaseControllerRefManager) CanAdopt(ctx context.Context) error { m.canAdoptOnce.Do(func() { if m.CanAdoptFunc != nil { - m.canAdoptErr = m.CanAdoptFunc() + m.canAdoptErr = m.CanAdoptFunc(ctx) } }) return m.canAdoptErr @@ -65,7 +66,7 @@ func (m *BaseControllerRefManager) CanAdopt() error { // own the object. // // No reconciliation will be attempted if the controller is being deleted. -func (m *BaseControllerRefManager) ClaimObject(obj metav1.Object, match func(metav1.Object) bool, adopt, release func(metav1.Object) error) (bool, error) { +func (m *BaseControllerRefManager) ClaimObject(ctx context.Context, obj metav1.Object, match func(metav1.Object) bool, adopt func(context.Context, metav1.Object) error, release func(metav1.Object) error) (bool, error) { controllerRef := metav1.GetControllerOfNoCopy(obj) if controllerRef != nil { if controllerRef.UID != m.Controller.GetUID() { @@ -107,7 +108,7 @@ func (m *BaseControllerRefManager) ClaimObject(obj metav1.Object, match func(met return false, nil } // Selector matches. Try to adopt. - if err := adopt(obj); err != nil { + if err := adopt(ctx, obj); err != nil { // If the pod no longer exists, ignore the error. if errors.IsNotFound(err) { return false, nil @@ -143,7 +144,7 @@ func NewPodControllerRefManager( controller metav1.Object, selector labels.Selector, controllerKind schema.GroupVersionKind, - canAdopt func() error, + canAdopt func(ctx context.Context) error, finalizers ...string, ) *PodControllerRefManager { return &PodControllerRefManager{ @@ -173,7 +174,7 @@ func NewPodControllerRefManager( // // If the error is nil, either the reconciliation succeeded, or no // reconciliation was necessary. The list of Pods that you now own is returned. -func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) { +func (m *PodControllerRefManager) ClaimPods(ctx context.Context, pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) { var claimed []*v1.Pod var errlist []error @@ -190,15 +191,15 @@ func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1. } return true } - adopt := func(obj metav1.Object) error { - return m.AdoptPod(obj.(*v1.Pod)) + adopt := func(ctx context.Context, obj metav1.Object) error { + return m.AdoptPod(ctx, obj.(*v1.Pod)) } release := func(obj metav1.Object) error { return m.ReleasePod(obj.(*v1.Pod)) } for _, pod := range pods { - ok, err := m.ClaimObject(pod, match, adopt, release) + ok, err := m.ClaimObject(ctx, pod, match, adopt, release) if err != nil { errlist = append(errlist, err) continue @@ -212,8 +213,8 @@ func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1. // AdoptPod sends a patch to take control of the pod. It returns the error if // the patching fails. -func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error { - if err := m.CanAdopt(); err != nil { +func (m *PodControllerRefManager) AdoptPod(ctx context.Context, pod *v1.Pod) error { + if err := m.CanAdopt(ctx); err != nil { return fmt.Errorf("can't adopt Pod %v/%v (%v): %v", pod.Namespace, pod.Name, pod.UID, err) } // Note that ValidateOwnerReferences() will reject this patch if another @@ -283,7 +284,7 @@ func NewReplicaSetControllerRefManager( controller metav1.Object, selector labels.Selector, controllerKind schema.GroupVersionKind, - canAdopt func() error, + canAdopt func(ctx context.Context) error, ) *ReplicaSetControllerRefManager { return &ReplicaSetControllerRefManager{ BaseControllerRefManager: BaseControllerRefManager{ @@ -309,22 +310,22 @@ func NewReplicaSetControllerRefManager( // If the error is nil, either the reconciliation succeeded, or no // reconciliation was necessary. The list of ReplicaSets that you now own is // returned. -func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(sets []*apps.ReplicaSet) ([]*apps.ReplicaSet, error) { +func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(ctx context.Context, sets []*apps.ReplicaSet) ([]*apps.ReplicaSet, error) { var claimed []*apps.ReplicaSet var errlist []error match := func(obj metav1.Object) bool { return m.Selector.Matches(labels.Set(obj.GetLabels())) } - adopt := func(obj metav1.Object) error { - return m.AdoptReplicaSet(obj.(*apps.ReplicaSet)) + adopt := func(ctx context.Context, obj metav1.Object) error { + return m.AdoptReplicaSet(ctx, obj.(*apps.ReplicaSet)) } release := func(obj metav1.Object) error { return m.ReleaseReplicaSet(obj.(*apps.ReplicaSet)) } for _, rs := range sets { - ok, err := m.ClaimObject(rs, match, adopt, release) + ok, err := m.ClaimObject(ctx, rs, match, adopt, release) if err != nil { errlist = append(errlist, err) continue @@ -338,8 +339,8 @@ func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(sets []*apps.ReplicaSe // AdoptReplicaSet sends a patch to take control of the ReplicaSet. It returns // the error if the patching fails. -func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(rs *apps.ReplicaSet) error { - if err := m.CanAdopt(); err != nil { +func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(ctx context.Context, rs *apps.ReplicaSet) error { + if err := m.CanAdopt(ctx); err != nil { return fmt.Errorf("can't adopt ReplicaSet %v/%v (%v): %v", rs.Namespace, rs.Name, rs.UID, err) } // Note that ValidateOwnerReferences() will reject this patch if another @@ -381,9 +382,9 @@ func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(replicaSet *apps.Repl // // The CanAdopt() function calls getObject() to fetch the latest value, // and denies adoption attempts if that object has a non-nil DeletionTimestamp. -func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() error { - return func() error { - obj, err := getObject() +func RecheckDeletionTimestamp(getObject func(context.Context) (metav1.Object, error)) func(context.Context) error { + return func(ctx context.Context) error { + obj, err := getObject(ctx) if err != nil { return fmt.Errorf("can't recheck DeletionTimestamp: %v", err) } @@ -421,7 +422,7 @@ func NewControllerRevisionControllerRefManager( controller metav1.Object, selector labels.Selector, controllerKind schema.GroupVersionKind, - canAdopt func() error, + canAdopt func(ctx context.Context) error, ) *ControllerRevisionControllerRefManager { return &ControllerRevisionControllerRefManager{ BaseControllerRefManager: BaseControllerRefManager{ @@ -447,22 +448,22 @@ func NewControllerRevisionControllerRefManager( // If the error is nil, either the reconciliation succeeded, or no // reconciliation was necessary. The list of ControllerRevisions that you now own is // returned. -func (m *ControllerRevisionControllerRefManager) ClaimControllerRevisions(histories []*apps.ControllerRevision) ([]*apps.ControllerRevision, error) { +func (m *ControllerRevisionControllerRefManager) ClaimControllerRevisions(ctx context.Context, histories []*apps.ControllerRevision) ([]*apps.ControllerRevision, error) { var claimed []*apps.ControllerRevision var errlist []error match := func(obj metav1.Object) bool { return m.Selector.Matches(labels.Set(obj.GetLabels())) } - adopt := func(obj metav1.Object) error { - return m.AdoptControllerRevision(obj.(*apps.ControllerRevision)) + adopt := func(ctx context.Context, obj metav1.Object) error { + return m.AdoptControllerRevision(ctx, obj.(*apps.ControllerRevision)) } release := func(obj metav1.Object) error { return m.ReleaseControllerRevision(obj.(*apps.ControllerRevision)) } for _, h := range histories { - ok, err := m.ClaimObject(h, match, adopt, release) + ok, err := m.ClaimObject(ctx, h, match, adopt, release) if err != nil { errlist = append(errlist, err) continue @@ -476,8 +477,8 @@ func (m *ControllerRevisionControllerRefManager) ClaimControllerRevisions(histor // AdoptControllerRevision sends a patch to take control of the ControllerRevision. It returns the error if // the patching fails. -func (m *ControllerRevisionControllerRefManager) AdoptControllerRevision(history *apps.ControllerRevision) error { - if err := m.CanAdopt(); err != nil { +func (m *ControllerRevisionControllerRefManager) AdoptControllerRevision(ctx context.Context, history *apps.ControllerRevision) error { + if err := m.CanAdopt(ctx); err != nil { return fmt.Errorf("can't adopt ControllerRevision %v/%v (%v): %v", history.Namespace, history.Name, history.UID, err) } // Note that ValidateOwnerReferences() will reject this patch if another diff --git a/pkg/controller/controller_ref_manager_test.go b/pkg/controller/controller_ref_manager_test.go index 9fa2fd9b3f127..e06ec35bc8fb8 100644 --- a/pkg/controller/controller_ref_manager_test.go +++ b/pkg/controller/controller_ref_manager_test.go @@ -17,6 +17,7 @@ limitations under the License. package controller import ( + "context" "strings" "testing" @@ -73,7 +74,7 @@ func TestClaimPods(t *testing.T) { &v1.ReplicationController{}, productionLabelSelector, controllerKind, - func() error { return nil }), + func(ctx context.Context) error { return nil }), pods: []*v1.Pod{newPod("pod1", productionLabel, nil), newPod("pod2", testLabel, nil)}, claimed: []*v1.Pod{newPod("pod1", productionLabel, nil)}, patches: 1, @@ -89,7 +90,7 @@ func TestClaimPods(t *testing.T) { &controller, productionLabelSelector, controllerKind, - func() error { return nil }), + func(ctx context.Context) error { return nil }), pods: []*v1.Pod{newPod("pod1", productionLabel, nil), newPod("pod2", productionLabel, nil)}, claimed: nil, } @@ -105,7 +106,7 @@ func TestClaimPods(t *testing.T) { &controller, productionLabelSelector, controllerKind, - func() error { return nil }), + func(ctx context.Context) error { return nil }), pods: []*v1.Pod{newPod("pod1", productionLabel, &controller), newPod("pod2", productionLabel, nil)}, claimed: []*v1.Pod{newPod("pod1", productionLabel, &controller)}, } @@ -121,7 +122,7 @@ func TestClaimPods(t *testing.T) { &controller, productionLabelSelector, controllerKind, - func() error { return nil }), + func(ctx context.Context) error { return nil }), pods: []*v1.Pod{newPod("pod1", productionLabel, &controller), newPod("pod2", productionLabel, &controller2)}, claimed: []*v1.Pod{newPod("pod1", productionLabel, &controller)}, } @@ -135,7 +136,7 @@ func TestClaimPods(t *testing.T) { &controller, productionLabelSelector, controllerKind, - func() error { return nil }), + func(ctx context.Context) error { return nil }), pods: []*v1.Pod{newPod("pod1", productionLabel, &controller), newPod("pod2", testLabel, &controller)}, claimed: []*v1.Pod{newPod("pod1", productionLabel, &controller)}, patches: 1, @@ -156,7 +157,7 @@ func TestClaimPods(t *testing.T) { &controller, productionLabelSelector, controllerKind, - func() error { return nil }), + func(ctx context.Context) error { return nil }), pods: []*v1.Pod{podToDelete1, podToDelete2}, claimed: []*v1.Pod{podToDelete1}, } @@ -170,7 +171,7 @@ func TestClaimPods(t *testing.T) { &controller, productionLabelSelector, controllerKind, - func() error { return nil }, + func(ctx context.Context) error { return nil }, "foo-finalizer", "bar-finalizer"), pods: []*v1.Pod{newPod("pod1", productionLabel, &controller), newPod("pod2", testLabel, &controller), newPod("pod3", productionLabel, nil)}, claimed: []*v1.Pod{newPod("pod1", productionLabel, &controller), newPod("pod3", productionLabel, nil)}, @@ -180,7 +181,7 @@ func TestClaimPods(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - claimed, err := test.manager.ClaimPods(test.pods) + claimed, err := test.manager.ClaimPods(context.TODO(), test.pods) if err != nil { t.Fatalf("Unexpected error: %v", err) } diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go index fbda4686eab2e..5760efe913104 100644 --- a/pkg/controller/daemon/daemon_controller.go +++ b/pkg/controller/daemon/daemon_controller.go @@ -94,7 +94,7 @@ type DaemonSetsController struct { burstReplicas int // To allow injection of syncDaemonSet for testing. - syncHandler func(dsKey string) error + syncHandler func(ctx context.Context, dsKey string) error // used for unit testing enqueueDaemonSet func(ds *apps.DaemonSet) // A TTLCache of pod creates/deletes each ds expects to see @@ -277,40 +277,40 @@ func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) { } // Run begins watching and syncing daemon sets. -func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) { +func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer dsc.queue.ShutDown() klog.Infof("Starting daemon sets controller") defer klog.Infof("Shutting down daemon sets controller") - if !cache.WaitForNamedCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) { + if !cache.WaitForNamedCacheSync("daemon sets", ctx.Done(), dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) { return } for i := 0; i < workers; i++ { - go wait.Until(dsc.runWorker, time.Second, stopCh) + go wait.UntilWithContext(ctx, dsc.runWorker, time.Second) } - go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, stopCh) + go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, ctx.Done()) - <-stopCh + <-ctx.Done() } -func (dsc *DaemonSetsController) runWorker() { - for dsc.processNextWorkItem() { +func (dsc *DaemonSetsController) runWorker(ctx context.Context) { + for dsc.processNextWorkItem(ctx) { } } // processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. -func (dsc *DaemonSetsController) processNextWorkItem() bool { +func (dsc *DaemonSetsController) processNextWorkItem(ctx context.Context) bool { dsKey, quit := dsc.queue.Get() if quit { return false } defer dsc.queue.Done(dsKey) - err := dsc.syncHandler(dsKey.(string)) + err := dsc.syncHandler(ctx, dsKey.(string)) if err == nil { dsc.queue.Forget(dsKey) return true @@ -711,7 +711,7 @@ func (dsc *DaemonSetsController) updateNode(old, cur interface{}) { // This also reconciles ControllerRef by adopting/orphaning. // Note that returned Pods are pointers to objects in the cache. // If you want to modify one, you need to deep-copy it first. -func (dsc *DaemonSetsController) getDaemonPods(ds *apps.DaemonSet) ([]*v1.Pod, error) { +func (dsc *DaemonSetsController) getDaemonPods(ctx context.Context, ds *apps.DaemonSet) ([]*v1.Pod, error) { selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector) if err != nil { return nil, err @@ -725,8 +725,8 @@ func (dsc *DaemonSetsController) getDaemonPods(ds *apps.DaemonSet) ([]*v1.Pod, e } // If any adoptions are attempted, we should first recheck for deletion with // an uncached quorum read sometime after listing Pods (see #42639). - dsNotDeleted := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { - fresh, err := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(context.TODO(), ds.Name, metav1.GetOptions{}) + dsNotDeleted := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) { + fresh, err := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(ctx, ds.Name, metav1.GetOptions{}) if err != nil { return nil, err } @@ -738,15 +738,15 @@ func (dsc *DaemonSetsController) getDaemonPods(ds *apps.DaemonSet) ([]*v1.Pod, e // Use ControllerRefManager to adopt/orphan as needed. cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, controllerKind, dsNotDeleted) - return cm.ClaimPods(pods) + return cm.ClaimPods(ctx, pods) } // getNodesToDaemonPods returns a map from nodes to daemon pods (corresponding to ds) created for the nodes. // This also reconciles ControllerRef by adopting/orphaning. // Note that returned Pods are pointers to objects in the cache. // If you want to modify one, you need to deep-copy it first. -func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *apps.DaemonSet) (map[string][]*v1.Pod, error) { - claimedPods, err := dsc.getDaemonPods(ds) +func (dsc *DaemonSetsController) getNodesToDaemonPods(ctx context.Context, ds *apps.DaemonSet) (map[string][]*v1.Pod, error) { + claimedPods, err := dsc.getDaemonPods(ctx, ds) if err != nil { return nil, err } @@ -910,9 +910,9 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode( // After figuring out which nodes should run a Pod of ds but not yet running one and // which nodes should not run a Pod of ds but currently running one, it calls function // syncNodes with a list of pods to remove and a list of nodes to run a Pod of ds. -func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error { +func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error { // Find out the pods which are created for the nodes by DaemonSet. - nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) + nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds) if err != nil { return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) } @@ -1053,7 +1053,17 @@ func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nod return utilerrors.NewAggregate(errors) } -func storeDaemonSetStatus(dsClient unversionedapps.DaemonSetInterface, ds *apps.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable int, updateObservedGen bool) error { +func storeDaemonSetStatus( + ctx context.Context, + dsClient unversionedapps.DaemonSetInterface, + ds *apps.DaemonSet, desiredNumberScheduled, + currentNumberScheduled, + numberMisscheduled, + numberReady, + updatedNumberScheduled, + numberAvailable, + numberUnavailable int, + updateObservedGen bool) error { if int(ds.Status.DesiredNumberScheduled) == desiredNumberScheduled && int(ds.Status.CurrentNumberScheduled) == currentNumberScheduled && int(ds.Status.NumberMisscheduled) == numberMisscheduled && @@ -1080,7 +1090,7 @@ func storeDaemonSetStatus(dsClient unversionedapps.DaemonSetInterface, ds *apps. toUpdate.Status.NumberAvailable = int32(numberAvailable) toUpdate.Status.NumberUnavailable = int32(numberUnavailable) - if _, updateErr = dsClient.UpdateStatus(context.TODO(), toUpdate, metav1.UpdateOptions{}); updateErr == nil { + if _, updateErr = dsClient.UpdateStatus(ctx, toUpdate, metav1.UpdateOptions{}); updateErr == nil { return nil } @@ -1089,7 +1099,7 @@ func storeDaemonSetStatus(dsClient unversionedapps.DaemonSetInterface, ds *apps. break } // Update the set with the latest resource version for the next poll - if toUpdate, getErr = dsClient.Get(context.TODO(), ds.Name, metav1.GetOptions{}); getErr != nil { + if toUpdate, getErr = dsClient.Get(ctx, ds.Name, metav1.GetOptions{}); getErr != nil { // If the GET fails we can't trust status.Replicas anymore. This error // is bound to be more interesting than the update failure. return getErr @@ -1098,9 +1108,9 @@ func storeDaemonSetStatus(dsClient unversionedapps.DaemonSetInterface, ds *apps. return updateErr } -func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeList []*v1.Node, hash string, updateObservedGen bool) error { +func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string, updateObservedGen bool) error { klog.V(4).Infof("Updating daemon set status") - nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) + nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds) if err != nil { return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) } @@ -1143,7 +1153,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeL } numberUnavailable := desiredNumberScheduled - numberAvailable - err = storeDaemonSetStatus(dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen) + err = storeDaemonSetStatus(ctx, dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen) if err != nil { return fmt.Errorf("error storing status for daemon set %#v: %v", ds, err) } @@ -1155,7 +1165,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeL return nil } -func (dsc *DaemonSetsController) syncDaemonSet(key string) error { +func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string) error { startTime := dsc.failedPodsBackoff.Clock.Now() defer func() { @@ -1208,7 +1218,7 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error { } // Construct histories of the DaemonSet, and get the hash of current history - cur, old, err := dsc.constructHistory(ds) + cur, old, err := dsc.constructHistory(ctx, ds) if err != nil { return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err) } @@ -1216,10 +1226,10 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error { if !dsc.expectations.SatisfiedExpectations(dsKey) { // Only update status. Don't raise observedGeneration since controller didn't process object of that generation. - return dsc.updateDaemonSetStatus(ds, nodeList, hash, false) + return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, false) } - err = dsc.manage(ds, nodeList, hash) + err = dsc.manage(ctx, ds, nodeList, hash) if err != nil { return err } @@ -1229,19 +1239,19 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error { switch ds.Spec.UpdateStrategy.Type { case apps.OnDeleteDaemonSetStrategyType: case apps.RollingUpdateDaemonSetStrategyType: - err = dsc.rollingUpdate(ds, nodeList, hash) + err = dsc.rollingUpdate(ctx, ds, nodeList, hash) } if err != nil { return err } } - err = dsc.cleanupHistory(ds, old) + err = dsc.cleanupHistory(ctx, ds, old) if err != nil { return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err) } - return dsc.updateDaemonSetStatus(ds, nodeList, hash, true) + return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, true) } // nodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a diff --git a/pkg/controller/daemon/daemon_controller_test.go b/pkg/controller/daemon/daemon_controller_test.go index 2d18178a70cf5..bafc0cedd0ddc 100644 --- a/pkg/controller/daemon/daemon_controller_test.go +++ b/pkg/controller/daemon/daemon_controller_test.go @@ -383,7 +383,7 @@ func expectSyncDaemonSets(t *testing.T, manager *daemonSetsController, ds *apps. t.Fatal("could not get key for daemon") } - err = manager.syncHandler(key) + err = manager.syncHandler(context.TODO(), key) if err != nil { t.Log(err) } @@ -547,7 +547,7 @@ func TestExpectationsOnRecreate(t *testing.T) { // create of DS adds to queue, processes waitForQueueLength(1, "created DS") - ok := dsc.processNextWorkItem() + ok := dsc.processNextWorkItem(context.TODO()) if !ok { t.Fatal("queue is shutting down") } @@ -576,7 +576,7 @@ func TestExpectationsOnRecreate(t *testing.T) { // process updates DS, update adds to queue waitForQueueLength(1, "updated DS") - ok = dsc.processNextWorkItem() + ok = dsc.processNextWorkItem(context.TODO()) if !ok { t.Fatal("queue is shutting down") } @@ -624,7 +624,7 @@ func TestExpectationsOnRecreate(t *testing.T) { } waitForQueueLength(1, "recreated DS") - ok = dsc.processNextWorkItem() + ok = dsc.processNextWorkItem(context.TODO()) if !ok { t.Fatal("Queue is shutting down!") } @@ -2797,7 +2797,7 @@ func TestGetNodesToDaemonPods(t *testing.T) { } } - nodesToDaemonPods, err := manager.getNodesToDaemonPods(ds) + nodesToDaemonPods, err := manager.getNodesToDaemonPods(context.TODO(), ds) if err != nil { t.Fatalf("getNodesToDaemonPods() error: %v", err) } @@ -3552,7 +3552,7 @@ func TestStoreDaemonSetStatus(t *testing.T) { } return true, ds, nil }) - if err := storeDaemonSetStatus(fakeClient.AppsV1().DaemonSets("default"), ds, 2, 2, 2, 2, 2, 2, 2, true); err != tt.expectedError { + if err := storeDaemonSetStatus(context.TODO(), fakeClient.AppsV1().DaemonSets("default"), ds, 2, 2, 2, 2, 2, 2, 2, true); err != tt.expectedError { t.Errorf("storeDaemonSetStatus() got %v, expected %v", err, tt.expectedError) } if getCalled != tt.expectedGetCalled { diff --git a/pkg/controller/daemon/update.go b/pkg/controller/daemon/update.go index cd2ee0c72f1f1..e1a13f9c4d6da 100644 --- a/pkg/controller/daemon/update.go +++ b/pkg/controller/daemon/update.go @@ -40,8 +40,8 @@ import ( // rollingUpdate identifies the set of old pods to delete, or additional pods to create on nodes, // remaining within the constraints imposed by the update strategy. -func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error { - nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) +func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error { + nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds) if err != nil { return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) } @@ -234,10 +234,10 @@ func findUpdatedPodsOnNode(ds *apps.DaemonSet, podsOnNode []*v1.Pod, hash string // constructHistory finds all histories controlled by the given DaemonSet, and // update current history revision number, or create current history if need to. // It also deduplicates current history, and adds missing unique labels to existing histories. -func (dsc *DaemonSetsController) constructHistory(ds *apps.DaemonSet) (cur *apps.ControllerRevision, old []*apps.ControllerRevision, err error) { +func (dsc *DaemonSetsController) constructHistory(ctx context.Context, ds *apps.DaemonSet) (cur *apps.ControllerRevision, old []*apps.ControllerRevision, err error) { var histories []*apps.ControllerRevision var currentHistories []*apps.ControllerRevision - histories, err = dsc.controlledHistories(ds) + histories, err = dsc.controlledHistories(ctx, ds) if err != nil { return nil, nil, err } @@ -247,7 +247,7 @@ func (dsc *DaemonSetsController) constructHistory(ds *apps.DaemonSet) (cur *apps if _, ok := history.Labels[apps.DefaultDaemonSetUniqueLabelKey]; !ok { toUpdate := history.DeepCopy() toUpdate.Labels[apps.DefaultDaemonSetUniqueLabelKey] = toUpdate.Name - history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Update(context.TODO(), toUpdate, metav1.UpdateOptions{}) + history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Update(ctx, toUpdate, metav1.UpdateOptions{}) if err != nil { return nil, nil, err } @@ -269,12 +269,12 @@ func (dsc *DaemonSetsController) constructHistory(ds *apps.DaemonSet) (cur *apps switch len(currentHistories) { case 0: // Create a new history if the current one isn't found - cur, err = dsc.snapshot(ds, currRevision) + cur, err = dsc.snapshot(ctx, ds, currRevision) if err != nil { return nil, nil, err } default: - cur, err = dsc.dedupCurHistories(ds, currentHistories) + cur, err = dsc.dedupCurHistories(ctx, ds, currentHistories) if err != nil { return nil, nil, err } @@ -282,7 +282,7 @@ func (dsc *DaemonSetsController) constructHistory(ds *apps.DaemonSet) (cur *apps if cur.Revision < currRevision { toUpdate := cur.DeepCopy() toUpdate.Revision = currRevision - _, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Update(context.TODO(), toUpdate, metav1.UpdateOptions{}) + _, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Update(ctx, toUpdate, metav1.UpdateOptions{}) if err != nil { return nil, nil, err } @@ -291,8 +291,8 @@ func (dsc *DaemonSetsController) constructHistory(ds *apps.DaemonSet) (cur *apps return cur, old, err } -func (dsc *DaemonSetsController) cleanupHistory(ds *apps.DaemonSet, old []*apps.ControllerRevision) error { - nodesToDaemonPods, err := dsc.getNodesToDaemonPods(ds) +func (dsc *DaemonSetsController) cleanupHistory(ctx context.Context, ds *apps.DaemonSet, old []*apps.ControllerRevision) error { + nodesToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds) if err != nil { return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) } @@ -323,7 +323,7 @@ func (dsc *DaemonSetsController) cleanupHistory(ds *apps.DaemonSet, old []*apps. continue } // Clean up - err := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Delete(context.TODO(), history.Name, metav1.DeleteOptions{}) + err := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Delete(ctx, history.Name, metav1.DeleteOptions{}) if err != nil { return err } @@ -343,7 +343,7 @@ func maxRevision(histories []*apps.ControllerRevision) int64 { return max } -func (dsc *DaemonSetsController) dedupCurHistories(ds *apps.DaemonSet, curHistories []*apps.ControllerRevision) (*apps.ControllerRevision, error) { +func (dsc *DaemonSetsController) dedupCurHistories(ctx context.Context, ds *apps.DaemonSet, curHistories []*apps.ControllerRevision) (*apps.ControllerRevision, error) { if len(curHistories) == 1 { return curHistories[0], nil } @@ -361,7 +361,7 @@ func (dsc *DaemonSetsController) dedupCurHistories(ds *apps.DaemonSet, curHistor continue } // Relabel pods before dedup - pods, err := dsc.getDaemonPods(ds) + pods, err := dsc.getDaemonPods(ctx, ds) if err != nil { return nil, err } @@ -372,14 +372,14 @@ func (dsc *DaemonSetsController) dedupCurHistories(ds *apps.DaemonSet, curHistor toUpdate.Labels = make(map[string]string) } toUpdate.Labels[apps.DefaultDaemonSetUniqueLabelKey] = keepCur.Labels[apps.DefaultDaemonSetUniqueLabelKey] - _, err = dsc.kubeClient.CoreV1().Pods(ds.Namespace).Update(context.TODO(), toUpdate, metav1.UpdateOptions{}) + _, err = dsc.kubeClient.CoreV1().Pods(ds.Namespace).Update(ctx, toUpdate, metav1.UpdateOptions{}) if err != nil { return nil, err } } } // Remove duplicates - err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Delete(context.TODO(), cur.Name, metav1.DeleteOptions{}) + err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Delete(ctx, cur.Name, metav1.DeleteOptions{}) if err != nil { return nil, err } @@ -391,7 +391,7 @@ func (dsc *DaemonSetsController) dedupCurHistories(ds *apps.DaemonSet, curHistor // This also reconciles ControllerRef by adopting/orphaning. // Note that returned histories are pointers to objects in the cache. // If you want to modify one, you need to deep-copy it first. -func (dsc *DaemonSetsController) controlledHistories(ds *apps.DaemonSet) ([]*apps.ControllerRevision, error) { +func (dsc *DaemonSetsController) controlledHistories(ctx context.Context, ds *apps.DaemonSet) ([]*apps.ControllerRevision, error) { selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector) if err != nil { return nil, err @@ -405,8 +405,8 @@ func (dsc *DaemonSetsController) controlledHistories(ds *apps.DaemonSet) ([]*app } // If any adoptions are attempted, we should first recheck for deletion with // an uncached quorum read sometime after listing Pods (see #42639). - canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { - fresh, err := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(context.TODO(), ds.Name, metav1.GetOptions{}) + canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) { + fresh, err := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(ctx, ds.Name, metav1.GetOptions{}) if err != nil { return nil, err } @@ -417,7 +417,7 @@ func (dsc *DaemonSetsController) controlledHistories(ds *apps.DaemonSet) ([]*app }) // Use ControllerRefManager to adopt/orphan as needed. cm := controller.NewControllerRevisionControllerRefManager(dsc.crControl, ds, selector, controllerKind, canAdoptFunc) - return cm.ClaimControllerRevisions(histories) + return cm.ClaimControllerRevisions(ctx, histories) } // Match check if the given DaemonSet's template matches the template stored in the given history. @@ -456,7 +456,7 @@ func getPatch(ds *apps.DaemonSet) ([]byte, error) { return patch, err } -func (dsc *DaemonSetsController) snapshot(ds *apps.DaemonSet, revision int64) (*apps.ControllerRevision, error) { +func (dsc *DaemonSetsController) snapshot(ctx context.Context, ds *apps.DaemonSet, revision int64) (*apps.ControllerRevision, error) { patch, err := getPatch(ds) if err != nil { return nil, err @@ -475,10 +475,10 @@ func (dsc *DaemonSetsController) snapshot(ds *apps.DaemonSet, revision int64) (* Revision: revision, } - history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Create(context.TODO(), history, metav1.CreateOptions{}) + history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Create(ctx, history, metav1.CreateOptions{}) if outerErr := err; errors.IsAlreadyExists(outerErr) { // TODO: Is it okay to get from historyLister? - existedHistory, getErr := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Get(context.TODO(), name, metav1.GetOptions{}) + existedHistory, getErr := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Get(ctx, name, metav1.GetOptions{}) if getErr != nil { return nil, getErr } @@ -493,7 +493,7 @@ func (dsc *DaemonSetsController) snapshot(ds *apps.DaemonSet, revision int64) (* // Handle name collisions between different history // Get the latest DaemonSet from the API server to make sure collision count is only increased when necessary - currDS, getErr := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(context.TODO(), ds.Name, metav1.GetOptions{}) + currDS, getErr := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(ctx, ds.Name, metav1.GetOptions{}) if getErr != nil { return nil, getErr } @@ -505,7 +505,7 @@ func (dsc *DaemonSetsController) snapshot(ds *apps.DaemonSet, revision int64) (* currDS.Status.CollisionCount = new(int32) } *currDS.Status.CollisionCount++ - _, updateErr := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).UpdateStatus(context.TODO(), currDS, metav1.UpdateOptions{}) + _, updateErr := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).UpdateStatus(ctx, currDS, metav1.UpdateOptions{}) if updateErr != nil { return nil, updateErr } diff --git a/pkg/controller/daemon/update_test.go b/pkg/controller/daemon/update_test.go index b115af5ec98d2..57a1baaf9fc05 100644 --- a/pkg/controller/daemon/update_test.go +++ b/pkg/controller/daemon/update_test.go @@ -17,6 +17,7 @@ limitations under the License. package daemon import ( + "context" "testing" "time" @@ -323,7 +324,7 @@ func setPodReadiness(t *testing.T, dsc *daemonSetsController, ready bool, count func currentDSHash(dsc *daemonSetsController, ds *apps.DaemonSet) (string, error) { // Construct histories of the DaemonSet, and get the hash of current history - cur, _, err := dsc.constructHistory(ds) + cur, _, err := dsc.constructHistory(context.TODO(), ds) if err != nil { return "", err } diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 4c92741390cab..ee4e3bdc7da99 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -72,7 +72,7 @@ type DeploymentController struct { eventRecorder record.EventRecorder // To allow injection of syncDeployment for testing. - syncHandler func(dKey string) error + syncHandler func(ctx context.Context, dKey string) error // used for unit testing enqueueDeployment func(deployment *apps.Deployment) @@ -146,22 +146,22 @@ func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInfor } // Run begins watching and syncing. -func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) { +func (dc *DeploymentController) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer dc.queue.ShutDown() klog.InfoS("Starting controller", "controller", "deployment") defer klog.InfoS("Shutting down controller", "controller", "deployment") - if !cache.WaitForNamedCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) { + if !cache.WaitForNamedCacheSync("deployment", ctx.Done(), dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) { return } for i := 0; i < workers; i++ { - go wait.Until(dc.worker, time.Second, stopCh) + go wait.UntilWithContext(ctx, dc.worker, time.Second) } - <-stopCh + <-ctx.Done() } func (dc *DeploymentController) addDeployment(obj interface{}) { @@ -457,19 +457,19 @@ func (dc *DeploymentController) resolveControllerRef(namespace string, controlle // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. -func (dc *DeploymentController) worker() { - for dc.processNextWorkItem() { +func (dc *DeploymentController) worker(ctx context.Context) { + for dc.processNextWorkItem(ctx) { } } -func (dc *DeploymentController) processNextWorkItem() bool { +func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool { key, quit := dc.queue.Get() if quit { return false } defer dc.queue.Done(key) - err := dc.syncHandler(key.(string)) + err := dc.syncHandler(ctx, key.(string)) dc.handleErr(err, key) return true @@ -500,7 +500,7 @@ func (dc *DeploymentController) handleErr(err error, key interface{}) { // getReplicaSetsForDeployment uses ControllerRefManager to reconcile // ControllerRef by adopting and orphaning. // It returns the list of ReplicaSets that this Deployment should manage. -func (dc *DeploymentController) getReplicaSetsForDeployment(d *apps.Deployment) ([]*apps.ReplicaSet, error) { +func (dc *DeploymentController) getReplicaSetsForDeployment(ctx context.Context, d *apps.Deployment) ([]*apps.ReplicaSet, error) { // List all ReplicaSets to find those we own but that no longer match our // selector. They will be orphaned by ClaimReplicaSets(). rsList, err := dc.rsLister.ReplicaSets(d.Namespace).List(labels.Everything()) @@ -513,8 +513,8 @@ func (dc *DeploymentController) getReplicaSetsForDeployment(d *apps.Deployment) } // If any adoptions are attempted, we should first recheck for deletion with // an uncached quorum read sometime after listing ReplicaSets (see #42639). - canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { - fresh, err := dc.client.AppsV1().Deployments(d.Namespace).Get(context.TODO(), d.Name, metav1.GetOptions{}) + canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) { + fresh, err := dc.client.AppsV1().Deployments(d.Namespace).Get(ctx, d.Name, metav1.GetOptions{}) if err != nil { return nil, err } @@ -524,7 +524,7 @@ func (dc *DeploymentController) getReplicaSetsForDeployment(d *apps.Deployment) return fresh, nil }) cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, d, deploymentSelector, controllerKind, canAdoptFunc) - return cm.ClaimReplicaSets(rsList) + return cm.ClaimReplicaSets(ctx, rsList) } // getPodMapForDeployment returns the Pods managed by a Deployment. @@ -565,7 +565,7 @@ func (dc *DeploymentController) getPodMapForDeployment(d *apps.Deployment, rsLis // syncDeployment will sync the deployment with the given key. // This function is not meant to be invoked concurrently with the same key. -func (dc *DeploymentController) syncDeployment(key string) error { +func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error { namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key) @@ -596,14 +596,14 @@ func (dc *DeploymentController) syncDeployment(key string) error { dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.") if d.Status.ObservedGeneration < d.Generation { d.Status.ObservedGeneration = d.Generation - dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{}) + dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) } return nil } // List ReplicaSets owned by this Deployment, while reconciling ControllerRef // through adoption/orphaning. - rsList, err := dc.getReplicaSetsForDeployment(d) + rsList, err := dc.getReplicaSetsForDeployment(ctx, d) if err != nil { return err } @@ -618,40 +618,40 @@ func (dc *DeploymentController) syncDeployment(key string) error { } if d.DeletionTimestamp != nil { - return dc.syncStatusOnly(d, rsList) + return dc.syncStatusOnly(ctx, d, rsList) } // Update deployment conditions with an Unknown condition when pausing/resuming // a deployment. In this way, we can be sure that we won't timeout when a user // resumes a Deployment with a set progressDeadlineSeconds. - if err = dc.checkPausedConditions(d); err != nil { + if err = dc.checkPausedConditions(ctx, d); err != nil { return err } if d.Spec.Paused { - return dc.sync(d, rsList) + return dc.sync(ctx, d, rsList) } // rollback is not re-entrant in case the underlying replica sets are updated with a new // revision so we should ensure that we won't proceed to update replica sets until we // make sure that the deployment has cleaned up its rollback spec in subsequent enqueues. if getRollbackTo(d) != nil { - return dc.rollback(d, rsList) + return dc.rollback(ctx, d, rsList) } - scalingEvent, err := dc.isScalingEvent(d, rsList) + scalingEvent, err := dc.isScalingEvent(ctx, d, rsList) if err != nil { return err } if scalingEvent { - return dc.sync(d, rsList) + return dc.sync(ctx, d, rsList) } switch d.Spec.Strategy.Type { case apps.RecreateDeploymentStrategyType: - return dc.rolloutRecreate(d, rsList, podMap) + return dc.rolloutRecreate(ctx, d, rsList, podMap) case apps.RollingUpdateDeploymentStrategyType: - return dc.rolloutRolling(d, rsList) + return dc.rolloutRolling(ctx, d, rsList) } return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type) } diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index 65eff29b78b48..4178714f27f45 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -17,6 +17,7 @@ limitations under the License. package deployment import ( + "context" "fmt" "strconv" "testing" @@ -221,7 +222,7 @@ func (f *fixture) run_(deploymentName string, startInformers bool, expectError b informers.Start(stopCh) } - err = c.syncDeployment(deploymentName) + err = c.syncDeployment(context.TODO(), deploymentName) if !expectError && err != nil { f.t.Errorf("error syncing deployment: %v", err) } else if expectError && err == nil { @@ -529,7 +530,7 @@ func TestGetReplicaSetsForDeployment(t *testing.T) { defer close(stopCh) informers.Start(stopCh) - rsList, err := c.getReplicaSetsForDeployment(d1) + rsList, err := c.getReplicaSetsForDeployment(context.TODO(), d1) if err != nil { t.Fatalf("getReplicaSetsForDeployment() error: %v", err) } @@ -541,7 +542,7 @@ func TestGetReplicaSetsForDeployment(t *testing.T) { t.Errorf("getReplicaSetsForDeployment() = %v, want [%v]", rsNames, rs1.Name) } - rsList, err = c.getReplicaSetsForDeployment(d2) + rsList, err = c.getReplicaSetsForDeployment(context.TODO(), d2) if err != nil { t.Fatalf("getReplicaSetsForDeployment() error: %v", err) } @@ -579,7 +580,7 @@ func TestGetReplicaSetsForDeploymentAdoptRelease(t *testing.T) { defer close(stopCh) informers.Start(stopCh) - rsList, err := c.getReplicaSetsForDeployment(d) + rsList, err := c.getReplicaSetsForDeployment(context.TODO(), d) if err != nil { t.Fatalf("getReplicaSetsForDeployment() error: %v", err) } diff --git a/pkg/controller/deployment/progress.go b/pkg/controller/deployment/progress.go index 6812daa3c1c86..65cbb717a9d32 100644 --- a/pkg/controller/deployment/progress.go +++ b/pkg/controller/deployment/progress.go @@ -34,7 +34,7 @@ import ( // cases this helper will run that cannot be prevented from the scaling detection, // for example a resync of the deployment after it was scaled up. In those cases, // we shouldn't try to estimate any progress. -func (dc *DeploymentController) syncRolloutStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error { +func (dc *DeploymentController) syncRolloutStatus(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error { newStatus := calculateStatus(allRSs, newRS, d) // If there is no progressDeadlineSeconds set, remove any Progressing condition. @@ -114,7 +114,7 @@ func (dc *DeploymentController) syncRolloutStatus(allRSs []*apps.ReplicaSet, new newDeployment := d newDeployment.Status = newStatus - _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(context.TODO(), newDeployment, metav1.UpdateOptions{}) + _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{}) return err } diff --git a/pkg/controller/deployment/progress_test.go b/pkg/controller/deployment/progress_test.go index 05679c6bc9d4b..681a079de6d52 100644 --- a/pkg/controller/deployment/progress_test.go +++ b/pkg/controller/deployment/progress_test.go @@ -17,6 +17,7 @@ limitations under the License. package deployment import ( + "context" "math" "testing" "time" @@ -330,7 +331,7 @@ func TestSyncRolloutStatus(t *testing.T) { test.allRSs = append(test.allRSs, test.newRS) } - err := dc.syncRolloutStatus(test.allRSs, test.newRS, test.d) + err := dc.syncRolloutStatus(context.TODO(), test.allRSs, test.newRS, test.d) if err != nil { t.Error(err) } diff --git a/pkg/controller/deployment/recreate.go b/pkg/controller/deployment/recreate.go index 071f5344fd090..6da2704a466b0 100644 --- a/pkg/controller/deployment/recreate.go +++ b/pkg/controller/deployment/recreate.go @@ -17,6 +17,7 @@ limitations under the License. package deployment import ( + "context" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -25,9 +26,9 @@ import ( ) // rolloutRecreate implements the logic for recreating a replica set. -func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error { +func (dc *DeploymentController) rolloutRecreate(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error { // Don't create a new RS if not already existed, so that we avoid scaling up before scaling down. - newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false) + newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false) if err != nil { return err } @@ -35,23 +36,23 @@ func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*ap activeOldRSs := controller.FilterActiveReplicaSets(oldRSs) // scale down old replica sets. - scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d) + scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(ctx, activeOldRSs, d) if err != nil { return err } if scaledDown { // Update DeploymentStatus. - return dc.syncRolloutStatus(allRSs, newRS, d) + return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } // Do not process a deployment when it has old pods running. if oldPodsRunning(newRS, oldRSs, podMap) { - return dc.syncRolloutStatus(allRSs, newRS, d) + return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } // If we need to create a new RS, create it now. if newRS == nil { - newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, true) + newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true) if err != nil { return err } @@ -59,22 +60,22 @@ func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*ap } // scale up new replica set. - if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil { + if _, err := dc.scaleUpNewReplicaSetForRecreate(ctx, newRS, d); err != nil { return err } if util.DeploymentComplete(d, &d.Status) { - if err := dc.cleanupDeployment(oldRSs, d); err != nil { + if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil { return err } } // Sync deployment status. - return dc.syncRolloutStatus(allRSs, newRS, d) + return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } // scaleDownOldReplicaSetsForRecreate scales down old replica sets when deployment strategy is "Recreate". -func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (bool, error) { +func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (bool, error) { scaled := false for i := range oldRSs { rs := oldRSs[i] @@ -82,7 +83,7 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*app if *(rs.Spec.Replicas) == 0 { continue } - scaledRS, updatedRS, err := dc.scaleReplicaSetAndRecordEvent(rs, 0, deployment) + scaledRS, updatedRS, err := dc.scaleReplicaSetAndRecordEvent(ctx, rs, 0, deployment) if err != nil { return false, err } @@ -125,7 +126,7 @@ func oldPodsRunning(newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet, podMap ma } // scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate". -func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) { - scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment) +func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(ctx context.Context, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) { + scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, *(deployment.Spec.Replicas), deployment) return scaled, err } diff --git a/pkg/controller/deployment/recreate_test.go b/pkg/controller/deployment/recreate_test.go index 04ce1507119b7..315c683d6f061 100644 --- a/pkg/controller/deployment/recreate_test.go +++ b/pkg/controller/deployment/recreate_test.go @@ -17,6 +17,7 @@ limitations under the License. package deployment import ( + "context" "fmt" "testing" @@ -71,7 +72,7 @@ func TestScaleDownOldReplicaSets(t *testing.T) { } c.eventRecorder = &record.FakeRecorder{} - c.scaleDownOldReplicaSetsForRecreate(oldRSs, test.d) + c.scaleDownOldReplicaSetsForRecreate(context.TODO(), oldRSs, test.d) for j := range oldRSs { rs := oldRSs[j] diff --git a/pkg/controller/deployment/rollback.go b/pkg/controller/deployment/rollback.go index 95708d923adc5..d27c0fc019dde 100644 --- a/pkg/controller/deployment/rollback.go +++ b/pkg/controller/deployment/rollback.go @@ -31,8 +31,8 @@ import ( ) // rollback the deployment to the specified revision. In any case cleanup the rollback spec. -func (dc *DeploymentController) rollback(d *apps.Deployment, rsList []*apps.ReplicaSet) error { - newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true) +func (dc *DeploymentController) rollback(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error { + newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true) if err != nil { return err } @@ -45,7 +45,7 @@ func (dc *DeploymentController) rollback(d *apps.Deployment, rsList []*apps.Repl // If we still can't find the last revision, gives up rollback dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.") // Gives up rollback - return dc.updateDeploymentAndClearRollbackTo(d) + return dc.updateDeploymentAndClearRollbackTo(ctx, d) } } for _, rs := range allRSs { @@ -59,7 +59,7 @@ func (dc *DeploymentController) rollback(d *apps.Deployment, rsList []*apps.Repl // rollback by copying podTemplate.Spec from the replica set // revision number will be incremented during the next getAllReplicaSetsAndSyncRevision call // no-op if the spec matches current deployment's podTemplate.Spec - performedRollback, err := dc.rollbackToTemplate(d, rs) + performedRollback, err := dc.rollbackToTemplate(ctx, d, rs) if performedRollback && err == nil { dc.emitRollbackNormalEvent(d, fmt.Sprintf("Rolled back deployment %q to revision %d", d.Name, rollbackTo.Revision)) } @@ -68,13 +68,13 @@ func (dc *DeploymentController) rollback(d *apps.Deployment, rsList []*apps.Repl } dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find the revision to rollback to.") // Gives up rollback - return dc.updateDeploymentAndClearRollbackTo(d) + return dc.updateDeploymentAndClearRollbackTo(ctx, d) } // rollbackToTemplate compares the templates of the provided deployment and replica set and // updates the deployment with the replica set template in case they are different. It also // cleans up the rollback spec so subsequent requeues of the deployment won't end up in here. -func (dc *DeploymentController) rollbackToTemplate(d *apps.Deployment, rs *apps.ReplicaSet) (bool, error) { +func (dc *DeploymentController) rollbackToTemplate(ctx context.Context, d *apps.Deployment, rs *apps.ReplicaSet) (bool, error) { performedRollback := false if !deploymentutil.EqualIgnoreHash(&d.Spec.Template, &rs.Spec.Template) { klog.V(4).Infof("Rolling back deployment %q to template spec %+v", d.Name, rs.Spec.Template.Spec) @@ -98,7 +98,7 @@ func (dc *DeploymentController) rollbackToTemplate(d *apps.Deployment, rs *apps. dc.emitRollbackWarningEvent(d, deploymentutil.RollbackTemplateUnchanged, eventMsg) } - return performedRollback, dc.updateDeploymentAndClearRollbackTo(d) + return performedRollback, dc.updateDeploymentAndClearRollbackTo(ctx, d) } func (dc *DeploymentController) emitRollbackWarningEvent(d *apps.Deployment, reason, message string) { @@ -112,10 +112,10 @@ func (dc *DeploymentController) emitRollbackNormalEvent(d *apps.Deployment, mess // updateDeploymentAndClearRollbackTo sets .spec.rollbackTo to nil and update the input deployment // It is assumed that the caller will have updated the deployment template appropriately (in case // we want to rollback). -func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(d *apps.Deployment) error { +func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(ctx context.Context, d *apps.Deployment) error { klog.V(4).Infof("Cleans up rollbackTo of deployment %q", d.Name) setRollbackTo(d, nil) - _, err := dc.client.AppsV1().Deployments(d.Namespace).Update(context.TODO(), d, metav1.UpdateOptions{}) + _, err := dc.client.AppsV1().Deployments(d.Namespace).Update(ctx, d, metav1.UpdateOptions{}) return err } diff --git a/pkg/controller/deployment/rolling.go b/pkg/controller/deployment/rolling.go index e3408bd3fa863..5e75046744a34 100644 --- a/pkg/controller/deployment/rolling.go +++ b/pkg/controller/deployment/rolling.go @@ -17,6 +17,7 @@ limitations under the License. package deployment import ( + "context" "fmt" "sort" @@ -28,62 +29,62 @@ import ( ) // rolloutRolling implements the logic for rolling a new replica set. -func (dc *DeploymentController) rolloutRolling(d *apps.Deployment, rsList []*apps.ReplicaSet) error { - newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true) +func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error { + newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true) if err != nil { return err } allRSs := append(oldRSs, newRS) // Scale up, if we can. - scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d) + scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d) if err != nil { return err } if scaledUp { // Update DeploymentStatus - return dc.syncRolloutStatus(allRSs, newRS, d) + return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } // Scale down, if we can. - scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d) + scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d) if err != nil { return err } if scaledDown { // Update DeploymentStatus - return dc.syncRolloutStatus(allRSs, newRS, d) + return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } if deploymentutil.DeploymentComplete(d, &d.Status) { - if err := dc.cleanupDeployment(oldRSs, d); err != nil { + if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil { return err } } // Sync deployment status - return dc.syncRolloutStatus(allRSs, newRS, d) + return dc.syncRolloutStatus(ctx, allRSs, newRS, d) } -func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) { +func (dc *DeploymentController) reconcileNewReplicaSet(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) { if *(newRS.Spec.Replicas) == *(deployment.Spec.Replicas) { // Scaling not required. return false, nil } if *(newRS.Spec.Replicas) > *(deployment.Spec.Replicas) { // Scale down. - scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment) + scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, *(deployment.Spec.Replicas), deployment) return scaled, err } newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS) if err != nil { return false, err } - scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment) + scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, newReplicasCount, deployment) return scaled, err } -func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) { +func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) { oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs) if oldPodsCount == 0 { // Can't scale down further @@ -133,7 +134,7 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*apps.ReplicaSe // Clean up unhealthy replicas first, otherwise unhealthy replicas will block deployment // and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737 - oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(oldRSs, deployment, maxScaledDown) + oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(ctx, oldRSs, deployment, maxScaledDown) if err != nil { return false, nil } @@ -141,7 +142,7 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*apps.ReplicaSe // Scale down old replica sets, need check maxUnavailable to ensure we can scale down allRSs = append(oldRSs, newRS) - scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment) + scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(ctx, allRSs, oldRSs, deployment) if err != nil { return false, nil } @@ -152,7 +153,7 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*apps.ReplicaSe } // cleanupUnhealthyReplicas will scale down old replica sets with unhealthy replicas, so that all unhealthy replicas will be deleted. -func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*apps.ReplicaSet, deployment *apps.Deployment, maxCleanupCount int32) ([]*apps.ReplicaSet, int32, error) { +func (dc *DeploymentController) cleanupUnhealthyReplicas(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment, maxCleanupCount int32) ([]*apps.ReplicaSet, int32, error) { sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs)) // Safely scale down all old replica sets with unhealthy replicas. Replica set will sort the pods in the order // such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will @@ -177,7 +178,7 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*apps.ReplicaS if newReplicasCount > *(targetRS.Spec.Replicas) { return nil, 0, fmt.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount) } - _, updatedOldRS, err := dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment) + _, updatedOldRS, err := dc.scaleReplicaSetAndRecordEvent(ctx, targetRS, newReplicasCount, deployment) if err != nil { return nil, totalScaledDown, err } @@ -189,7 +190,7 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*apps.ReplicaS // scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when deployment strategy is "RollingUpdate". // Need check maxUnavailable to ensure availability -func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (int32, error) { +func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (int32, error) { maxUnavailable := deploymentutil.MaxUnavailable(*deployment) // Check if we can scale down. @@ -221,7 +222,7 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs [ if newReplicasCount > *(targetRS.Spec.Replicas) { return 0, fmt.Errorf("when scaling down old RS, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount) } - _, _, err := dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment) + _, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, targetRS, newReplicasCount, deployment) if err != nil { return totalScaledDown, err } diff --git a/pkg/controller/deployment/rolling_test.go b/pkg/controller/deployment/rolling_test.go index b0feb42b40b85..1cd93050aaf1e 100644 --- a/pkg/controller/deployment/rolling_test.go +++ b/pkg/controller/deployment/rolling_test.go @@ -17,6 +17,7 @@ limitations under the License. package deployment import ( + "context" "testing" apps "k8s.io/api/apps/v1" @@ -90,7 +91,7 @@ func TestDeploymentController_reconcileNewReplicaSet(t *testing.T) { client: &fake, eventRecorder: &record.FakeRecorder{}, } - scaled, err := controller.reconcileNewReplicaSet(allRSs, newRS, deployment) + scaled, err := controller.reconcileNewReplicaSet(context.TODO(), allRSs, newRS, deployment) if err != nil { t.Errorf("unexpected error: %v", err) continue @@ -197,7 +198,7 @@ func TestDeploymentController_reconcileOldReplicaSets(t *testing.T) { eventRecorder: &record.FakeRecorder{}, } - scaled, err := controller.reconcileOldReplicaSets(allRSs, oldRSs, newRS, deployment) + scaled, err := controller.reconcileOldReplicaSets(context.TODO(), allRSs, oldRSs, newRS, deployment) if err != nil { t.Errorf("unexpected error: %v", err) continue @@ -265,7 +266,7 @@ func TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) { client: &fakeClientset, eventRecorder: &record.FakeRecorder{}, } - _, cleanupCount, err := controller.cleanupUnhealthyReplicas(oldRSs, deployment, int32(test.maxCleanupCount)) + _, cleanupCount, err := controller.cleanupUnhealthyReplicas(context.TODO(), oldRSs, deployment, int32(test.maxCleanupCount)) if err != nil { t.Errorf("unexpected error: %v", err) continue @@ -339,7 +340,7 @@ func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing client: &fakeClientset, eventRecorder: &record.FakeRecorder{}, } - scaled, err := controller.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment) + scaled, err := controller.scaleDownOldReplicaSetsForRollingUpdate(context.TODO(), allRSs, oldRSs, deployment) if err != nil { t.Errorf("unexpected error: %v", err) continue diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index c1e3d18c05b9e..b5c67a9e1bc0d 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -34,24 +34,24 @@ import ( ) // syncStatusOnly only updates Deployments Status and doesn't take any mutating actions. -func (dc *DeploymentController) syncStatusOnly(d *apps.Deployment, rsList []*apps.ReplicaSet) error { - newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false) +func (dc *DeploymentController) syncStatusOnly(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error { + newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false) if err != nil { return err } allRSs := append(oldRSs, newRS) - return dc.syncDeploymentStatus(allRSs, newRS, d) + return dc.syncDeploymentStatus(ctx, allRSs, newRS, d) } // sync is responsible for reconciling deployments on scaling events or when they // are paused. -func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaSet) error { - newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false) +func (dc *DeploymentController) sync(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error { + newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false) if err != nil { return err } - if err := dc.scale(d, newRS, oldRSs); err != nil { + if err := dc.scale(ctx, d, newRS, oldRSs); err != nil { // If we get an error while trying to scale, the deployment will be requeued // so we can abort this resync return err @@ -59,19 +59,19 @@ func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaS // Clean up the deployment when it's paused and no rollback is in flight. if d.Spec.Paused && getRollbackTo(d) == nil { - if err := dc.cleanupDeployment(oldRSs, d); err != nil { + if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil { return err } } allRSs := append(oldRSs, newRS) - return dc.syncDeploymentStatus(allRSs, newRS, d) + return dc.syncDeploymentStatus(ctx, allRSs, newRS, d) } // checkPausedConditions checks if the given deployment is paused or not and adds an appropriate condition. // These conditions are needed so that we won't accidentally report lack of progress for resumed deployments // that were paused for longer than progressDeadlineSeconds. -func (dc *DeploymentController) checkPausedConditions(d *apps.Deployment) error { +func (dc *DeploymentController) checkPausedConditions(ctx context.Context, d *apps.Deployment) error { if !deploymentutil.HasProgressDeadline(d) { return nil } @@ -98,7 +98,7 @@ func (dc *DeploymentController) checkPausedConditions(d *apps.Deployment) error } var err error - _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{}) + _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) return err } @@ -113,11 +113,11 @@ func (dc *DeploymentController) checkPausedConditions(d *apps.Deployment) error // // Note that currently the deployment controller is using caches to avoid querying the server for reads. // This may lead to stale reads of replica sets, thus incorrect deployment status. -func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(d *apps.Deployment, rsList []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, []*apps.ReplicaSet, error) { +func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, []*apps.ReplicaSet, error) { _, allOldRSs := deploymentutil.FindOldReplicaSets(d, rsList) // Get new replica set with the updated revision number - newRS, err := dc.getNewReplicaSet(d, rsList, allOldRSs, createIfNotExisted) + newRS, err := dc.getNewReplicaSet(ctx, d, rsList, allOldRSs, createIfNotExisted) if err != nil { return nil, nil, err } @@ -135,7 +135,7 @@ const ( // 2. If there's existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes. // 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas. // Note that the pod-template-hash will be added to adopted RSes and pods. -func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, oldRSs []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, error) { +func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.Deployment, rsList, oldRSs []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, error) { existingNewRS := deploymentutil.FindNewReplicaSet(d, rsList) // Calculate the max revision number among all old RSes @@ -155,7 +155,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds if annotationsUpdated || minReadySecondsNeedsUpdate { rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds - return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{}) + return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{}) } // Should use the revision in existingNewRS's annotation, since it set by before @@ -173,7 +173,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old if needsUpdate { var err error - if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{}); err != nil { + if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}); err != nil { return nil, err } } @@ -220,7 +220,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old // hash collisions. If there is any other error, we need to report it in the status of // the Deployment. alreadyExists := false - createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{}) + createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(ctx, &newRS, metav1.CreateOptions{}) switch { // We may end up hitting this due to a slow cache or a fast resync of the Deployment. case errors.IsAlreadyExists(err): @@ -252,7 +252,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old *d.Status.CollisionCount++ // Update the collisionCount for the Deployment and let it requeue by returning the original // error. - _, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{}) + _, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) if dErr == nil { klog.V(2).Infof("Found a hash collision for deployment %q - bumping collisionCount (%d->%d) to resolve it", d.Name, preCollisionCount, *d.Status.CollisionCount) } @@ -268,7 +268,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old // We don't really care about this error at this point, since we have a bigger issue to report. // TODO: Identify which errors are permanent and switch DeploymentIsFailed to take into account // these reasons as well. Related issue: https://github.com/kubernetes/kubernetes/issues/18568 - _, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{}) + _, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) } dc.eventRecorder.Eventf(d, v1.EventTypeWarning, deploymentutil.FailedRSCreateReason, msg) return nil, err @@ -285,7 +285,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old needsUpdate = true } if needsUpdate { - _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{}) + _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) } return createdRS, err } @@ -295,14 +295,14 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old // have the effect of hastening the rollout progress, which could produce a higher proportion of unavailable // replicas in the event of a problem with the rolled out template. Should run only on scaling events or // when a deployment is paused and not during the normal rollout process. -func (dc *DeploymentController) scale(deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error { +func (dc *DeploymentController) scale(ctx context.Context, deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error { // If there is only one active replica set then we should scale that up to the full count of the // deployment. If there is no active replica set, then we should scale up the newest replica set. if activeOrLatest := deploymentutil.FindActiveOrLatest(newRS, oldRSs); activeOrLatest != nil { if *(activeOrLatest.Spec.Replicas) == *(deployment.Spec.Replicas) { return nil } - _, _, err := dc.scaleReplicaSetAndRecordEvent(activeOrLatest, *(deployment.Spec.Replicas), deployment) + _, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, activeOrLatest, *(deployment.Spec.Replicas), deployment) return err } @@ -310,7 +310,7 @@ func (dc *DeploymentController) scale(deployment *apps.Deployment, newRS *apps.R // This case handles replica set adoption during a saturated new replica set. if deploymentutil.IsSaturated(deployment, newRS) { for _, old := range controller.FilterActiveReplicaSets(oldRSs) { - if _, _, err := dc.scaleReplicaSetAndRecordEvent(old, 0, deployment); err != nil { + if _, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, old, 0, deployment); err != nil { return err } } @@ -384,7 +384,7 @@ func (dc *DeploymentController) scale(deployment *apps.Deployment, newRS *apps.R } // TODO: Use transactions when we have them. - if _, _, err := dc.scaleReplicaSet(rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil { + if _, _, err := dc.scaleReplicaSet(ctx, rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil { // Return as soon as we fail, the deployment is requeued return err } @@ -393,7 +393,7 @@ func (dc *DeploymentController) scale(deployment *apps.Deployment, newRS *apps.R return nil } -func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment) (bool, *apps.ReplicaSet, error) { +func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(ctx context.Context, rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment) (bool, *apps.ReplicaSet, error) { // No need to scale if *(rs.Spec.Replicas) == newScale { return false, rs, nil @@ -404,11 +404,11 @@ func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *apps.ReplicaSe } else { scalingOperation = "down" } - scaled, newRS, err := dc.scaleReplicaSet(rs, newScale, deployment, scalingOperation) + scaled, newRS, err := dc.scaleReplicaSet(ctx, rs, newScale, deployment, scalingOperation) return scaled, newRS, err } -func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment, scalingOperation string) (bool, *apps.ReplicaSet, error) { +func (dc *DeploymentController) scaleReplicaSet(ctx context.Context, rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment, scalingOperation string) (bool, *apps.ReplicaSet, error) { sizeNeedsUpdate := *(rs.Spec.Replicas) != newScale @@ -420,7 +420,7 @@ func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale in rsCopy := rs.DeepCopy() *(rsCopy.Spec.Replicas) = newScale deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment)) - rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{}) + rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{}) if err == nil && sizeNeedsUpdate { scaled = true dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale) @@ -432,7 +432,7 @@ func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale in // cleanupDeployment is responsible for cleaning up a deployment ie. retains all but the latest N old replica sets // where N=d.Spec.RevisionHistoryLimit. Old replica sets are older versions of the podtemplate of a deployment kept // around by default 1) for historical reasons and 2) for the ability to rollback a deployment. -func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) error { +func (dc *DeploymentController) cleanupDeployment(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) error { if !deploymentutil.HasRevisionHistoryLimit(deployment) { return nil } @@ -458,7 +458,7 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, dep continue } klog.V(4).Infof("Trying to cleanup replica set %q for deployment %q", rs.Name, deployment.Name) - if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(context.TODO(), rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) { + if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) { // Return error instead of aggregating and continuing DELETEs on the theory // that we may be overloading the api server. return err @@ -469,7 +469,7 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, dep } // syncDeploymentStatus checks if the status is up-to-date and sync it if necessary -func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error { +func (dc *DeploymentController) syncDeploymentStatus(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error { newStatus := calculateStatus(allRSs, newRS, d) if reflect.DeepEqual(d.Status, newStatus) { @@ -478,7 +478,7 @@ func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet, newDeployment := d newDeployment.Status = newStatus - _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(context.TODO(), newDeployment, metav1.UpdateOptions{}) + _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{}) return err } @@ -525,8 +525,8 @@ func calculateStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployme // by looking at the desired-replicas annotation in the active replica sets of the deployment. // // rsList should come from getReplicaSetsForDeployment(d). -func (dc *DeploymentController) isScalingEvent(d *apps.Deployment, rsList []*apps.ReplicaSet) (bool, error) { - newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false) +func (dc *DeploymentController) isScalingEvent(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) (bool, error) { + newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false) if err != nil { return false, err } diff --git a/pkg/controller/deployment/sync_test.go b/pkg/controller/deployment/sync_test.go index 3405f8c5e602e..25b55d1221704 100644 --- a/pkg/controller/deployment/sync_test.go +++ b/pkg/controller/deployment/sync_test.go @@ -17,6 +17,7 @@ limitations under the License. package deployment import ( + "context" "math" "testing" "time" @@ -297,7 +298,7 @@ func TestScale(t *testing.T) { deploymentutil.SetReplicasAnnotations(rs, desiredReplicas, desiredReplicas+deploymentutil.MaxSurge(*test.oldDeployment)) } - if err := dc.scale(test.deployment, test.newRS, test.oldRSs); err != nil { + if err := dc.scale(context.TODO(), test.deployment, test.newRS, test.oldRSs); err != nil { t.Errorf("%s: unexpected error: %v", test.name, err) return } @@ -433,7 +434,7 @@ func TestDeploymentController_cleanupDeployment(t *testing.T) { t.Logf(" &test.revisionHistoryLimit: %d", test.revisionHistoryLimit) d := newDeployment("foo", 1, &test.revisionHistoryLimit, nil, nil, map[string]string{"foo": "bar"}) - controller.cleanupDeployment(test.oldRSs, d) + controller.cleanupDeployment(context.TODO(), test.oldRSs, d) gotDeletions := 0 for _, action := range fake.Actions() { @@ -565,7 +566,7 @@ func TestDeploymentController_cleanupDeploymentOrder(t *testing.T) { informers.Start(stopCh) d := newDeployment("foo", 1, &test.revisionHistoryLimit, nil, nil, map[string]string{"foo": "bar"}) - controller.cleanupDeployment(test.oldRSs, d) + controller.cleanupDeployment(context.TODO(), test.oldRSs, d) deletedRSs := sets.String{} for _, action := range fake.Actions() { diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 31d4727d8972f..4c6c04de27e46 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -514,7 +514,7 @@ func (jm *Controller) getPodsForJob(j *batch.Job, withFinalizers bool) ([]*v1.Po } // If any adoptions are attempted, we should first recheck for deletion // with an uncached quorum read sometime after listing Pods (see #42639). - canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { + canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) { fresh, err := jm.kubeClient.BatchV1().Jobs(j.Namespace).Get(context.TODO(), j.Name, metav1.GetOptions{}) if err != nil { return nil, err @@ -530,7 +530,7 @@ func (jm *Controller) getPodsForJob(j *batch.Job, withFinalizers bool) ([]*v1.Po } cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc, finalizers...) // When adopting Pods, this operation adds an ownerRef and finalizers. - pods, err = cm.ClaimPods(pods) + pods, err = cm.ClaimPods(context.TODO(), pods) if err != nil || !withFinalizers { return pods, err } diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index c040ee2aa264d..64df135426926 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -88,7 +88,7 @@ type ReplicaSetController struct { // It resumes normal action after observing the watch events for them. burstReplicas int // To allow injection of syncReplicaSet for testing. - syncHandler func(rsKey string) error + syncHandler func(ctx context.Context, rsKey string) error // A TTLCache of pod creates/deletes each rc expects to see. expectations *controller.UIDTrackingControllerExpectations @@ -178,7 +178,7 @@ func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder) } // Run begins watching and syncing. -func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { +func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer rsc.queue.ShutDown() @@ -186,15 +186,15 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { klog.Infof("Starting %v controller", controllerName) defer klog.Infof("Shutting down %v controller", controllerName) - if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) { + if !cache.WaitForNamedCacheSync(rsc.Kind, ctx.Done(), rsc.podListerSynced, rsc.rsListerSynced) { return } for i := 0; i < workers; i++ { - go wait.Until(rsc.worker, time.Second, stopCh) + go wait.UntilWithContext(ctx, rsc.worker, time.Second) } - <-stopCh + <-ctx.Done() } // getReplicaSetsWithSameController returns a list of ReplicaSets with the same @@ -515,19 +515,19 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) { // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. -func (rsc *ReplicaSetController) worker() { - for rsc.processNextWorkItem() { +func (rsc *ReplicaSetController) worker(ctx context.Context) { + for rsc.processNextWorkItem(ctx) { } } -func (rsc *ReplicaSetController) processNextWorkItem() bool { +func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool { key, quit := rsc.queue.Get() if quit { return false } defer rsc.queue.Done(key) - err := rsc.syncHandler(key.(string)) + err := rsc.syncHandler(ctx, key.(string)) if err == nil { rsc.queue.Forget(key) return true @@ -647,7 +647,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps // syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled, // meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be // invoked concurrently with the same key. -func (rsc *ReplicaSetController) syncReplicaSet(key string) error { +func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error { startTime := time.Now() defer func() { klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime)) @@ -686,7 +686,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { // NOTE: filteredPods are pointing to objects from cache - if you need to // modify them, you need to copy it first. - filteredPods, err = rsc.claimPods(rs, selector, filteredPods) + filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods) if err != nil { return err } @@ -714,11 +714,11 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { return manageReplicasErr } -func (rsc *ReplicaSetController) claimPods(rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) { +func (rsc *ReplicaSetController) claimPods(ctx context.Context, rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) { // If any adoptions are attempted, we should first recheck for deletion with // an uncached quorum read sometime after listing Pods (see #42639). - canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { - fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(context.TODO(), rs.Name, metav1.GetOptions{}) + canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) { + fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(ctx, rs.Name, metav1.GetOptions{}) if err != nil { return nil, err } @@ -728,7 +728,7 @@ func (rsc *ReplicaSetController) claimPods(rs *apps.ReplicaSet, selector labels. return fresh, nil }) cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc) - return cm.ClaimPods(filteredPods) + return cm.ClaimPods(ctx, filteredPods) } // slowStartBatch tries to call the provided function a total of 'count' times, diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index 3c09d2816cc0f..5f1955baad1c1 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -185,12 +185,12 @@ func processSync(rsc *ReplicaSetController, key string) error { rsc.syncHandler = oldSyncHandler }() var syncErr error - rsc.syncHandler = func(key string) error { - syncErr = oldSyncHandler(key) + rsc.syncHandler = func(ctx context.Context, key string) error { + syncErr = oldSyncHandler(ctx, key) return syncErr } rsc.queue.Add(key) - rsc.processNextWorkItem() + rsc.processNextWorkItem(context.TODO()) return syncErr } @@ -224,7 +224,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) { newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 2, v1.PodRunning, labelMap, rsSpec, "pod") manager.podControl = &fakePodControl - manager.syncReplicaSet(GetKey(rsSpec, t)) + manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t)) err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0) if err != nil { t.Fatal(err) @@ -240,7 +240,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { manager.podControl = &fakePodControl received := make(chan string) - manager.syncHandler = func(key string) error { + manager.syncHandler = func(ctx context.Context, key string) error { received <- key return nil } @@ -253,7 +253,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { pods := newPodList(nil, 1, v1.PodRunning, labelMap, rsSpec, "pod") manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]}) - go manager.worker() + go manager.worker(context.TODO()) expected := GetKey(rsSpec, t) select { @@ -282,7 +282,7 @@ func TestSyncReplicaSetCreateFailures(t *testing.T) { informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs) manager.podControl = &fakePodControl - manager.syncReplicaSet(GetKey(rs, t)) + manager.syncReplicaSet(context.TODO(), GetKey(rs, t)) err := validateSyncReplicaSet(&fakePodControl, fakePodControl.CreateLimit, 0, 0) if err != nil { t.Fatal(err) @@ -324,7 +324,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) { rsSpec.Status.Replicas = 1 rsSpec.Status.ReadyReplicas = 1 rsSpec.Status.AvailableReplicas = 1 - manager.syncReplicaSet(GetKey(rsSpec, t)) + manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t)) err := validateSyncReplicaSet(&fakePodControl, 1, 0, 0) if err != nil { t.Fatal(err) @@ -335,7 +335,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) { rsSpec.Status.ReadyReplicas = 0 rsSpec.Status.AvailableReplicas = 0 fakePodControl.Clear() - manager.syncReplicaSet(GetKey(rsSpec, t)) + manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t)) err = validateSyncReplicaSet(&fakePodControl, 0, 0, 0) if err != nil { t.Fatal(err) @@ -356,7 +356,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) { fakePodControl.Clear() fakePodControl.Err = fmt.Errorf("fake Error") - manager.syncReplicaSet(GetKey(rsSpec, t)) + manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t)) err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0) if err != nil { t.Fatal(err) @@ -365,7 +365,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) { // This replica should not need a Lowering of expectations, since the previous create failed fakePodControl.Clear() fakePodControl.Err = nil - manager.syncReplicaSet(GetKey(rsSpec, t)) + manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t)) err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0) if err != nil { t.Fatal(err) @@ -600,7 +600,7 @@ func TestWatchControllers(t *testing.T) { // The update sent through the fakeWatcher should make its way into the workqueue, // and eventually into the syncHandler. The handler validates the received controller // and closes the received channel to indicate that the test can finish. - manager.syncHandler = func(key string) error { + manager.syncHandler = func(ctx context.Context, key string) error { obj, exists, err := informers.Apps().V1().ReplicaSets().Informer().GetIndexer().GetByKey(key) if !exists || err != nil { t.Errorf("Expected to find replica set under key %v", key) @@ -614,7 +614,7 @@ func TestWatchControllers(t *testing.T) { } // Start only the ReplicaSet watcher and the workqueue, send a watch event, // and make sure it hits the sync method. - go wait.Until(manager.worker, 10*time.Millisecond, stopCh) + go wait.UntilWithContext(context.TODO(), manager.worker, 10*time.Millisecond) testRSSpec.Name = "foo" fakeWatch.Add(&testRSSpec) @@ -645,7 +645,7 @@ func TestWatchPods(t *testing.T) { received := make(chan string) // The pod update sent through the fakeWatcher should figure out the managing ReplicaSet and // send it into the syncHandler. - manager.syncHandler = func(key string) error { + manager.syncHandler = func(ctx context.Context, key string) error { namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { t.Errorf("Error splitting key: %v", err) @@ -664,7 +664,7 @@ func TestWatchPods(t *testing.T) { // Start only the pod watcher and the workqueue, send a watch event, // and make sure it hits the sync method for the right ReplicaSet. go informers.Core().V1().Pods().Informer().Run(stopCh) - go manager.Run(1, stopCh) + go manager.Run(context.TODO(), 1) pods := newPodList(nil, 1, v1.PodRunning, labelMap, testRSSpec, "pod") testPod := pods.Items[0] @@ -685,7 +685,7 @@ func TestUpdatePods(t *testing.T) { received := make(chan string) - manager.syncHandler = func(key string) error { + manager.syncHandler = func(ctx context.Context, key string) error { namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { t.Errorf("Error splitting key: %v", err) @@ -698,7 +698,7 @@ func TestUpdatePods(t *testing.T) { return nil } - go wait.Until(manager.worker, 10*time.Millisecond, stopCh) + go wait.UntilWithContext(context.TODO(), manager.worker, 10*time.Millisecond) // Put 2 ReplicaSets and one pod into the informers labelMap1 := map[string]string{"foo": "bar"} @@ -829,7 +829,7 @@ func TestControllerUpdateRequeue(t *testing.T) { // Enqueue once. Then process it. Disable rate-limiting for this. manager.queue = workqueue.NewRateLimitingQueue(workqueue.NewMaxOfRateLimiter()) manager.enqueueRS(rs) - manager.processNextWorkItem() + manager.processNextWorkItem(context.TODO()) // It should have been requeued. if got, want := manager.queue.Len(), 1; got != want { t.Errorf("queue.Len() = %v, want %v", got, want) @@ -909,7 +909,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rsSpec) for i := 0; i < numReplicas; i += burstReplicas { - manager.syncReplicaSet(GetKey(rsSpec, t)) + manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t)) // The store accrues active pods. It's also used by the ReplicaSet to determine how many // replicas to create. @@ -988,7 +988,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // Check that the ReplicaSet didn't take any action for all the above pods fakePodControl.Clear() - manager.syncReplicaSet(GetKey(rsSpec, t)) + manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t)) err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0) if err != nil { t.Fatal(err) @@ -1075,7 +1075,7 @@ func TestRSSyncExpectations(t *testing.T) { informers.Core().V1().Pods().Informer().GetIndexer().Add(&postExpectationsPod) }, }) - manager.syncReplicaSet(GetKey(rsSpec, t)) + manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t)) err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0) if err != nil { t.Fatal(err) @@ -1095,7 +1095,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { manager.podControl = &fakePodControl // This should set expectations for the ReplicaSet - manager.syncReplicaSet(GetKey(rs, t)) + manager.syncReplicaSet(context.TODO(), GetKey(rs, t)) err := validateSyncReplicaSet(&fakePodControl, 1, 0, 0) if err != nil { t.Fatal(err) @@ -1116,7 +1116,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { } informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Delete(rs) manager.deleteRS(rs) - manager.syncReplicaSet(GetKey(rs, t)) + manager.syncReplicaSet(context.TODO(), GetKey(rs, t)) _, exists, err = manager.expectations.GetExpectations(rsKey) if err != nil { @@ -1129,7 +1129,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { // This should have no effect, since we've deleted the ReplicaSet. podExp.Add(-1, 0) informers.Core().V1().Pods().Informer().GetIndexer().Replace(make([]interface{}, 0), "0") - manager.syncReplicaSet(GetKey(rs, t)) + manager.syncReplicaSet(context.TODO(), GetKey(rs, t)) err = validateSyncReplicaSet(&fakePodControl, 0, 0, 0) if err != nil { t.Fatal(err) @@ -1171,7 +1171,7 @@ func TestExpectationsOnRecreate(t *testing.T) { t.Fatalf("initial RS didn't result in new item in the queue: %v", err) } - ok := manager.processNextWorkItem() + ok := manager.processNextWorkItem(context.TODO()) if !ok { t.Fatal("queue is shutting down") } @@ -1257,7 +1257,7 @@ func TestExpectationsOnRecreate(t *testing.T) { t.Fatalf("Re-creating RS didn't result in new item in the queue: %v", err) } - ok = manager.processNextWorkItem() + ok = manager.processNextWorkItem(context.TODO()) if !ok { t.Fatal("Queue is shutting down!") } @@ -1457,7 +1457,7 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) { pod := newPod("pod", rs, v1.PodRunning, nil, true) pod.OwnerReferences = []metav1.OwnerReference{otherControllerReference} informers.Core().V1().Pods().Informer().GetIndexer().Add(pod) - err := manager.syncReplicaSet(GetKey(rs, t)) + err := manager.syncReplicaSet(context.TODO(), GetKey(rs, t)) if err != nil { t.Fatal(err) } @@ -1514,7 +1514,7 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) { informers.Core().V1().Pods().Informer().GetIndexer().Add(pod1) // no patch, no create - err := manager.syncReplicaSet(GetKey(rs, t)) + err := manager.syncReplicaSet(context.TODO(), GetKey(rs, t)) if err != nil { t.Fatal(err) } @@ -1543,7 +1543,7 @@ func TestDoNotAdoptOrCreateIfBeingDeletedRace(t *testing.T) { informers.Core().V1().Pods().Informer().GetIndexer().Add(pod1) // sync should abort. - err := manager.syncReplicaSet(GetKey(rs, t)) + err := manager.syncReplicaSet(context.TODO(), GetKey(rs, t)) if err == nil { t.Error("syncReplicaSet() err = nil, expected non-nil") } diff --git a/pkg/controller/statefulset/stateful_pod_control.go b/pkg/controller/statefulset/stateful_pod_control.go index b995d0892f5b2..4bfc5c847b13e 100644 --- a/pkg/controller/statefulset/stateful_pod_control.go +++ b/pkg/controller/statefulset/stateful_pod_control.go @@ -41,7 +41,7 @@ import ( type StatefulPodControlInterface interface { // CreateStatefulPod create a Pod in a StatefulSet. Any PVCs necessary for the Pod are created prior to creating // the Pod. If the returned error is nil the Pod and its PVCs have been created. - CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error + CreateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error // UpdateStatefulPod Updates a Pod in a StatefulSet. If the Pod already has the correct identity and stable // storage this method is a no-op. If the Pod must be mutated to conform to the Set, it is mutated and updated. // pod is an in-out parameter, and any updates made to the pod are reflected as mutations to this parameter. If @@ -72,14 +72,14 @@ type realStatefulPodControl struct { recorder record.EventRecorder } -func (spc *realStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { +func (spc *realStatefulPodControl) CreateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error { // Create the Pod's PVCs prior to creating the Pod if err := spc.createPersistentVolumeClaims(set, pod); err != nil { spc.recordPodEvent("create", set, pod, err) return err } // If we created the PVCs attempt to create the Pod - _, err := spc.client.CoreV1().Pods(set.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) + _, err := spc.client.CoreV1().Pods(set.Namespace).Create(ctx, pod, metav1.CreateOptions{}) // sink already exists errors if apierrors.IsAlreadyExists(err) { return err diff --git a/pkg/controller/statefulset/stateful_pod_control_test.go b/pkg/controller/statefulset/stateful_pod_control_test.go index b4403b06a595b..82e535707ed14 100644 --- a/pkg/controller/statefulset/stateful_pod_control_test.go +++ b/pkg/controller/statefulset/stateful_pod_control_test.go @@ -17,6 +17,7 @@ limitations under the License. package statefulset import ( + "context" "errors" "strings" "testing" @@ -56,7 +57,7 @@ func TestStatefulPodControlCreatesPods(t *testing.T) { create := action.(core.CreateAction) return true, create.GetObject(), nil }) - if err := control.CreateStatefulPod(set, pod); err != nil { + if err := control.CreateStatefulPod(context.TODO(), set, pod); err != nil { t.Errorf("StatefulPodControl failed to create Pod error: %s", err) } events := collectEvents(recorder.Events) @@ -90,7 +91,7 @@ func TestStatefulPodControlCreatePodExists(t *testing.T) { fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) { return true, pod, apierrors.NewAlreadyExists(action.GetResource().GroupResource(), pod.Name) }) - if err := control.CreateStatefulPod(set, pod); !apierrors.IsAlreadyExists(err) { + if err := control.CreateStatefulPod(context.TODO(), set, pod); !apierrors.IsAlreadyExists(err) { t.Errorf("Failed to create Pod error: %s", err) } events := collectEvents(recorder.Events) @@ -117,7 +118,7 @@ func TestStatefulPodControlCreatePodPvcCreateFailure(t *testing.T) { create := action.(core.CreateAction) return true, create.GetObject(), nil }) - if err := control.CreateStatefulPod(set, pod); err == nil { + if err := control.CreateStatefulPod(context.TODO(), set, pod); err == nil { t.Error("Failed to produce error on PVC creation failure") } events := collectEvents(recorder.Events) @@ -153,7 +154,7 @@ func TestStatefulPodControlCreatePodPvcDeleting(t *testing.T) { create := action.(core.CreateAction) return true, create.GetObject(), nil }) - if err := control.CreateStatefulPod(set, pod); err == nil { + if err := control.CreateStatefulPod(context.TODO(), set, pod); err == nil { t.Error("Failed to produce error on deleting PVC") } events := collectEvents(recorder.Events) @@ -191,7 +192,7 @@ func TestStatefulPodControlCreatePodPvcGetFailure(t *testing.T) { create := action.(core.CreateAction) return true, create.GetObject(), nil }) - if err := control.CreateStatefulPod(set, pod); err == nil { + if err := control.CreateStatefulPod(context.TODO(), set, pod); err == nil { t.Error("Failed to produce error on PVC creation failure") } events := collectEvents(recorder.Events) @@ -220,7 +221,7 @@ func TestStatefulPodControlCreatePodFailed(t *testing.T) { fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) { return true, nil, apierrors.NewInternalError(errors.New("API server down")) }) - if err := control.CreateStatefulPod(set, pod); err == nil { + if err := control.CreateStatefulPod(context.TODO(), set, pod); err == nil { t.Error("Failed to produce error on Pod creation failure") } events := collectEvents(recorder.Events) diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go index 068d11c7e7937..7f5f1174ddbcb 100644 --- a/pkg/controller/statefulset/stateful_set.go +++ b/pkg/controller/statefulset/stateful_set.go @@ -141,22 +141,22 @@ func NewStatefulSetController( } // Run runs the statefulset controller. -func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) { +func (ssc *StatefulSetController) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer ssc.queue.ShutDown() klog.Infof("Starting stateful set controller") defer klog.Infof("Shutting down statefulset controller") - if !cache.WaitForNamedCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) { + if !cache.WaitForNamedCacheSync("stateful set", ctx.Done(), ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) { return } for i := 0; i < workers; i++ { - go wait.Until(ssc.worker, time.Second, stopCh) + go wait.UntilWithContext(ctx, ssc.worker, time.Second) } - <-stopCh + <-ctx.Done() } // addPod adds the statefulset for the pod to the sync queue @@ -287,7 +287,7 @@ func (ssc *StatefulSetController) deletePod(obj interface{}) { // // NOTE: Returned Pods are pointers to objects from the cache. // If you need to modify one, you need to copy it first. -func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, selector labels.Selector) ([]*v1.Pod, error) { +func (ssc *StatefulSetController) getPodsForStatefulSet(ctx context.Context, set *apps.StatefulSet, selector labels.Selector) ([]*v1.Pod, error) { // List all pods to include the pods that don't match the selector anymore but // has a ControllerRef pointing to this StatefulSet. pods, err := ssc.podLister.Pods(set.Namespace).List(labels.Everything()) @@ -300,15 +300,15 @@ func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, s return isMemberOf(set, pod) } - cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind, ssc.canAdoptFunc(set)) - return cm.ClaimPods(pods, filter) + cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind, ssc.canAdoptFunc(ctx, set)) + return cm.ClaimPods(ctx, pods, filter) } // If any adoptions are attempted, we should first recheck for deletion with // an uncached quorum read sometime after listing Pods/ControllerRevisions (see #42639). -func (ssc *StatefulSetController) canAdoptFunc(set *apps.StatefulSet) func() error { - return controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { - fresh, err := ssc.kubeClient.AppsV1().StatefulSets(set.Namespace).Get(context.TODO(), set.Name, metav1.GetOptions{}) +func (ssc *StatefulSetController) canAdoptFunc(ctx context.Context, set *apps.StatefulSet) func(ctx2 context.Context) error { + return controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) { + fresh, err := ssc.kubeClient.AppsV1().StatefulSets(set.Namespace).Get(ctx, set.Name, metav1.GetOptions{}) if err != nil { return nil, err } @@ -320,7 +320,7 @@ func (ssc *StatefulSetController) canAdoptFunc(set *apps.StatefulSet) func() err } // adoptOrphanRevisions adopts any orphaned ControllerRevisions matched by set's Selector. -func (ssc *StatefulSetController) adoptOrphanRevisions(set *apps.StatefulSet) error { +func (ssc *StatefulSetController) adoptOrphanRevisions(ctx context.Context, set *apps.StatefulSet) error { revisions, err := ssc.control.ListRevisions(set) if err != nil { return err @@ -332,7 +332,7 @@ func (ssc *StatefulSetController) adoptOrphanRevisions(set *apps.StatefulSet) er } } if len(orphanRevisions) > 0 { - canAdoptErr := ssc.canAdoptFunc(set)() + canAdoptErr := ssc.canAdoptFunc(ctx, set)(ctx) if canAdoptErr != nil { return fmt.Errorf("can't adopt ControllerRevisions: %v", canAdoptErr) } @@ -403,13 +403,13 @@ func (ssc *StatefulSetController) enqueueSSAfter(ss *apps.StatefulSet, duration // processNextWorkItem dequeues items, processes them, and marks them done. It enforces that the syncHandler is never // invoked concurrently with the same key. -func (ssc *StatefulSetController) processNextWorkItem() bool { +func (ssc *StatefulSetController) processNextWorkItem(ctx context.Context) bool { key, quit := ssc.queue.Get() if quit { return false } defer ssc.queue.Done(key) - if err := ssc.sync(key.(string)); err != nil { + if err := ssc.sync(ctx, key.(string)); err != nil { utilruntime.HandleError(fmt.Errorf("error syncing StatefulSet %v, requeuing: %v", key.(string), err)) ssc.queue.AddRateLimited(key) } else { @@ -419,13 +419,13 @@ func (ssc *StatefulSetController) processNextWorkItem() bool { } // worker runs a worker goroutine that invokes processNextWorkItem until the controller's queue is closed -func (ssc *StatefulSetController) worker() { - for ssc.processNextWorkItem() { +func (ssc *StatefulSetController) worker(ctx context.Context) { + for ssc.processNextWorkItem(ctx) { } } // sync syncs the given statefulset. -func (ssc *StatefulSetController) sync(key string) error { +func (ssc *StatefulSetController) sync(ctx context.Context, key string) error { startTime := time.Now() defer func() { klog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Since(startTime)) @@ -452,25 +452,25 @@ func (ssc *StatefulSetController) sync(key string) error { return nil } - if err := ssc.adoptOrphanRevisions(set); err != nil { + if err := ssc.adoptOrphanRevisions(ctx, set); err != nil { return err } - pods, err := ssc.getPodsForStatefulSet(set, selector) + pods, err := ssc.getPodsForStatefulSet(ctx, set, selector) if err != nil { return err } - return ssc.syncStatefulSet(set, pods) + return ssc.syncStatefulSet(ctx, set, pods) } // syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod). -func (ssc *StatefulSetController) syncStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error { +func (ssc *StatefulSetController) syncStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) error { klog.V(4).Infof("Syncing StatefulSet %v/%v with %d pods", set.Namespace, set.Name, len(pods)) var status *apps.StatefulSetStatus var err error // TODO: investigate where we mutate the set during the update as it is not obvious. - status, err = ssc.control.UpdateStatefulSet(set.DeepCopy(), pods) + status, err = ssc.control.UpdateStatefulSet(ctx, set.DeepCopy(), pods) if err != nil { return err } diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index d43b8e621c199..7f653d13c20e5 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -17,6 +17,7 @@ limitations under the License. package statefulset import ( + "context" "sort" apps "k8s.io/api/apps/v1" @@ -38,7 +39,7 @@ type StatefulSetControlInterface interface { // If an implementation returns a non-nil error, the invocation will be retried using a rate-limited strategy. // Implementors should sink any errors that they do not wish to trigger a retry, and they may feel free to // exit exceptionally at any point provided they wish the update to be re-run at a later point in time. - UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) + UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) // ListRevisions returns a array of the ControllerRevisions that represent the revisions of set. If the returned // error is nil, the returns slice of ControllerRevisions is valid. ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) @@ -73,7 +74,7 @@ type defaultStatefulSetControl struct { // strategy allows these constraints to be relaxed - pods will be created and deleted eagerly and // in no particular order. Clients using the burst strategy should be careful to ensure they // understand the consistency implications of having unpredictable numbers of pods available. -func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) { +func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) { // list all revisions and sort them revisions, err := ssc.ListRevisions(set) if err != nil { @@ -81,7 +82,7 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p } history.SortControllerRevisions(revisions) - currentRevision, updateRevision, status, err := ssc.performUpdate(set, pods, revisions) + currentRevision, updateRevision, status, err := ssc.performUpdate(ctx, set, pods, revisions) if err != nil { return nil, utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)}) } @@ -91,7 +92,7 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p } func (ssc *defaultStatefulSetControl) performUpdate( - set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) { + ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) { var currentStatus *apps.StatefulSetStatus // get the current, and update revisions currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions) @@ -100,12 +101,12 @@ func (ssc *defaultStatefulSetControl) performUpdate( } // perform the main update function and get the status - currentStatus, err = ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods) + currentStatus, err = ssc.updateStatefulSet(ctx, set, currentRevision, updateRevision, collisionCount, pods) if err != nil { return currentRevision, updateRevision, currentStatus, err } // update the set's status - err = ssc.updateStatefulSetStatus(set, currentStatus) + err = ssc.updateStatefulSetStatus(ctx, set, currentStatus) if err != nil { return currentRevision, updateRevision, currentStatus, err } @@ -268,6 +269,7 @@ func (ssc *defaultStatefulSetControl) getStatefulSetRevisions( // Pods must be at Status.UpdateRevision. If the returned error is nil, the returned StatefulSetStatus is valid and the // update must be recorded. If the error is not nil, the method should be retried until successful. func (ssc *defaultStatefulSetControl) updateStatefulSet( + ctx context.Context, set *apps.StatefulSet, currentRevision *apps.ControllerRevision, updateRevision *apps.ControllerRevision, @@ -416,7 +418,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( } // If we find a Pod that has not been created we create the Pod if !isCreated(replicas[i]) { - if err := ssc.podControl.CreateStatefulPod(set, replicas[i]); err != nil { + if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil { return &status, err } status.Replicas++ @@ -579,6 +581,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( // mutated to indicate completion. If status is semantically equivalent to set's Status no update is performed. If the // returned error is nil, the update is successful. func (ssc *defaultStatefulSetControl) updateStatefulSetStatus( + ctx context.Context, set *apps.StatefulSet, status *apps.StatefulSetStatus) error { // complete any in progress rolling update if necessary @@ -591,7 +594,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSetStatus( // copy set and update its status set = set.DeepCopy() - if err := ssc.statusUpdater.UpdateStatefulSetStatus(set, status); err != nil { + if err := ssc.statusUpdater.UpdateStatefulSetStatus(ctx, set, status); err != nil { return err } diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index 6911e799fb245..098cf321f7953 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -17,6 +17,7 @@ limitations under the License. package statefulset import ( + "context" "errors" "fmt" "math/rand" @@ -229,7 +230,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) if err != nil { t.Error(err) } - if _, err = ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { t.Errorf("Failed to update StatefulSet : %s", err) } set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -239,7 +240,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) if pods, err = spc.setPodRunning(set, i); err != nil { t.Error(err) } - if _, err = ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { t.Errorf("Failed to update StatefulSet : %s", err) } set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -254,7 +255,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) if err != nil { t.Error(err) } - if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { t.Errorf("Failed to update StatefulSet : %s", err) } set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -278,7 +279,7 @@ func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invarian if err != nil { t.Error(err) } - if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { t.Errorf("Error updating StatefulSet %s", err) } if err := invariants(set, spc); err != nil { @@ -290,7 +291,7 @@ func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invarian } pods[0].Status.Phase = v1.PodFailed spc.podsIndexer.Update(pods[0]) - if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { t.Errorf("Error updating StatefulSet %s", err) } if err := invariants(set, spc); err != nil { @@ -371,7 +372,7 @@ func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF spc.podsIndexer.Update(pods[0]) // now it should fail - if _, err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil && isOrHasInternalError(err) { t.Errorf("StatefulSetControl did not return InternalError found %s", err) } } @@ -417,7 +418,7 @@ func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants in if err != nil { t.Error(err) } - if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { t.Errorf("Error updating StatefulSet %s", err) } if err := invariants(set, spc); err != nil { @@ -430,13 +431,13 @@ func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants in pods[0].Status.Phase = v1.PodFailed spc.podsIndexer.Update(pods[0]) spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0) - if _, err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil && isOrHasInternalError(err) { t.Errorf("StatefulSet failed to %s", err) } if err := invariants(set, spc); err != nil { t.Error(err) } - if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { t.Errorf("Error updating StatefulSet %s", err) } if err := invariants(set, spc); err != nil { @@ -1282,7 +1283,7 @@ func TestStatefulSetControlLimitsHistory(t *testing.T) { if err != nil { t.Fatalf("%s: %s", test.name, err) } - _, err = ssc.UpdateStatefulSet(set, pods) + _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods) if err != nil { t.Fatalf("%s: %s", test.name, err) } @@ -1629,7 +1630,7 @@ func TestStatefulSetAvailability(t *testing.T) { if err != nil { t.Fatalf("%s: %s", test.name, err) } - status, err := ssc.UpdateStatefulSet(set, pods) + status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods) if err != nil { t.Fatalf("%s: %s", test.name, err) } @@ -1824,7 +1825,7 @@ func (spc *fakeStatefulPodControl) setPodTerminated(set *apps.StatefulSet, ordin return spc.podsLister.Pods(set.Namespace).List(selector) } -func (spc *fakeStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { +func (spc *fakeStatefulPodControl) CreateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error { defer spc.createPodTracker.inc() if spc.createPodTracker.errorReady() { defer spc.createPodTracker.reset() @@ -1890,7 +1891,7 @@ func newFakeStatefulSetStatusUpdater(setInformer appsinformers.StatefulSetInform } } -func (ssu *fakeStatefulSetStatusUpdater) UpdateStatefulSetStatus(set *apps.StatefulSet, status *apps.StatefulSetStatus) error { +func (ssu *fakeStatefulSetStatusUpdater) UpdateStatefulSetStatus(ctx context.Context, set *apps.StatefulSet, status *apps.StatefulSetStatus) error { defer ssu.updateStatusTracker.inc() if ssu.updateStatusTracker.errorReady() { defer ssu.updateStatusTracker.reset() @@ -2089,7 +2090,7 @@ func scaleUpStatefulSetControl(set *apps.StatefulSet, } } // run the controller once and check invariants - _, err = ssc.UpdateStatefulSet(set, pods) + _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods) if err != nil { return err } @@ -2117,7 +2118,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn } sort.Sort(ascendingOrdinal(pods)) if ordinal := len(pods) - 1; ordinal >= 0 { - if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { return err } set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -2127,7 +2128,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn if pods, err = spc.addTerminatingPod(set, ordinal); err != nil { return err } - if _, err = ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { return err } set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -2144,7 +2145,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn spc.podsIndexer.Delete(pods[len(pods)-1]) } } - if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { return err } set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -2207,7 +2208,7 @@ func updateStatefulSetControl(set *apps.StatefulSet, if err != nil { return err } - if _, err = ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { return err } @@ -2255,7 +2256,7 @@ func updateStatefulSetControl(set *apps.StatefulSet, } } - if _, err = ssc.UpdateStatefulSet(set, pods); err != nil { + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { return err } set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) diff --git a/pkg/controller/statefulset/stateful_set_status_updater.go b/pkg/controller/statefulset/stateful_set_status_updater.go index 52c79c28b63e8..a71c5455148de 100644 --- a/pkg/controller/statefulset/stateful_set_status_updater.go +++ b/pkg/controller/statefulset/stateful_set_status_updater.go @@ -33,7 +33,7 @@ import ( type StatefulSetStatusUpdaterInterface interface { // UpdateStatefulSetStatus sets the set's Status to status. Implementations are required to retry on conflicts, // but fail on other errors. If the returned error is nil set's Status has been successfully set to status. - UpdateStatefulSetStatus(set *apps.StatefulSet, status *apps.StatefulSetStatus) error + UpdateStatefulSetStatus(ctx context.Context, set *apps.StatefulSet, status *apps.StatefulSetStatus) error } // NewRealStatefulSetStatusUpdater returns a StatefulSetStatusUpdaterInterface that updates the Status of a StatefulSet, @@ -50,11 +50,13 @@ type realStatefulSetStatusUpdater struct { } func (ssu *realStatefulSetStatusUpdater) UpdateStatefulSetStatus( + ctx context.Context, set *apps.StatefulSet, status *apps.StatefulSetStatus) error { // don't wait due to limited number of clients, but backoff after the default number of steps return retry.RetryOnConflict(retry.DefaultRetry, func() error { set.Status = *status + // TODO: This context.TODO should use a real context once we have RetryOnConflictWithContext _, updateErr := ssu.client.AppsV1().StatefulSets(set.Namespace).UpdateStatus(context.TODO(), set, metav1.UpdateOptions{}) if updateErr == nil { return nil diff --git a/pkg/controller/statefulset/stateful_set_status_updater_test.go b/pkg/controller/statefulset/stateful_set_status_updater_test.go index 4eb9f63a831ef..22e608fcbb9a4 100644 --- a/pkg/controller/statefulset/stateful_set_status_updater_test.go +++ b/pkg/controller/statefulset/stateful_set_status_updater_test.go @@ -17,6 +17,7 @@ limitations under the License. package statefulset import ( + "context" "errors" "testing" @@ -41,7 +42,7 @@ func TestStatefulSetUpdaterUpdatesSetStatus(t *testing.T) { update := action.(core.UpdateAction) return true, update.GetObject(), nil }) - if err := updater.UpdateStatefulSetStatus(set, &status); err != nil { + if err := updater.UpdateStatefulSetStatus(context.TODO(), set, &status); err != nil { t.Errorf("Error returned on successful status update: %s", err) } if set.Status.Replicas != 2 { @@ -62,7 +63,7 @@ func TestStatefulSetStatusUpdaterUpdatesObservedGeneration(t *testing.T) { } return true, sts, nil }) - if err := updater.UpdateStatefulSetStatus(set, &status); err != nil { + if err := updater.UpdateStatefulSetStatus(context.TODO(), set, &status); err != nil { t.Errorf("Error returned on successful status update: %s", err) } } @@ -78,7 +79,7 @@ func TestStatefulSetStatusUpdaterUpdateReplicasFailure(t *testing.T) { fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) { return true, nil, apierrors.NewInternalError(errors.New("API server down")) }) - if err := updater.UpdateStatefulSetStatus(set, &status); err == nil { + if err := updater.UpdateStatefulSetStatus(context.TODO(), set, &status); err == nil { t.Error("Failed update did not return error") } } @@ -101,7 +102,7 @@ func TestStatefulSetStatusUpdaterUpdateReplicasConflict(t *testing.T) { return true, update.GetObject(), nil }) - if err := updater.UpdateStatefulSetStatus(set, &status); err != nil { + if err := updater.UpdateStatefulSetStatus(context.TODO(), set, &status); err != nil { t.Errorf("UpdateStatefulSetStatus returned an error: %s", err) } if set.Status.Replicas != 2 { @@ -121,7 +122,7 @@ func TestStatefulSetStatusUpdaterUpdateReplicasConflictFailure(t *testing.T) { update := action.(core.UpdateAction) return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("object already exists")) }) - if err := updater.UpdateStatefulSetStatus(set, &status); err == nil { + if err := updater.UpdateStatefulSetStatus(context.TODO(), set, &status); err == nil { t.Error("UpdateStatefulSetStatus failed to return an error on get failure") } } @@ -136,7 +137,7 @@ func TestStatefulSetStatusUpdaterGetAvailableReplicas(t *testing.T) { update := action.(core.UpdateAction) return true, update.GetObject(), nil }) - if err := updater.UpdateStatefulSetStatus(set, &status); err != nil { + if err := updater.UpdateStatefulSetStatus(context.TODO(), set, &status); err != nil { t.Errorf("Error returned on successful status update: %s", err) } if set.Status.AvailableReplicas != 3 { diff --git a/pkg/controller/statefulset/stateful_set_test.go b/pkg/controller/statefulset/stateful_set_test.go index cab369012a889..31bd9c332058f 100644 --- a/pkg/controller/statefulset/stateful_set_test.go +++ b/pkg/controller/statefulset/stateful_set_test.go @@ -18,6 +18,7 @@ package statefulset import ( "bytes" + "context" "encoding/json" "sort" "testing" @@ -106,7 +107,7 @@ func TestStatefulSetControllerRespectsTermination(t *testing.T) { if err != nil { t.Error(err) } - ssc.syncStatefulSet(set, pods) + ssc.syncStatefulSet(context.TODO(), set, pods) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) if err != nil { t.Error(err) @@ -555,7 +556,7 @@ func TestGetPodsForStatefulSetAdopt(t *testing.T) { if err != nil { t.Fatal(err) } - pods, err := ssc.getPodsForStatefulSet(set, selector) + pods, err := ssc.getPodsForStatefulSet(context.TODO(), set, selector) if err != nil { t.Fatalf("getPodsForStatefulSet() error: %v", err) } @@ -592,7 +593,7 @@ func TestAdoptOrphanRevisions(t *testing.T) { spc.revisionsIndexer.Add(ss1Rev1) spc.revisionsIndexer.Add(ss1Rev2) - err = ssc.adoptOrphanRevisions(ss1) + err = ssc.adoptOrphanRevisions(context.TODO(), ss1) if err != nil { t.Errorf("adoptOrphanRevisions() error: %v", err) } @@ -634,7 +635,7 @@ func TestGetPodsForStatefulSetRelease(t *testing.T) { if err != nil { t.Fatal(err) } - pods, err := ssc.getPodsForStatefulSet(set, selector) + pods, err := ssc.getPodsForStatefulSet(context.TODO(), set, selector) if err != nil { t.Fatalf("getPodsForStatefulSet() error: %v", err) } @@ -673,7 +674,7 @@ func newFakeStatefulSetController(initialObjects ...runtime.Object) (*StatefulSe func fakeWorker(ssc *StatefulSetController) { if obj, done := ssc.queue.Get(); !done { - ssc.sync(obj.(string)) + ssc.sync(context.TODO(), obj.(string)) ssc.queue.Done(obj) } } diff --git a/test/integration/daemonset/daemonset_test.go b/test/integration/daemonset/daemonset_test.go index d6b610bd11240..cbc116766c6fb 100644 --- a/test/integration/daemonset/daemonset_test.go +++ b/test/integration/daemonset/daemonset_test.go @@ -438,7 +438,7 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) { setupScheduler(ctx, t, clientset, informers) informers.Start(ctx.Done()) - go dc.Run(5, ctx.Done()) + go dc.Run(ctx, 5) ds := newDaemonSet("foo", ns.Name) ds.Spec.UpdateStrategy = *strategy @@ -474,7 +474,7 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) { defer cancel() informers.Start(ctx.Done()) - go dc.Run(5, ctx.Done()) + go dc.Run(ctx, 5) // Start Scheduler setupScheduler(ctx, t, clientset, informers) @@ -510,7 +510,7 @@ func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) { defer cancel() informers.Start(ctx.Done()) - go dc.Run(5, ctx.Done()) + go dc.Run(ctx, 5) // Start Scheduler setupScheduler(ctx, t, clientset, informers) @@ -579,7 +579,7 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) { defer cancel() informers.Start(ctx.Done()) - go dc.Run(5, ctx.Done()) + go dc.Run(ctx, 5) // Start Scheduler setupScheduler(ctx, t, clientset, informers) @@ -626,7 +626,7 @@ func TestInsufficientCapacityNode(t *testing.T) { defer cancel() informers.Start(ctx.Done()) - go dc.Run(5, ctx.Done()) + go dc.Run(ctx, 5) // Start Scheduler setupScheduler(ctx, t, clientset, informers) @@ -689,7 +689,7 @@ func TestLaunchWithHashCollision(t *testing.T) { defer cancel() informers.Start(ctx.Done()) - go dc.Run(5, ctx.Done()) + go dc.Run(ctx, 5) // Start Scheduler setupScheduler(ctx, t, clientset, informers) @@ -799,7 +799,7 @@ func TestTaintedNode(t *testing.T) { defer cancel() informers.Start(ctx.Done()) - go dc.Run(5, ctx.Done()) + go dc.Run(ctx, 5) // Start Scheduler setupScheduler(ctx, t, clientset, informers) @@ -864,7 +864,7 @@ func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) { defer cancel() informers.Start(ctx.Done()) - go dc.Run(5, ctx.Done()) + go dc.Run(ctx, 5) // Start Scheduler setupScheduler(ctx, t, clientset, informers) @@ -872,7 +872,7 @@ func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) { ds := newDaemonSet("foo", ns.Name) ds.Spec.UpdateStrategy = *strategy ds.Spec.Template.Spec.HostNetwork = true - _, err := dsClient.Create(context.TODO(), ds, metav1.CreateOptions{}) + _, err := dsClient.Create(ctx, ds, metav1.CreateOptions{}) if err != nil { t.Fatalf("Failed to create DaemonSet: %v", err) } @@ -889,7 +889,7 @@ func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) { }, } - _, err = nodeClient.Create(context.TODO(), node, metav1.CreateOptions{}) + _, err = nodeClient.Create(ctx, node, metav1.CreateOptions{}) if err != nil { t.Fatalf("Failed to create node: %v", err) } @@ -907,7 +907,7 @@ func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) { }, } - _, err = nodeClient.Create(context.TODO(), nodeNU, metav1.CreateOptions{}) + _, err = nodeClient.Create(ctx, nodeNU, metav1.CreateOptions{}) if err != nil { t.Fatalf("Failed to create node: %v", err) } diff --git a/test/integration/deployment/deployment_test.go b/test/integration/deployment/deployment_test.go index 711b3b032743d..97263293ab12e 100644 --- a/test/integration/deployment/deployment_test.go +++ b/test/integration/deployment/deployment_test.go @@ -56,8 +56,8 @@ func TestNewDeployment(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) informers.Start(stopCh) - go rm.Run(5, stopCh) - go dc.Run(5, stopCh) + go rm.Run(context.TODO(), 5) + go dc.Run(context.TODO(), 5) // Wait for the Deployment to be updated to revision 1 if err := tester.waitForDeploymentRevisionAndImage("1", fakeImage); err != nil { @@ -123,8 +123,8 @@ func TestDeploymentRollingUpdate(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) informers.Start(stopCh) - go rm.Run(5, stopCh) - go dc.Run(5, stopCh) + go rm.Run(context.TODO(), 5) + go dc.Run(context.TODO(), 5) replicas := int32(20) tester := &deploymentTester{t: t, c: c, deployment: newDeployment(name, ns.Name, replicas)} @@ -266,8 +266,8 @@ func TestPausedDeployment(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) informers.Start(stopCh) - go rm.Run(5, stopCh) - go dc.Run(5, stopCh) + go rm.Run(context.TODO(), 5) + go dc.Run(context.TODO(), 5) // Verify that the paused deployment won't create new replica set. if err := tester.expectNoNewReplicaSet(); err != nil { @@ -367,8 +367,8 @@ func TestScalePausedDeployment(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) informers.Start(stopCh) - go rm.Run(5, stopCh) - go dc.Run(5, stopCh) + go rm.Run(context.TODO(), 5) + go dc.Run(context.TODO(), 5) // Wait for the Deployment to be updated to revision 1 if err := tester.waitForDeploymentRevisionAndImage("1", fakeImage); err != nil { @@ -448,8 +448,8 @@ func TestDeploymentHashCollision(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) informers.Start(stopCh) - go rm.Run(5, stopCh) - go dc.Run(5, stopCh) + go rm.Run(context.TODO(), 5) + go dc.Run(context.TODO(), 5) // Wait for the Deployment to be updated to revision 1 if err := tester.waitForDeploymentRevisionAndImage("1", fakeImage); err != nil { @@ -551,8 +551,8 @@ func TestFailedDeployment(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) informers.Start(stopCh) - go rm.Run(5, stopCh) - go dc.Run(5, stopCh) + go rm.Run(context.TODO(), 5) + go dc.Run(context.TODO(), 5) if err = tester.waitForDeploymentUpdatedReplicasGTE(replicas); err != nil { t.Fatal(err) @@ -593,8 +593,8 @@ func TestOverlappingDeployments(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) informers.Start(stopCh) - go rm.Run(5, stopCh) - go dc.Run(5, stopCh) + go rm.Run(context.TODO(), 5) + go dc.Run(context.TODO(), 5) // Create 2 deployments with overlapping selectors var err error @@ -667,8 +667,8 @@ func TestScaledRolloutDeployment(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) informers.Start(stopCh) - go rm.Run(5, stopCh) - go dc.Run(5, stopCh) + go rm.Run(context.TODO(), 5) + go dc.Run(context.TODO(), 5) // Create a deployment with rolling update strategy, max surge = 3, and max unavailable = 2 var err error @@ -870,8 +870,8 @@ func TestSpecReplicasChange(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) informers.Start(stopCh) - go rm.Run(5, stopCh) - go dc.Run(5, stopCh) + go rm.Run(context.TODO(), 5) + go dc.Run(context.TODO(), 5) // Scale up/down deployment and verify its replicaset has matching .spec.replicas if err = tester.scaleDeployment(2); err != nil { @@ -928,8 +928,8 @@ func TestDeploymentAvailableCondition(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) informers.Start(stopCh) - go rm.Run(5, stopCh) - go dc.Run(5, stopCh) + go rm.Run(context.TODO(), 5) + go dc.Run(context.TODO(), 5) // Wait for the deployment to be observed by the controller and has at least specified number of updated replicas if err = tester.waitForDeploymentUpdatedReplicasGTE(replicas); err != nil { @@ -1045,8 +1045,8 @@ func TestGeneralReplicaSetAdoption(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) informers.Start(stopCh) - go rm.Run(5, stopCh) - go dc.Run(5, stopCh) + go rm.Run(context.TODO(), 5) + go dc.Run(context.TODO(), 5) // Wait for the Deployment to be updated to revision 1 if err := tester.waitForDeploymentRevisionAndImage("1", fakeImage); err != nil { @@ -1137,8 +1137,8 @@ func TestDeploymentScaleSubresource(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) informers.Start(stopCh) - go rm.Run(5, stopCh) - go dc.Run(5, stopCh) + go rm.Run(context.TODO(), 5) + go dc.Run(context.TODO(), 5) // Wait for the Deployment to be updated to revision 1 if err := tester.waitForDeploymentRevisionAndImage("1", fakeImage); err != nil { @@ -1181,8 +1181,8 @@ func TestReplicaSetOrphaningAndAdoptionWhenLabelsChange(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) informers.Start(stopCh) - go rm.Run(5, stopCh) - go dc.Run(5, stopCh) + go rm.Run(context.TODO(), 5) + go dc.Run(context.TODO(), 5) // Wait for the Deployment to be updated to revision 1 if err := tester.waitForDeploymentRevisionAndImage("1", fakeImage); err != nil { diff --git a/test/integration/quota/quota_test.go b/test/integration/quota/quota_test.go index 720ae6c341e2d..ee856b236bb22 100644 --- a/test/integration/quota/quota_test.go +++ b/test/integration/quota/quota_test.go @@ -106,7 +106,7 @@ func TestQuota(t *testing.T) { replicationcontroller.BurstReplicas, ) rm.SetEventRecorder(&record.FakeRecorder{}) - go rm.Run(3, controllerCh) + go rm.Run(context.TODO(), 3) discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources listerFuncForResource := generic.ListerFuncForResourceFunc(informers.ForResource) @@ -337,7 +337,7 @@ func TestQuotaLimitedResourceDenial(t *testing.T) { replicationcontroller.BurstReplicas, ) rm.SetEventRecorder(&record.FakeRecorder{}) - go rm.Run(3, controllerCh) + go rm.Run(context.TODO(), 3) discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources listerFuncForResource := generic.ListerFuncForResourceFunc(informers.ForResource) @@ -467,7 +467,7 @@ func TestQuotaLimitService(t *testing.T) { replicationcontroller.BurstReplicas, ) rm.SetEventRecorder(&record.FakeRecorder{}) - go rm.Run(3, controllerCh) + go rm.Run(context.TODO(), 3) discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources listerFuncForResource := generic.ListerFuncForResourceFunc(informers.ForResource) diff --git a/test/integration/replicaset/replicaset_test.go b/test/integration/replicaset/replicaset_test.go index bcc125637dd1c..82cf9ec251027 100644 --- a/test/integration/replicaset/replicaset_test.go +++ b/test/integration/replicaset/replicaset_test.go @@ -156,7 +156,7 @@ func runControllerAndInformers(t *testing.T, rm *replicaset.ReplicaSetController stopCh := make(chan struct{}) informers.Start(stopCh) waitToObservePods(t, informers.Core().V1().Pods().Informer(), podNum) - go rm.Run(5, stopCh) + go rm.Run(context.TODO(), 5) return stopCh } diff --git a/test/integration/replicationcontroller/replicationcontroller_test.go b/test/integration/replicationcontroller/replicationcontroller_test.go index 1f1a2137b32f4..b25e64421b53a 100644 --- a/test/integration/replicationcontroller/replicationcontroller_test.go +++ b/test/integration/replicationcontroller/replicationcontroller_test.go @@ -137,7 +137,7 @@ func runControllerAndInformers(t *testing.T, rm *replication.ReplicationManager, stopCh := make(chan struct{}) informers.Start(stopCh) waitToObservePods(t, informers.Core().V1().Pods().Informer(), podNum) - go rm.Run(5, stopCh) + go rm.Run(context.TODO(), 5) return stopCh } diff --git a/test/integration/statefulset/util.go b/test/integration/statefulset/util.go index b05e7d58eb75e..b2ef1389c3916 100644 --- a/test/integration/statefulset/util.go +++ b/test/integration/statefulset/util.go @@ -185,7 +185,7 @@ func scSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *statefulset. func runControllerAndInformers(sc *statefulset.StatefulSetController, informers informers.SharedInformerFactory) chan struct{} { stopCh := make(chan struct{}) informers.Start(stopCh) - go sc.Run(5, stopCh) + go sc.Run(context.TODO(), 5) return stopCh }