Skip to content

Commit

Permalink
Add delay support for worker0 completed. (kubeflow#13)
Browse files Browse the repository at this point in the history
* add annotation support for worker0 completed

* modify annotation name

* modify annotation name

* modify check pod ttl way

* fix from comments

* fix from comments

* fix from comments

* modify ttl way

* update check ttl code
  • Loading branch information
AlanFokCo authored Dec 16, 2022
1 parent 986e8d5 commit a12e144
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 5 deletions.
15 changes: 12 additions & 3 deletions pkg/controller.v1/tensorflow/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ package tensorflow

import (
"fmt"
"github.com/kubeflow/tf-operator/pkg/util"
"os"
"reflect"
"strings"
"time"

"github.com/kubeflow/tf-operator/pkg/util"

kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -70,6 +71,8 @@ const (

TFJobEvictAnnotation = "cluster-autoscaler.alibabacloud.com/evict-for-failed-pod"
PodEvictAnnotation = "cluster-autoscaler.kubernetes.io/safe-to-evict"

TFJobWaitingWorkerAnnotation = "arena.kubeflow.org/pod.ttlSecondsAfterFinished"
)

var (
Expand Down Expand Up @@ -424,8 +427,14 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1.TFJob) error {

// If the TFJob is terminated, delete all pods and services.
if isSucceeded(tfjob.Status) || isFailed(tfjob.Status) || tfJobExceedsLimit {
if err := tc.deletePodsAndServices(tfjob, pods); err != nil {
return err

// If TTL is set, you need to wait until the TTL time before reclaiming resources.
ttlDuration, shouldWaitPodTTL := getPodTTL(tfjob)

if !shouldWaitPodTTL || isPodTTLReached(tfjob, ttlDuration) {
if err := tc.deletePodsAndServices(tfjob, pods); err != nil {
return err
}
}

if tfJobExceedsLimit {
Expand Down
28 changes: 28 additions & 0 deletions pkg/controller.v1/tensorflow/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -240,6 +241,33 @@ func (tc *TFController) createNewPod(tfjob *tfv1.TFJob, rt, index string, spec *
return nil
}

// getPodTTL reads the pod TTL requested for a TFJob from the
// TFJobWaitingWorkerAnnotation annotation.
//
// The value is parsed with time.ParseDuration, so it must be a Go duration
// string carrying a unit (for example "30s" or "5m"). A bare number such as
// "30" is rejected even though the annotation key mentions seconds.
//
// It returns the parsed duration and true only when the annotation is
// present, parses successfully and is strictly positive. Otherwise the reason
// is logged and (0, false) is returned, which tells the caller that there is
// no TTL to wait for.
func getPodTTL(tfjob *tfv1.TFJob) (time.Duration, bool) {
	logger := tflogger.LoggerForJob(tfjob)

	value, ok := tfjob.Annotations[TFJobWaitingWorkerAnnotation]
	if !ok {
		logger.Infof("don't have annotation %s", TFJobWaitingWorkerAnnotation)
		return 0, false
	}

	ttlDuration, err := time.ParseDuration(value)
	if err != nil {
		// Include the offending value so a malformed annotation can be
		// diagnosed from the log alone.
		logger.Infof("invalid value %q for annotation %s: %v", value, TFJobWaitingWorkerAnnotation, err)
		return 0, false
	}

	if ttlDuration <= 0 {
		logger.Infof("the pod ttl annotation %s should be greater than 0, got %q", TFJobWaitingWorkerAnnotation, value)
		return 0, false
	}

	return ttlDuration, true
}

// isPodTTLReached reports whether more than ttlDuration has elapsed since the
// completion time recorded in tfjob.Status.CompletionTime.
//
// When no completion time has been recorded yet the TTL clock has not
// started, so the TTL is reported as not reached instead of dereferencing a
// nil pointer.
// NOTE(review): this relies on the job being reconciled again once the
// completion time is written to its status - confirm against the caller.
func isPodTTLReached(tfjob *tfv1.TFJob, ttlDuration time.Duration) bool {
	completionTime := tfjob.Status.CompletionTime
	if completionTime == nil {
		return false
	}
	return time.Since(completionTime.Time) > ttlDuration
}

func setClusterSpec(podTemplateSpec *v1.PodTemplateSpec, tfjob *tfv1.TFJob, rt, index string) error {
// Generate TF_CONFIG JSON string.
tfConfigStr, err := genTFConfigJSONStr(tfjob, rt, index)
Expand Down
12 changes: 10 additions & 2 deletions pkg/controller.v1beta2/tensorflow/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ const (

TFJobEvictAnnotation = "cluster-autoscaler.alibabacloud.com/evict-for-failed-pod"
PodEvictAnnotation = "cluster-autoscaler.kubernetes.io/safe-to-evict"

TFJobWaitingWorkerAnnotation = "arena.kubeflow.org/pod.ttlSecondsAfterFinished"
)

var (
Expand Down Expand Up @@ -402,8 +404,14 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1beta2.TFJob) error {

// If the TFJob is terminated, delete all pods and services.
if isSucceeded(tfjob.Status) || isFailed(tfjob.Status) || tfJobExceedsLimit {
if err := tc.deletePodsAndServices(tfjob, pods); err != nil {
return err

// If TTL is set, you need to wait until the TTL time before reclaiming resources.
ttlDuration, shouldWaitPodTTL := getPodTTL(tfjob)

if !shouldWaitPodTTL || isPodTTLReached(tfjob, ttlDuration) {
if err := tc.deletePodsAndServices(tfjob, pods); err != nil {
return err
}
}

if tfJobExceedsLimit {
Expand Down
28 changes: 28 additions & 0 deletions pkg/controller.v1beta2/tensorflow/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -234,6 +235,33 @@ func (tc *TFController) createNewPod(tfjob *tfv1beta2.TFJob, rt, index string, s
return nil
}

// getPodTTL reads the pod TTL requested for a TFJob from the
// TFJobWaitingWorkerAnnotation annotation.
//
// The value is parsed with time.ParseDuration, so it must be a Go duration
// string carrying a unit (for example "30s" or "5m"). A bare number such as
// "30" is rejected even though the annotation key mentions seconds.
//
// It returns the parsed duration and true only when the annotation is
// present, parses successfully and is strictly positive. Otherwise the reason
// is logged and (0, false) is returned, which tells the caller that there is
// no TTL to wait for.
func getPodTTL(tfjob *tfv1beta2.TFJob) (time.Duration, bool) {
	logger := tflogger.LoggerForJob(tfjob)

	value, ok := tfjob.Annotations[TFJobWaitingWorkerAnnotation]
	if !ok {
		logger.Infof("don't have annotation %s", TFJobWaitingWorkerAnnotation)
		return 0, false
	}

	ttlDuration, err := time.ParseDuration(value)
	if err != nil {
		// Include the offending value so a malformed annotation can be
		// diagnosed from the log alone.
		logger.Infof("invalid value %q for annotation %s: %v", value, TFJobWaitingWorkerAnnotation, err)
		return 0, false
	}

	if ttlDuration <= 0 {
		logger.Infof("the pod ttl annotation %s should be greater than 0, got %q", TFJobWaitingWorkerAnnotation, value)
		return 0, false
	}

	return ttlDuration, true
}

// isPodTTLReached reports whether more than ttlDuration has elapsed since the
// completion time recorded in tfjob.Status.CompletionTime.
//
// When no completion time has been recorded yet the TTL clock has not
// started, so the TTL is reported as not reached instead of dereferencing a
// nil pointer.
// NOTE(review): this relies on the job being reconciled again once the
// completion time is written to its status - confirm against the caller.
func isPodTTLReached(tfjob *tfv1beta2.TFJob, ttlDuration time.Duration) bool {
	completionTime := tfjob.Status.CompletionTime
	if completionTime == nil {
		return false
	}
	return time.Since(completionTime.Time) > ttlDuration
}

func setClusterSpec(podTemplateSpec *v1.PodTemplateSpec, tfjob *tfv1beta2.TFJob, rt, index string) error {
// Generate TF_CONFIG JSON string.
tfConfigStr, err := genTFConfigJSONStr(tfjob, rt, index)
Expand Down

0 comments on commit a12e144

Please sign in to comment.