Skip to content

Commit

Permalink
feat: Add creationObserved when creating pods has errors (#1236)
Browse files Browse the repository at this point in the history
Signed-off-by: cegao <cegao@tencent.com>
  • Loading branch information
gaocegege authored Feb 4, 2021
1 parent c13a9e4 commit 4f69069
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pkg/controller.v1/tensorflow/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,11 @@ func (tc *TFController) createNewPod(tfjob *tfv1.TFJob, rt, index string, spec *
// pod when the expectation expires.
return nil
} else if err != nil {
// Decrement the expected number of creates because the informer won't observe this pod
logger.Infof(
"Failed creation, decrementing expectations for tfjob %s/%s, key %s",
tfjob.Namespace, tfjob.Name, expectationPodsKey)
tc.Expectations.CreationObserved(expectationPodsKey)
return err
}
return nil
Expand Down
123 changes: 123 additions & 0 deletions pkg/controller.v1/tensorflow/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package tensorflow

import (
"fmt"
"os"
"reflect"
"testing"
Expand All @@ -30,6 +31,7 @@ import (
commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/common"
"github.com/kubeflow/common/pkg/controller.v1/control"
"github.com/kubeflow/common/pkg/controller.v1/expectation"
"github.com/kubeflow/tf-operator/cmd/tf-operator.v1/app/options"
tfv1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1"
tfjobclientset "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -104,6 +106,127 @@ func TestAddPod(t *testing.T) {
close(stopCh)
}

func TestExpectation(t *testing.T) {
// Prepare the clientset and controller for the test.
kubeClientSet := kubeclientset.NewForConfigOrDie(&rest.Config{
Host: "",
ContentConfig: rest.ContentConfig{
GroupVersion: &v1.SchemeGroupVersion,
},
},
)

// Prepare the volcano clientset and controller for the test.
volcanoClientSet := volcanoclient.NewForConfigOrDie(&rest.Config{
Host: "",
ContentConfig: rest.ContentConfig{
GroupVersion: &batchv1beta1.SchemeGroupVersion,
},
},
)

config := &rest.Config{
Host: "",
ContentConfig: rest.ContentConfig{
GroupVersion: &tfv1.SchemeGroupVersion,
},
}
tfJobClientSet := tfjobclientset.NewForConfigOrDie(config)
ctr, _, _ := newTFController(config, kubeClientSet,
volcanoClientSet, tfJobClientSet, 0, options.ServerOption{})
ctr.tfJobInformerSynced = testutil.AlwaysReady
ctr.PodInformerSynced = testutil.AlwaysReady
ctr.ServiceInformerSynced = testutil.AlwaysReady

ctr.PodControl = &control.FakePodControl{}
tfJob := testutil.NewTFJob(2, 1)

var err error
if err = ctr.createNewPod(tfJob, "worker", "0",
tfJob.Spec.TFReplicaSpecs[tfv1.TFReplicaTypeWorker],
false, tfJob.Spec.TFReplicaSpecs); err != nil {
t.Errorf("Expected get nil, got error %v", err)
}

tfjobKey, err := KeyFunc(tfJob)
if err != nil {
t.Errorf("Expected nil, got error %v", err)
}
expectationPodsKey := expectation.GenExpectationPodsKey(tfjobKey, "worker")
e, found, err := ctr.Expectations.GetExpectations(expectationPodsKey)
if err != nil {
t.Errorf("Expected nil, got error %v", err)
}
if !found {
t.Errorf("Expected to get the corresponding expectation")
}
if add, del := e.GetExpectations(); add != 1 || del != 0 {
t.Errorf("Expected get 1 add and 0 del, got %d add and %d del", add, del)
}
}

func TestExpectationWithError(t *testing.T) {
// Prepare the clientset and controller for the test.
kubeClientSet := kubeclientset.NewForConfigOrDie(&rest.Config{
Host: "",
ContentConfig: rest.ContentConfig{
GroupVersion: &v1.SchemeGroupVersion,
},
},
)

// Prepare the volcano clientset and controller for the test.
volcanoClientSet := volcanoclient.NewForConfigOrDie(&rest.Config{
Host: "",
ContentConfig: rest.ContentConfig{
GroupVersion: &batchv1beta1.SchemeGroupVersion,
},
},
)

config := &rest.Config{
Host: "",
ContentConfig: rest.ContentConfig{
GroupVersion: &tfv1.SchemeGroupVersion,
},
}
tfJobClientSet := tfjobclientset.NewForConfigOrDie(config)
ctr, _, _ := newTFController(config, kubeClientSet,
volcanoClientSet, tfJobClientSet, 0, options.ServerOption{})
ctr.tfJobInformerSynced = testutil.AlwaysReady
ctr.PodInformerSynced = testutil.AlwaysReady
ctr.ServiceInformerSynced = testutil.AlwaysReady

ctr.PodControl = &control.FakePodControl{}
tfJob := testutil.NewTFJob(2, 1)

// Fake an error.
ctr.PodControl.(*control.FakePodControl).Err = fmt.Errorf("Fake")

var err error
if err = ctr.createNewPod(tfJob, "worker", "0",
tfJob.Spec.TFReplicaSpecs[tfv1.TFReplicaTypeWorker],
false, tfJob.Spec.TFReplicaSpecs); err == nil {
t.Errorf("Expected error, got nil")
}

tfjobKey, err := KeyFunc(tfJob)
if err != nil {
t.Errorf("Expected nil, got error %v", err)
}
expectationPodsKey := expectation.GenExpectationPodsKey(tfjobKey, "worker")
e, found, err := ctr.Expectations.GetExpectations(expectationPodsKey)
if err != nil {
t.Errorf("Expected nil, got error %v", err)
}
if !found {
t.Errorf("Expected to get the corresponding expectation")
}
if add, del := e.GetExpectations(); add != 0 || del != 0 {
t.Errorf("Expected get 0 add and 0 del, got %d add and %d del", add, del)
}
}

func TestClusterSpec(t *testing.T) {
type tc struct {
tfJob *tfv1.TFJob
Expand Down

0 comments on commit 4f69069

Please sign in to comment.