diff --git a/cmd/kube-controller-manager/app/batch.go b/cmd/kube-controller-manager/app/batch.go index 5ffc7525251a3..e5edb4409f25e 100644 --- a/cmd/kube-controller-manager/app/batch.go +++ b/cmd/kube-controller-manager/app/batch.go @@ -30,6 +30,7 @@ import ( func startJobController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { go job.NewController( + ctx, controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Batch().V1().Jobs(), controllerContext.ClientBuilder.ClientOrDie("job-controller"), diff --git a/hack/logcheck.conf b/hack/logcheck.conf index eecb9b186166d..3c535dc06df73 100644 --- a/hack/logcheck.conf +++ b/hack/logcheck.conf @@ -46,7 +46,6 @@ contextual k8s.io/kubernetes/test/e2e/dra/.* -contextual k8s.io/kubernetes/pkg/controller/endpointslice/.* -contextual k8s.io/kubernetes/pkg/controller/endpointslicemirroring/.* -contextual k8s.io/kubernetes/pkg/controller/garbagecollector/.* --contextual k8s.io/kubernetes/pkg/controller/job/.* -contextual k8s.io/kubernetes/pkg/controller/nodeipam/.* -contextual k8s.io/kubernetes/pkg/controller/podgc/.* -contextual k8s.io/kubernetes/pkg/controller/replicaset/.* diff --git a/pkg/controller/job/indexed_job_utils.go b/pkg/controller/job/indexed_job_utils.go index 3a1744fe1a654..6a8ba5787fad0 100644 --- a/pkg/controller/job/indexed_job_utils.go +++ b/pkg/controller/job/indexed_job_utils.go @@ -51,8 +51,8 @@ type orderedIntervals []interval // The old list is solely based off .status.completedIndexes, but returns an // empty list if this Job is not tracked with finalizers. The new list includes // the indexes that succeeded since the last sync. -func calculateSucceededIndexes(job *batch.Job, pods []*v1.Pod) (orderedIntervals, orderedIntervals) { - prevIntervals := succeededIndexesFromString(job.Status.CompletedIndexes, int(*job.Spec.Completions)) +func calculateSucceededIndexes(logger klog.Logger, job *batch.Job, pods []*v1.Pod) (orderedIntervals, orderedIntervals) { + prevIntervals := succeededIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)) newSucceeded := sets.New[int]() for _, p := range pods { ix := getCompletionIndex(p.Annotations) @@ -148,7 +148,7 @@ func (oi orderedIntervals) has(ix int) bool { return oi[hi].First <= ix } -func succeededIndexesFromString(completedIndexes string, completions int) orderedIntervals { +func succeededIndexesFromString(logger klog.Logger, completedIndexes string, completions int) orderedIntervals { if completedIndexes == "" { return nil } @@ -160,7 +160,7 @@ func succeededIndexesFromString(completedIndexes string, completions int) ordere var err error inter.First, err = strconv.Atoi(limitsStr[0]) if err != nil { - klog.InfoS("Corrupted completed indexes interval, ignoring", "interval", intervalStr, "err", err) + logger.Info("Corrupted completed indexes interval, ignoring", "interval", intervalStr, "err", err) continue } if inter.First >= completions { @@ -169,7 +169,7 @@ func succeededIndexesFromString(completedIndexes string, completions int) ordere if len(limitsStr) > 1 { inter.Last, err = strconv.Atoi(limitsStr[1]) if err != nil { - klog.InfoS("Corrupted completed indexes interval, ignoring", "interval", intervalStr, "err", err) + logger.Info("Corrupted completed indexes interval, ignoring", "interval", intervalStr, "err", err) continue } if inter.Last >= completions { diff --git a/pkg/controller/job/indexed_job_utils_test.go b/pkg/controller/job/indexed_job_utils_test.go index 0b64ce5cd7477..f2796ac84316e 100644 --- a/pkg/controller/job/indexed_job_utils_test.go +++ b/pkg/controller/job/indexed_job_utils_test.go @@ -22,12 +22,14 @@ import ( "github.com/google/go-cmp/cmp" batch "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" + "k8s.io/klog/v2/ktesting" "k8s.io/utils/pointer" ) const noIndex = "-" func TestCalculateSucceededIndexes(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) cases := map[string]struct { prevSucceeded string pods []indexPhase @@ -206,7 +208,7 @@ func TestCalculateSucceededIndexes(t *testing.T) { for _, p := range pods { p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer) } - gotStatusIntervals, gotIntervals := calculateSucceededIndexes(job, pods) + gotStatusIntervals, gotIntervals := calculateSucceededIndexes(logger, job, pods) if diff := cmp.Diff(tc.wantStatusIntervals, gotStatusIntervals); diff != "" { t.Errorf("Unexpected completed indexes from status (-want,+got):\n%s", diff) } diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 28c97ee4260d1..9c2d152e97481 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -132,12 +132,13 @@ type Controller struct { // NewController creates a new Job controller that keeps the relevant pods // in sync with their corresponding Job objects. -func NewController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller { - return newControllerWithClock(podInformer, jobInformer, kubeClient, &clock.RealClock{}) +func NewController(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller { + return newControllerWithClock(ctx, podInformer, jobInformer, kubeClient, &clock.RealClock{}) } -func newControllerWithClock(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface, clock clock.WithTicker) *Controller { +func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface, clock clock.WithTicker) *Controller { eventBroadcaster := record.NewBroadcaster() + logger := klog.FromContext(ctx) jm := &Controller{ kubeClient: kubeClient, @@ -160,19 +161,27 @@ func newControllerWithClock(podInformer coreinformers.PodInformer, jobInformer b jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - jm.enqueueController(obj, true) + jm.enqueueController(logger, obj, true) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + jm.updateJob(logger, oldObj, newObj) + }, + DeleteFunc: func(obj interface{}) { + jm.deleteJob(logger, obj) }, - UpdateFunc: jm.updateJob, - DeleteFunc: jm.deleteJob, }) jm.jobLister = jobInformer.Lister() jm.jobStoreSynced = jobInformer.Informer().HasSynced podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: jm.addPod, - UpdateFunc: jm.updatePod, + AddFunc: func(obj interface{}) { + jm.addPod(logger, obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + jm.updatePod(logger, oldObj, newObj) + }, DeleteFunc: func(obj interface{}) { - jm.deletePod(obj, true) + jm.deletePod(logger, obj, true) }, }) jm.podStore = podInformer.Lister() @@ -190,6 +199,7 @@ func newControllerWithClock(podInformer coreinformers.PodInformer, jobInformer b // Run the main goroutine responsible for watching and syncing jobs. func (jm *Controller) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() + logger := klog.FromContext(ctx) // Start events processing pipeline. jm.broadcaster.StartStructuredLogging(0) @@ -199,8 +209,8 @@ func (jm *Controller) Run(ctx context.Context, workers int) { defer jm.queue.ShutDown() defer jm.orphanQueue.ShutDown() - klog.Infof("Starting job controller") - defer klog.Infof("Shutting down job controller") + logger.Info("Starting job controller") + defer logger.Info("Shutting down job controller") if !cache.WaitForNamedCacheSync("job", ctx.Done(), jm.podStoreSynced, jm.jobStoreSynced) { return @@ -255,13 +265,13 @@ func (jm *Controller) resolveControllerRef(namespace string, controllerRef *meta } // When a pod is created, enqueue the controller that manages it and update its expectations. -func (jm *Controller) addPod(obj interface{}) { +func (jm *Controller) addPod(logger klog.Logger, obj interface{}) { pod := obj.(*v1.Pod) recordFinishedPodWithTrackingFinalizer(nil, pod) if pod.DeletionTimestamp != nil { // on a restart of the controller, it's possible a new pod shows up in a state that // is already pending deletion. Prevent the pod from being a creation observation. - jm.deletePod(pod, false) + jm.deletePod(logger, pod, false) return } @@ -276,7 +286,7 @@ func (jm *Controller) addPod(obj interface{}) { return } jm.expectations.CreationObserved(jobKey) - jm.enqueueControllerPodUpdate(job, true) + jm.enqueueControllerPodUpdate(logger, job, true) return } @@ -290,14 +300,14 @@ func (jm *Controller) addPod(obj interface{}) { // DO NOT observe creation because no controller should be waiting for an // orphan. for _, job := range jm.getPodJobs(pod) { - jm.enqueueControllerPodUpdate(job, true) + jm.enqueueControllerPodUpdate(logger, job, true) } } // When a pod is updated, figure out what job/s manage it and wake them up. // If the labels of the pod have changed we need to awaken both the old // and new job. old and cur must be *v1.Pod types. -func (jm *Controller) updatePod(old, cur interface{}) { +func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) { curPod := cur.(*v1.Pod) oldPod := old.(*v1.Pod) recordFinishedPodWithTrackingFinalizer(oldPod, curPod) @@ -311,7 +321,7 @@ func (jm *Controller) updatePod(old, cur interface{}) { // and after such time has passed, the kubelet actually deletes it from the store. We receive an update // for modification of the deletion timestamp and expect an job to create more pods asap, not wait // until the kubelet actually deletes the pod. - jm.deletePod(curPod, false) + jm.deletePod(logger, curPod, false) return } @@ -335,10 +345,10 @@ func (jm *Controller) updatePod(old, cur interface{}) { if finalizerRemoved { key, err := controller.KeyFunc(job) if err == nil { - jm.finalizerExpectations.finalizerRemovalObserved(key, string(curPod.UID)) + jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID)) } } - jm.enqueueControllerPodUpdate(job, immediate) + jm.enqueueControllerPodUpdate(logger, job, immediate) } } @@ -351,10 +361,10 @@ func (jm *Controller) updatePod(old, cur interface{}) { if finalizerRemoved { key, err := controller.KeyFunc(job) if err == nil { - jm.finalizerExpectations.finalizerRemovalObserved(key, string(curPod.UID)) + jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID)) } } - jm.enqueueControllerPodUpdate(job, immediate) + jm.enqueueControllerPodUpdate(logger, job, immediate) return } @@ -368,14 +378,14 @@ func (jm *Controller) updatePod(old, cur interface{}) { labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) if labelChanged || controllerRefChanged { for _, job := range jm.getPodJobs(curPod) { - jm.enqueueControllerPodUpdate(job, immediate) + jm.enqueueControllerPodUpdate(logger, job, immediate) } } } // When a pod is deleted, enqueue the job that manages the pod and update its expectations. // obj could be an *v1.Pod, or a DeleteFinalStateUnknown marker item. -func (jm *Controller) deletePod(obj interface{}, final bool) { +func (jm *Controller) deletePod(logger klog.Logger, obj interface{}, final bool) { pod, ok := obj.(*v1.Pod) if final { recordFinishedPodWithTrackingFinalizer(pod, nil) @@ -425,13 +435,13 @@ func (jm *Controller) deletePod(obj interface{}, final bool) { // Consider the finalizer removed if this is the final delete. Otherwise, // it's an update for the deletion timestamp, then check finalizer. if final || !hasFinalizer { - jm.finalizerExpectations.finalizerRemovalObserved(jobKey, string(pod.UID)) + jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, string(pod.UID)) } - jm.enqueueControllerPodUpdate(job, true) + jm.enqueueControllerPodUpdate(logger, job, true) } -func (jm *Controller) updateJob(old, cur interface{}) { +func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) { oldJob := old.(*batch.Job) curJob := cur.(*batch.Job) @@ -440,14 +450,16 @@ func (jm *Controller) updateJob(old, cur interface{}) { if err != nil { return } + if curJob.Generation == oldJob.Generation { // Delay the Job sync when no generation change to batch Job status updates, // typically triggered by pod events. - jm.enqueueControllerPodUpdate(curJob, true) + jm.enqueueControllerPodUpdate(logger, curJob, true) } else { // Trigger immediate sync when spec is changed. - jm.enqueueController(curJob, true) + jm.enqueueController(logger, curJob, true) } + // check if need to add a new rsync for ActiveDeadlineSeconds if curJob.Status.StartTime != nil { curADS := curJob.Spec.ActiveDeadlineSeconds @@ -460,15 +472,15 @@ func (jm *Controller) updateJob(old, cur interface{}) { total := time.Duration(*curADS) * time.Second // AddAfter will handle total < passed jm.queue.AddAfter(key, total-passed) - klog.V(4).Infof("job %q ActiveDeadlineSeconds updated, will rsync after %d seconds", key, total-passed) + logger.V(4).Info("job's ActiveDeadlineSeconds updated, will rsync", "key", key, "interval", total-passed) } } } // deleteJob enqueues the job and all the pods associated with it that still // have a finalizer. -func (jm *Controller) deleteJob(obj interface{}) { - jm.enqueueController(obj, true) +func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) { + jm.enqueueController(logger, obj, true) jobObj, ok := obj.(*batch.Job) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) @@ -499,15 +511,15 @@ func (jm *Controller) deleteJob(obj interface{}) { // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item, // immediate tells the controller to update the status right away, and should // happen ONLY when there was a successful pod run. -func (jm *Controller) enqueueController(obj interface{}, immediate bool) { - jm.enqueueControllerDelayed(obj, immediate, 0) +func (jm *Controller) enqueueController(logger klog.Logger, obj interface{}, immediate bool) { + jm.enqueueControllerDelayed(logger, obj, immediate, 0) } -func (jm *Controller) enqueueControllerPodUpdate(obj interface{}, immediate bool) { - jm.enqueueControllerDelayed(obj, immediate, jm.podUpdateBatchPeriod) +func (jm *Controller) enqueueControllerPodUpdate(logger klog.Logger, obj interface{}, immediate bool) { + jm.enqueueControllerDelayed(logger, obj, immediate, jm.podUpdateBatchPeriod) } -func (jm *Controller) enqueueControllerDelayed(obj interface{}, immediate bool, delay time.Duration) { +func (jm *Controller) enqueueControllerDelayed(logger klog.Logger, obj interface{}, immediate bool, delay time.Duration) { key, err := controller.KeyFunc(obj) if err != nil { utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) @@ -527,7 +539,7 @@ func (jm *Controller) enqueueControllerDelayed(obj interface{}, immediate bool, // all controllers there will still be some replica instability. One way to handle this is // by querying the store for all controllers that this rc overlaps, as well as all // controllers that overlap this rc, and sorting them. - klog.Infof("enqueueing job %s", key) + logger.Info("enqueueing job", "key", key) jm.queue.AddAfter(key, backoff) } @@ -591,8 +603,9 @@ func (jm Controller) processNextOrphanPod(ctx context.Context) bool { // syncOrphanPod removes the tracking finalizer from an orphan pod if found. func (jm Controller) syncOrphanPod(ctx context.Context, key string) error { startTime := jm.clock.Now() + logger := klog.FromContext(ctx) defer func() { - klog.V(4).Infof("Finished syncing orphan pod %q (%v)", key, jm.clock.Since(startTime)) + logger.V(4).Info("Finished syncing orphan pod", "pod", key, "elapsed", jm.clock.Since(startTime)) }() ns, name, err := cache.SplitMetaNamespaceKey(key) @@ -603,7 +616,7 @@ func (jm Controller) syncOrphanPod(ctx context.Context, key string) error { sharedPod, err := jm.podStore.Pods(ns).Get(name) if err != nil { if apierrors.IsNotFound(err) { - klog.V(4).Infof("Orphan pod has been deleted: %v", key) + logger.V(4).Info("Orphan pod has been deleted", "pod", key) return nil } return err @@ -679,8 +692,9 @@ func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job) ([]*v1.Po // concurrently with the same key. func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { startTime := jm.clock.Now() + logger := klog.FromContext(ctx) defer func() { - klog.V(4).Infof("Finished syncing job %q (%v)", key, jm.clock.Since(startTime)) + logger.V(4).Info("Finished syncing job", "key", key, "elapsed", jm.clock.Since(startTime)) }() ns, name, err := cache.SplitMetaNamespaceKey(key) @@ -693,9 +707,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { sharedJob, err := jm.jobLister.Jobs(ns).Get(name) if err != nil { if apierrors.IsNotFound(err) { - klog.V(4).Infof("Job has been deleted: %v", key) + logger.V(4).Info("Job has been deleted", "key", key) jm.expectations.DeleteExpectations(key) - jm.finalizerExpectations.deleteExpectations(key) + jm.finalizerExpectations.deleteExpectations(logger, key) err := jm.backoffRecordStore.removeBackoffRecord(key) if err != nil { @@ -798,14 +812,14 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline", jm.clock.Now()) } else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) { syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time) - klog.V(2).InfoS("Job has activeDeadlineSeconds configuration. Will sync this job again", "job", key, "nextSyncIn", syncDuration) + logger.V(2).Info("Job has activeDeadlineSeconds configuration. Will sync this job again", "key", key, "nextSyncIn", syncDuration) jm.queue.AddAfter(key, syncDuration) } } var prevSucceededIndexes, succeededIndexes orderedIntervals if isIndexedJob(&job) { - prevSucceededIndexes, succeededIndexes = calculateSucceededIndexes(&job, pods) + prevSucceededIndexes, succeededIndexes = calculateSucceededIndexes(logger, &job, pods) succeeded = int32(succeededIndexes.total()) } suspendCondChanged := false @@ -883,7 +897,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { if apierrors.IsConflict(err) { // we probably have a stale informer cache // so don't return an error to avoid backoff - jm.enqueueController(&job, false) + jm.enqueueController(logger, &job, false) return nil } return fmt.Errorf("tracking status: %w", err) @@ -927,12 +941,13 @@ func (jm *Controller) deleteActivePods(ctx context.Context, job *batch.Job, pods func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey string, pods []*v1.Pod) (int32, error) { errCh := make(chan error, len(pods)) successfulDeletes := int32(len(pods)) + logger := klog.FromContext(ctx) failDelete := func(pod *v1.Pod, err error) { // Decrement the expected number of deletes because the informer won't observe this deletion jm.expectations.DeletionObserved(jobKey) if !apierrors.IsNotFound(err) { - klog.V(2).Infof("Failed to delete Pod", "job", klog.KObj(job), "pod", klog.KObj(pod), "err", err) + logger.V(2).Info("Failed to delete Pod", "job", klog.KObj(job), "pod", klog.KObj(pod), "err", err) atomic.AddInt32(&successfulDeletes, -1) errCh <- err utilruntime.HandleError(err) @@ -969,6 +984,7 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey // It does this up to a limited number of Pods so that the size of .status // doesn't grow too much and this sync doesn't starve other Jobs. func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.Set[string], finishedCond *batch.JobCondition, needsFlush bool, newBackoffRecord backoffRecord) error { + logger := klog.FromContext(ctx) isIndexed := isIndexedJob(job) var podsToRemoveFinalizer []*v1.Pod uncountedStatus := job.Status.UncountedTerminatedPods @@ -1080,7 +1096,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job if jobFinished { jm.recordJobFinished(job, finishedCond) } - recordJobPodFinished(job, oldCounters) + recordJobPodFinished(logger, job, oldCounters) } return nil } @@ -1097,6 +1113,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job // Returns whether there are pending changes in the Job status that need to be // flushed in subsequent calls. func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool, newBackoffRecord backoffRecord) (*batch.Job, bool, error) { + logger := klog.FromContext(ctx) var err error if needsFlush { if job, err = jm.updateStatusHandler(ctx, job); err != nil { @@ -1109,10 +1126,10 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job // this error might undercount the backoff. // re-syncing from the current state might not help to recover // the backoff information - klog.ErrorS(err, "Backoff update failed") + logger.Error(err, "Backoff update failed") } - recordJobPodFinished(job, *oldCounters) + recordJobPodFinished(logger, job, *oldCounters) // Shallow copy, as it will only be used to detect changes in the counters. *oldCounters = job.Status needsFlush = false @@ -1173,6 +1190,7 @@ func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinali // of the i-th Pod was successfully removed (if the pod was deleted when this // function was called, it's considered as the finalizer was removed successfully). func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKey string, pods []*v1.Pod) ([]bool, error) { + logger := klog.FromContext(ctx) errCh := make(chan error, len(pods)) succeeded := make([]bool, len(pods)) uids := make([]string, len(pods)) @@ -1180,7 +1198,7 @@ func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKe uids[i] = string(p.UID) } if jobKey != "" { - err := jm.finalizerExpectations.expectFinalizersRemoved(jobKey, uids) + err := jm.finalizerExpectations.expectFinalizersRemoved(logger, jobKey, uids) if err != nil { return succeeded, fmt.Errorf("setting expected removed finalizers: %w", err) } @@ -1196,7 +1214,7 @@ func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKe // In case of any failure, we don't expect a Pod update for the // finalizer removed. Clear expectation now. if jobKey != "" { - jm.finalizerExpectations.finalizerRemovalObserved(jobKey, string(pod.UID)) + jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, string(pod.UID)) } if !apierrors.IsNotFound(err) { errCh <- err @@ -1359,6 +1377,7 @@ func jobSuspended(job *batch.Job) bool { // Respects back-off; does not create new pods if the back-off time has not passed // Does NOT modify . func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval, backoff backoffRecord) (int32, string, error) { + logger := klog.FromContext(ctx) active := int32(len(activePods)) parallelism := *job.Spec.Parallelism jobKey, err := controller.KeyFunc(job) @@ -1368,7 +1387,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods } if jobSuspended(job) { - klog.V(4).InfoS("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active) + logger.V(4).Info("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active) podsToDelete := activePodsForRemoval(job, activePods, int(active)) jm.expectations.ExpectDeletions(jobKey, len(podsToDelete)) removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete) @@ -1408,7 +1427,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods } if len(podsToDelete) > 0 { jm.expectations.ExpectDeletions(jobKey, len(podsToDelete)) - klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive) + logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive) removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete) active -= removed // While it is possible for a Job to require both pod creations and @@ -1421,7 +1440,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods if active < wantActive { remainingTime := backoff.getRemainingTime(jm.clock, DefaultJobBackOff, MaxJobBackOff) if remainingTime > 0 { - jm.enqueueControllerDelayed(job, true, remainingTime) + jm.enqueueControllerDelayed(logger, job, true, remainingTime) return 0, metrics.JobSyncActionPodsCreated, nil } diff := wantActive - active @@ -1431,7 +1450,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods jm.expectations.ExpectCreations(jobKey, int(diff)) errCh := make(chan error, diff) - klog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff) + logger.V(4).Info("Too few pods running", "key", jobKey, "need", wantActive, "creating", diff) wait := sync.WaitGroup{} @@ -1486,7 +1505,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods if err != nil { defer utilruntime.HandleError(err) // Decrement the expected number of creates because the informer won't observe this pod - klog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name) + logger.V(2).Info("Failed creation, decrementing expectations", "job", klog.KObj(job)) jm.expectations.CreationObserved(jobKey) atomic.AddInt32(&active, -1) errCh <- err @@ -1497,7 +1516,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods // any skipped pods that we never attempted to start shouldn't be expected. skippedPods := diff - batchSize if errorCount < len(errCh) && skippedPods > 0 { - klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q", skippedPods, job.Namespace, job.Name) + logger.V(2).Info("Slow-start failure. Skipping creating pods, decrementing expectations", "skippedCount", skippedPods, "job", klog.KObj(job)) active -= skippedPods for i := int32(0); i < skippedPods; i++ { // Decrement the expected number of creates because the informer won't observe this pod @@ -1715,7 +1734,7 @@ func findConditionByType(list []batch.JobCondition, cType batch.JobConditionType return nil } -func recordJobPodFinished(job *batch.Job, oldCounters batch.JobStatus) { +func recordJobPodFinished(logger klog.Logger, job *batch.Job, oldCounters batch.JobStatus) { completionMode := completionModeStr(job) var diff int @@ -1726,7 +1745,7 @@ func recordJobPodFinished(job *batch.Job, oldCounters batch.JobStatus) { // now out of range (i.e. index >= spec.Completions). if isIndexedJob(job) { if job.Status.CompletedIndexes != oldCounters.CompletedIndexes { - diff = succeededIndexesFromString(job.Status.CompletedIndexes, int(*job.Spec.Completions)).total() - succeededIndexesFromString(oldCounters.CompletedIndexes, int(*job.Spec.Completions)).total() + diff = succeededIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)).total() - succeededIndexesFromString(logger, oldCounters.CompletedIndexes, int(*job.Spec.Completions)).total() } } else { diff = int(job.Status.Succeeded) - int(oldCounters.Succeeded) diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 0cf8e79abbf2a..16d0aae465e05 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -49,6 +49,8 @@ import ( "k8s.io/client-go/util/workqueue" featuregatetesting "k8s.io/component-base/featuregate/testing" metricstestutil "k8s.io/component-base/metrics/testutil" + "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" _ "k8s.io/kubernetes/pkg/apis/core/install" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/job/metrics" @@ -116,13 +118,13 @@ func newJob(parallelism, completions, backoffLimit int32, completionMode batch.C return newJobWithName("foobar", parallelism, completions, backoffLimit, completionMode) } -func newControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*Controller, informers.SharedInformerFactory) { - return newControllerFromClientWithClock(kubeClient, resyncPeriod, realClock) +func newControllerFromClient(ctx context.Context, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*Controller, informers.SharedInformerFactory) { + return newControllerFromClientWithClock(ctx, kubeClient, resyncPeriod, realClock) } -func newControllerFromClientWithClock(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, clock clock.WithTicker) (*Controller, informers.SharedInformerFactory) { +func newControllerFromClientWithClock(ctx context.Context, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, clock clock.WithTicker) (*Controller, informers.SharedInformerFactory) { sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod()) - jm := newControllerWithClock(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient, clock) + jm := newControllerWithClock(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient, clock) jm.podControl = &controller.FakePodControl{} return jm, sharedInformers } @@ -219,6 +221,7 @@ type jobInitialStatus struct { } func TestControllerSyncJob(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) jobConditionComplete := batch.JobComplete jobConditionFailed := batch.JobFailed jobConditionSuspended := batch.JobSuspended @@ -790,7 +793,7 @@ func TestControllerSyncJob(t *testing.T) { fakeClock = clocktesting.NewFakeClock(time.Now()) } - manager, sharedInformerFactory := newControllerFromClientWithClock(clientSet, controller.NoResyncPeriodFunc, fakeClock) + manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, clientSet, controller.NoResyncPeriodFunc, fakeClock) fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -1090,6 +1093,7 @@ func TestGetNewFinshedPods(t *testing.T) { } func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) succeededCond := newCondition(batch.JobComplete, v1.ConditionTrue, "", "", realClock.Now()) failedCond := newCondition(batch.JobFailed, v1.ConditionTrue, "", "", realClock.Now()) indexedCompletion := batch.IndexedCompletion @@ -1632,7 +1636,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { for name, tc := range cases { t.Run(name, func(t *testing.T) { clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - manager, _ := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc) + manager, _ := newControllerFromClient(ctx, clientSet, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{Err: tc.podControlErr} metrics.JobPodsFinished.Reset() manager.podControl = &fakePodControl @@ -1648,7 +1652,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods) var succeededIndexes orderedIntervals if isIndexedJob(job) { - succeededIndexes = succeededIndexesFromString(job.Status.CompletedIndexes, int(*job.Spec.Completions)) + succeededIndexes = succeededIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)) } err := manager.trackJobStatusAndRemoveFinalizers(context.TODO(), job, tc.pods, succeededIndexes, *uncounted, tc.expectedRmFinalizers, tc.finishedCond, tc.needsFlush, backoffRecord{}) if !errors.Is(err, tc.wantErr) { @@ -1684,6 +1688,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { // TestSyncJobPastDeadline verifies tracking of active deadline in a single syncJob call. func TestSyncJobPastDeadline(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) testCases := map[string]struct { // job setup parallelism int32 @@ -1773,7 +1778,7 @@ func TestSyncJobPastDeadline(t *testing.T) { t.Run(name, func(t *testing.T) { // job manager setup clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - manager, sharedInformerFactory := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newControllerFromClient(ctx, clientSet, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -1848,9 +1853,10 @@ func hasTrueCondition(job *batch.Job) *batch.JobConditionType { // TestPastDeadlineJobFinished ensures that a Job is correctly tracked until // reaching the active deadline, at which point it is marked as Failed. func TestPastDeadlineJobFinished(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) clientset := fake.NewSimpleClientset() fakeClock := clocktesting.NewFakeClock(time.Now().Truncate(time.Second)) - manager, sharedInformerFactory := newControllerFromClientWithClock(clientset, controller.NoResyncPeriodFunc, fakeClock) + manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock) manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady manager.expectations = FakeJobExpectations{ @@ -1927,8 +1933,9 @@ func TestPastDeadlineJobFinished(t *testing.T) { } func TestSingleJobFailedCondition(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -1966,8 +1973,9 @@ func TestSingleJobFailedCondition(t *testing.T) { } func TestSyncJobComplete(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -1991,8 +1999,9 @@ func TestSyncJobComplete(t *testing.T) { } func TestSyncJobDeleted(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - manager, _ := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, _ := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -2014,6 +2023,7 @@ func TestSyncJobDeleted(t *testing.T) { } func TestSyncJobWithJobPodFailurePolicy(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) now := metav1.Now() indexedCompletionMode := batch.IndexedCompletion validObjectMeta := metav1.ObjectMeta{ @@ -3038,7 +3048,7 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, tc.enablePodDisruptionConditions)() clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -3094,6 +3104,7 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) { } func TestSyncJobUpdateRequeue(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) defer func() { DefaultJobBackOff = 10 * time.Second }() DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing @@ -3114,7 +3125,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) { } for name, tc := range cases { t.Run(name, func(t *testing.T) { - manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -3143,6 +3154,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) { } func TestUpdateJobRequeue(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) cases := map[string]struct { oldJob *batch.Job @@ -3167,7 +3179,7 @@ func TestUpdateJobRequeue(t *testing.T) { } for name, tc := range cases { t.Run(name, func(t *testing.T) { - manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady @@ -3176,7 +3188,7 @@ func TestUpdateJobRequeue(t *testing.T) { if tc.updateFn != nil { tc.updateFn(newJob) } - manager.updateJob(tc.oldJob, newJob) + manager.updateJob(logger, tc.oldJob, newJob) gotRequeuedImmediately := manager.queue.Len() > 0 if tc.wantRequeuedImmediately != gotRequeuedImmediately { t.Fatalf("Want immediate requeue: %v, got immediate requeue: %v", tc.wantRequeuedImmediately, gotRequeuedImmediately) @@ -3186,8 +3198,9 @@ func TestUpdateJobRequeue(t *testing.T) { } func TestJobPodLookup(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady testCases := []struct { @@ -3268,6 +3281,7 @@ func TestJobPodLookup(t *testing.T) { } func TestGetPodsForJob(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) job := newJob(1, 1, 6, batch.NonIndexedCompletion) job.Name = "test_job" otherJob := newJob(1, 1, 6, batch.NonIndexedCompletion) @@ -3330,7 +3344,7 @@ func TestGetPodsForJob(t *testing.T) { job.DeletionTimestamp = &metav1.Time{} } clientSet := fake.NewSimpleClientset(job, otherJob) - jm, informer := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc) + jm, informer := newControllerFromClient(ctx, clientSet, controller.NoResyncPeriodFunc) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady cachedJob := job.DeepCopy() @@ -3368,8 +3382,11 @@ func TestGetPodsForJob(t *testing.T) { } func TestAddPod(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + logger := klog.FromContext(ctx) + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady // Disable batching of pod updates. @@ -3387,7 +3404,7 @@ func TestAddPod(t *testing.T) { informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) - jm.addPod(pod1) + jm.addPod(logger, pod1) if got, want := jm.queue.Len(), 1; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } @@ -3400,7 +3417,7 @@ func TestAddPod(t *testing.T) { t.Errorf("queue.Get() = %v, want %v", got, want) } - jm.addPod(pod2) + jm.addPod(logger, pod2) if got, want := jm.queue.Len(), 1; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } @@ -3415,8 +3432,9 @@ func TestAddPod(t *testing.T) { } func TestAddPodOrphan(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady // Disable batching of pod updates. @@ -3438,15 +3456,17 @@ func TestAddPodOrphan(t *testing.T) { pod1.OwnerReferences = nil informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) - jm.addPod(pod1) + jm.addPod(logger, pod1) if got, want := jm.queue.Len(), 2; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } } func TestUpdatePod(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + logger := klog.FromContext(ctx) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady // Disable batching of pod updates. @@ -3466,7 +3486,7 @@ func TestUpdatePod(t *testing.T) { prev := *pod1 bumpResourceVersion(pod1) - jm.updatePod(&prev, pod1) + jm.updatePod(logger, &prev, pod1) if got, want := jm.queue.Len(), 1; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } @@ -3481,7 +3501,7 @@ func TestUpdatePod(t *testing.T) { prev = *pod2 bumpResourceVersion(pod2) - jm.updatePod(&prev, pod2) + jm.updatePod(logger, &prev, pod2) if got, want := jm.queue.Len(), 1; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } @@ -3496,8 +3516,9 @@ func TestUpdatePod(t *testing.T) { } func TestUpdatePodOrphanWithNewLabels(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady // Disable batching of pod updates. @@ -3518,15 +3539,17 @@ func TestUpdatePodOrphanWithNewLabels(t *testing.T) { prev := *pod1 prev.Labels = map[string]string{"foo2": "bar2"} bumpResourceVersion(pod1) - jm.updatePod(&prev, pod1) + jm.updatePod(logger, &prev, pod1) if got, want := jm.queue.Len(), 2; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } } func TestUpdatePodChangeControllerRef(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + logger := klog.FromContext(ctx) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady // Disable batching of pod updates. @@ -3546,15 +3569,17 @@ func TestUpdatePodChangeControllerRef(t *testing.T) { prev := *pod1 prev.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(job2, controllerKind)} bumpResourceVersion(pod1) - jm.updatePod(&prev, pod1) + jm.updatePod(logger, &prev, pod1) if got, want := jm.queue.Len(), 2; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } } func TestUpdatePodRelease(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + logger := klog.FromContext(ctx) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady // Disable batching of pod updates. @@ -3574,15 +3599,16 @@ func TestUpdatePodRelease(t *testing.T) { prev := *pod1 pod1.OwnerReferences = nil bumpResourceVersion(pod1) - jm.updatePod(&prev, pod1) + jm.updatePod(logger, &prev, pod1) if got, want := jm.queue.Len(), 2; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } } func TestDeletePod(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady // Disable batching of pod updates. @@ -3600,7 +3626,7 @@ func TestDeletePod(t *testing.T) { informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) - jm.deletePod(pod1, true) + jm.deletePod(logger, pod1, true) if got, want := jm.queue.Len(), 1; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } @@ -3613,7 +3639,7 @@ func TestDeletePod(t *testing.T) { t.Errorf("queue.Get() = %v, want %v", got, want) } - jm.deletePod(pod2, true) + jm.deletePod(logger, pod2, true) if got, want := jm.queue.Len(), 1; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } @@ -3628,8 +3654,9 @@ func TestDeletePod(t *testing.T) { } func TestDeletePodOrphan(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady // Disable batching of pod updates. @@ -3650,7 +3677,7 @@ func TestDeletePodOrphan(t *testing.T) { pod1.OwnerReferences = nil informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) - jm.deletePod(pod1, true) + jm.deletePod(logger, pod1, true) if got, want := jm.queue.Len(), 0; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } @@ -3670,8 +3697,9 @@ func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool { // TestSyncJobExpectations tests that a pod cannot sneak in between counting active pods // and checking expectations. func TestSyncJobExpectations(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -3704,10 +3732,11 @@ func TestSyncJobExpectations(t *testing.T) { } func TestWatchJobs(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) clientset := fake.NewSimpleClientset() fakeWatch := watch.NewFake() clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeWatch, nil)) - manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady @@ -3748,11 +3777,12 @@ func TestWatchJobs(t *testing.T) { } func TestWatchPods(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) testJob := newJob(2, 2, 6, batch.NonIndexedCompletion) clientset := fake.NewSimpleClientset(testJob) fakeWatch := watch.NewFake() clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil)) - manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady @@ -3795,9 +3825,10 @@ func TestWatchPods(t *testing.T) { } func TestWatchOrphanPods(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) clientset := fake.NewSimpleClientset() sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) - manager := NewController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset) + manager := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset) manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady @@ -3876,6 +3907,7 @@ type pods struct { } func TestJobBackoffReset(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) testCases := map[string]struct { // job setup parallelism int32 @@ -3905,7 +3937,7 @@ func TestJobBackoffReset(t *testing.T) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) defer func() { DefaultJobBackOff = 10 * time.Second }() DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing - manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -3966,6 +3998,8 @@ func (f *fakeRateLimitingQueue) AddAfter(item interface{}, duration time.Duratio } func TestJobBackoff(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + logger := klog.FromContext(ctx) job := newJob(1, 1, 1, batch.NonIndexedCompletion) oldPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job) oldPod.ResourceVersion = "1" @@ -4038,7 +4072,7 @@ func TestJobBackoff(t *testing.T) { t.Run(name, func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)() clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -4053,7 +4087,7 @@ func TestJobBackoff(t *testing.T) { if tc.oldPodPhase != "" { oldPod.Status.Phase = tc.oldPodPhase } - manager.updatePod(oldPod, newPod) + manager.updatePod(logger, oldPod, newPod) if queue.duration != tc.wantBackoff { t.Errorf("unexpected backoff %v, expected %v", queue.duration, tc.wantBackoff) } @@ -4062,6 +4096,7 @@ func TestJobBackoff(t *testing.T) { } func TestJobBackoffForOnFailure(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) jobConditionComplete := batch.JobComplete jobConditionFailed := batch.JobFailed jobConditionSuspended := batch.JobSuspended @@ -4146,7 +4181,7 @@ func TestJobBackoffForOnFailure(t *testing.T) { t.Run(name, func(t *testing.T) { // job manager setup clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -4193,6 +4228,7 @@ func TestJobBackoffForOnFailure(t *testing.T) { } func TestJobBackoffOnRestartPolicyNever(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) jobConditionFailed := batch.JobFailed testCases := map[string]struct { @@ -4246,7 +4282,7 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) { t.Run(name, func(t *testing.T) { // job manager setup clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) + manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -4378,9 +4414,10 @@ func TestEnsureJobConditions(t *testing.T) { } func TestFinalizersRemovedExpectations(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) clientset := fake.NewSimpleClientset() sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) - manager := NewController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset) + manager := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset) manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady manager.podControl = &controller.FakePodControl{Err: errors.New("fake pod controller error")} diff --git a/pkg/controller/job/tracking_utils.go b/pkg/controller/job/tracking_utils.go index 87d4425115a73..cca3bc2bb192f 100644 --- a/pkg/controller/job/tracking_utils.go +++ b/pkg/controller/job/tracking_utils.go @@ -74,8 +74,8 @@ func (u *uidTrackingExpectations) getExpectedUIDs(controllerKey string) sets.Set // ExpectDeletions records expectations for the given deleteKeys, against the // given job-key. // This is thread-safe across different job keys. -func (u *uidTrackingExpectations) expectFinalizersRemoved(jobKey string, deletedKeys []string) error { - klog.V(4).InfoS("Expecting tracking finalizers removed", "job", jobKey, "podUIDs", deletedKeys) +func (u *uidTrackingExpectations) expectFinalizersRemoved(logger klog.Logger, jobKey string, deletedKeys []string) error { + logger.V(4).Info("Expecting tracking finalizers removed", "key", jobKey, "podUIDs", deletedKeys) uids := u.getSet(jobKey) if uids == nil { @@ -94,12 +94,12 @@ func (u *uidTrackingExpectations) expectFinalizersRemoved(jobKey string, deleted } // FinalizerRemovalObserved records the given deleteKey as a deletion, for the given job. -func (u *uidTrackingExpectations) finalizerRemovalObserved(jobKey, deleteKey string) { +func (u *uidTrackingExpectations) finalizerRemovalObserved(logger klog.Logger, jobKey, deleteKey string) { uids := u.getSet(jobKey) if uids != nil { uids.Lock() if uids.set.Has(deleteKey) { - klog.V(4).InfoS("Observed tracking finalizer removed", "job", jobKey, "podUID", deleteKey) + logger.V(4).Info("Observed tracking finalizer removed", "key", jobKey, "podUID", deleteKey) uids.set.Delete(deleteKey) } uids.Unlock() @@ -107,11 +107,11 @@ func (u *uidTrackingExpectations) finalizerRemovalObserved(jobKey, deleteKey str } // DeleteExpectations deletes the UID set. -func (u *uidTrackingExpectations) deleteExpectations(jobKey string) { +func (u *uidTrackingExpectations) deleteExpectations(logger klog.Logger, jobKey string) { set := u.getSet(jobKey) if set != nil { if err := u.store.Delete(set); err != nil { - klog.ErrorS(err, "Could not delete tracking annotation UID expectations", "job", jobKey) + logger.Error(err, "Could not delete tracking annotation UID expectations", "key", jobKey) } } } diff --git a/pkg/controller/job/tracking_utils_test.go b/pkg/controller/job/tracking_utils_test.go index a64a0682a3a6b..9f380fe2b18d4 100644 --- a/pkg/controller/job/tracking_utils_test.go +++ b/pkg/controller/job/tracking_utils_test.go @@ -27,10 +27,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/component-base/metrics/testutil" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/controller/job/metrics" ) func TestUIDTrackingExpectations(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) tracks := []struct { job string firstRound []string @@ -62,7 +64,7 @@ func TestUIDTrackingExpectations(t *testing.T) { for i := range tracks { track := tracks[i] go func(errID int) { - errs[errID] = expectations.expectFinalizersRemoved(track.job, track.firstRound) + errs[errID] = expectations.expectFinalizersRemoved(logger, track.job, track.firstRound) wg.Done() }(i) } @@ -90,12 +92,12 @@ func TestUIDTrackingExpectations(t *testing.T) { for _, uid := range track.firstRound { uid := uid go func() { - expectations.finalizerRemovalObserved(track.job, uid) + expectations.finalizerRemovalObserved(logger, track.job, uid) wg.Done() }() } go func(errID int) { - errs[errID] = expectations.expectFinalizersRemoved(track.job, track.secondRound) + errs[errID] = expectations.expectFinalizersRemoved(logger, track.job, track.secondRound) wg.Done() }(i) } @@ -116,7 +118,7 @@ func TestUIDTrackingExpectations(t *testing.T) { } } for _, track := range tracks { - expectations.deleteExpectations(track.job) + expectations.deleteExpectations(logger, track.job) uids := expectations.getSet(track.job) if uids != nil { t.Errorf("Wanted expectations for job %s to be cleared, but they were not", track.job) diff --git a/test/integration/cronjob/cronjob_test.go b/test/integration/cronjob/cronjob_test.go index 6cdbf2a42f486..90f20902f07c1 100644 --- a/test/integration/cronjob/cronjob_test.go +++ b/test/integration/cronjob/cronjob_test.go @@ -52,7 +52,7 @@ func setup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFunc if err != nil { t.Fatalf("Error creating CronJob controller: %v", err) } - jc := job.NewController(informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet) + jc := job.NewController(ctx, informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet) return server.TearDownFn, cjc, jc, informerSet, clientSet } diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 2242be8593d9e..999e58b4ac7bc 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -2086,7 +2086,7 @@ func resetMetrics() { func createJobControllerWithSharedInformers(restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) { clientSet := clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-controller")) ctx, cancel := context.WithCancel(context.Background()) - jc := jobcontroller.NewController(informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet) + jc := jobcontroller.NewController(ctx, informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet) return jc, ctx, cancel }