Skip to content

Commit

Permalink
fix: requeue when expire time is not up yet
Browse files Browse the repository at this point in the history
Signed-off-by: Garrybest <garrybest@foxmail.com>
  • Loading branch information
Garrybest committed Jun 14, 2022
1 parent bf3787f commit fbd3d06
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 0 deletions.
27 changes: 27 additions & 0 deletions pkg/common/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
package util

import (
"fmt"
"time"

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
commonutil "github.com/kubeflow/common/pkg/util"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -72,3 +76,26 @@ func GetReplicaTypes(specs map[commonv1.ReplicaType]*commonv1.ReplicaSpec) []com
}
return keys
}

// DurationUntilExpireTime reports how long to wait before a finished job's
// TTL (RunPolicy.TTLSecondsAfterFinished) elapses and the job needs to be
// cleaned up.
//
// Return values:
//   - (-1, nil) when there is nothing to wait for: runPolicy is nil, neither
//     commonutil.IsSucceeded nor commonutil.IsFailed holds for jobStatus, or
//     TTLSecondsAfterFinished is unset.
//   - (-1, error) when the job is finished and has a TTL but its
//     CompletionTime is nil, so the expire time cannot be computed.
//   - (0, nil) when the expire time has already been reached.
//   - (d, nil) with d > 0 when the expire time is still d in the future.
func DurationUntilExpireTime(runPolicy *commonv1.RunPolicy, jobStatus commonv1.JobStatus) (time.Duration, error) {
	// Only finished jobs are subject to TTL-based cleanup.
	if !commonutil.IsSucceeded(jobStatus) && !commonutil.IsFailed(jobStatus) {
		return -1, nil
	}
	// A missing run policy carries no TTL; treat it like an unset TTL instead
	// of dereferencing a nil pointer.
	if runPolicy == nil || runPolicy.TTLSecondsAfterFinished == nil {
		return -1, nil
	}
	if jobStatus.CompletionTime == nil {
		return -1, fmt.Errorf("job completion time is nil, cannot cleanup")
	}

	ttl := time.Duration(*runPolicy.TTLSecondsAfterFinished) * time.Second
	expireTime := jobStatus.CompletionTime.Add(ttl)

	now := time.Now()
	if now.After(expireTime) {
		// Already expired: clamp to zero rather than returning a negative
		// duration, which callers would interpret as "infinite".
		return 0, nil
	}
	return expireTime.Sub(now), nil
}
9 changes: 9 additions & 0 deletions pkg/controller.v1/mpi/mpijob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,15 @@ func (jc *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, err
}

t, err := util.DurationUntilExpireTime(&mpijob.Spec.RunPolicy, mpijob.Status)
if err != nil {
logrus.Warnf("Reconcile Tensorflow Job error %v", err)
return ctrl.Result{}, err
}
if t >= 0 {
return ctrl.Result{RequeueAfter: t}, nil
}

return ctrl.Result{}, nil
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/controller.v1/mxnet/mxjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,15 @@ func (r *MXJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
return ctrl.Result{}, err
}

t, err := util.DurationUntilExpireTime(&mxjob.Spec.RunPolicy, mxjob.Status)
if err != nil {
logrus.Warnf("Reconcile Tensorflow Job error %v", err)
return ctrl.Result{}, err
}
if t >= 0 {
return ctrl.Result{RequeueAfter: t}, nil
}

return ctrl.Result{}, nil
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/controller.v1/pytorch/pytorchjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,15 @@ func (r *PyTorchJobReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

t, err := util.DurationUntilExpireTime(&pytorchjob.Spec.RunPolicy, pytorchjob.Status)
if err != nil {
logrus.Warnf("Reconcile Tensorflow Job error %v", err)
return ctrl.Result{}, err
}
if t >= 0 {
return ctrl.Result{RequeueAfter: t}, nil
}

return ctrl.Result{}, nil
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/controller.v1/tensorflow/tfjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,15 @@ func (r *TFJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
return ctrl.Result{}, err
}

t, err := util.DurationUntilExpireTime(&tfjob.Spec.RunPolicy, tfjob.Status)
if err != nil {
logrus.Warnf("Reconcile Tensorflow Job error %v", err)
return ctrl.Result{}, err
}
if t >= 0 {
return ctrl.Result{RequeueAfter: t}, nil
}

return ctrl.Result{}, nil
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/controller.v1/xgboost/xgboostjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,15 @@ func (r *XGBoostJobReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

t, err := util.DurationUntilExpireTime(&xgboostjob.Spec.RunPolicy, xgboostjob.Status)
if err != nil {
logrus.Warnf("Reconcile Tensorflow Job error %v", err)
return ctrl.Result{}, err
}
if t >= 0 {
return ctrl.Result{RequeueAfter: t}, nil
}

return reconcile.Result{}, nil
}

Expand Down

0 comments on commit fbd3d06

Please sign in to comment.