Skip to content

Commit

Permalink
feat: Add rerun method to pipeline job preview client.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 678990608
  • Loading branch information
chenyifan-vertex authored and copybara-github committed Sep 26, 2024
1 parent 44766a0 commit 29dec74
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 0 deletions.
113 changes: 113 additions & 0 deletions google/cloud/aiplatform/preview/pipelinejob/pipeline_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,3 +485,116 @@ def submit(
)

_LOGGER.info("View Pipeline Job:\n%s" % self._dashboard_uri())

def rerun(
self,
original_pipelinejob_name: str,
pipeline_task_rerun_configs: Optional[
List[aiplatform_v1beta1.PipelineTaskRerunConfig]
] = None,
parameter_values: Optional[Dict[str, Any]] = None,
job_id: Optional[str] = None,
service_account: Optional[str] = None,
network: Optional[str] = None,
reserved_ip_ranges: Optional[List[str]] = None,
) -> None:
"""Rerun a PipelineJob.
Args:
original_pipelinejob_name (str):
Required. The name of the original PipelineJob.
pipeline_task_rerun_configs (List[aiplatform_v1beta1.PipelineTaskRerunConfig]):
Optional. The list of PipelineTaskRerunConfig to specify the tasks to rerun.
parameter_values (Dict[str, Any]):
Optional. The parameter values to override the original PipelineJob.
job_id (str):
Optional. The ID to use for the PipelineJob, which will become the final
component of the PipelineJob name. If not provided, an ID will be
automatically generated.
service_account (str):
Optional. Specifies the service account for workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
network (str):
Optional. The full name of the Compute Engine network to which the job
should be peered. For example, projects/12345/global/networks/myVPC.
Private services access must already be configured for the network.
If left unspecified, the network set in aiplatform.init will be used.
Otherwise, the job is not peered with any network.
reserved_ip_ranges (List[str]):
Optional. A list of names for the reserved IP ranges under the VPC
network that can be used for this PipelineJob's workload. For example: ['vertex-ai-ip-range'].
If left unspecified, the job will be deployed to any IP ranges under
the provided VPC network.
"""
network = network or initializer.global_config.network
service_account = service_account or initializer.global_config.service_account
gca_resouce = self._v1_beta1_pipeline_job

if service_account:
gca_resouce.service_account = service_account

if network:
gca_resouce.network = network

if reserved_ip_ranges:
gca_resouce.reserved_ip_ranges = reserved_ip_ranges
user_project = initializer.global_config.project
user_location = initializer.global_config.location
parent = initializer.global_config.common_location_path(
project=user_project, location=user_location
)

client = self._instantiate_client(
location=user_location,
appended_user_agent=["preview-pipeline-job-submit"],
)
v1beta1_client = client.select_version(compat.V1BETA1)

_LOGGER.log_create_with_lro(self.__class__)

pipeline_job = self._v1_beta1_pipeline_job
try:
get_request = aiplatform_v1beta1.GetPipelineJobRequest(
name=original_pipelinejob_name
)
original_pipeline_job = v1beta1_client.get_pipeline_job(request=get_request)
pipeline_job.original_pipeline_job_id = int(
original_pipeline_job.labels["vertex-ai-pipelines-run-billing-id"]
)
except Exception as e:
raise ValueError(
f"Failed to get original pipeline job: {original_pipelinejob_name}"
) from e

pipeline_job.pipeline_task_rerun_configs = pipeline_task_rerun_configs

if parameter_values:
runtime_config = self._v1_beta1_pipeline_job.runtime_config
runtime_config.parameter_values = parameter_values

pipeline_name = self._v1_beta1_pipeline_job.display_name

job_id = job_id or "{pipeline_name}-{timestamp}".format(
pipeline_name=re.sub("[^-0-9a-z]+", "-", pipeline_name.lower())
.lstrip("-")
.rstrip("-"),
timestamp=_get_current_time().strftime("%Y%m%d%H%M%S"),
)

request = aiplatform_v1beta1.CreatePipelineJobRequest(
parent=parent,
pipeline_job=self._v1_beta1_pipeline_job,
pipeline_job_id=job_id,
)

response = v1beta1_client.create_pipeline_job(request=request)

self._gca_resource = response

_LOGGER.log_create_complete_with_getter(
self.__class__, self._gca_resource, "pipeline_job"
)

_LOGGER.info("View Pipeline Job:\n%s" % self._dashboard_uri())
93 changes: 93 additions & 0 deletions tests/unit/aiplatform/test_pipeline_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from google.api_core import operation as ga_operation
from google.auth import credentials as auth_credentials
from google.cloud import aiplatform
from google.cloud import aiplatform_v1beta1
from google.cloud.aiplatform import base
from google.cloud.aiplatform import initializer
from google.cloud.aiplatform.constants import pipeline as pipeline_constants
Expand Down Expand Up @@ -77,6 +78,7 @@
_TEST_GCS_OUTPUT_DIRECTORY = f"gs://{_TEST_GCS_BUCKET_NAME}/output_artifacts/"
_TEST_CREDENTIALS = auth_credentials.AnonymousCredentials()
_TEST_SERVICE_ACCOUNT = "abcde@my-project.iam.gserviceaccount.com"
_TEST_LABELS = {"vertex-ai-pipelines-run-billing-id": "100"}

_TEST_TEMPLATE_PATH = f"gs://{_TEST_GCS_BUCKET_NAME}/job_spec.json"
_TEST_AR_TEMPLATE_PATH = "https://us-central1-kfp.pkg.dev/proj/repo/pack/latest"
Expand Down Expand Up @@ -290,6 +292,53 @@ def mock_pipeline_v1beta1_service_create():
yield mock_create_pipeline_job


@pytest.fixture
def mock_pipeline_v1beta1_service_get():
    """Mocks v1beta1 get_pipeline_job: first call RUNNING, then SUCCEEDED.

    The side_effect sequence returns a RUNNING job on the first call and a
    SUCCEEDED job on each of the next eight calls, covering repeated polling.
    """
    with mock.patch.object(
        v1beta1_pipeline_service.PipelineServiceClient, "get_pipeline_job"
    ) as mock_get_pipeline_job:
        # One RUNNING response followed by eight identical SUCCEEDED responses;
        # the comprehension replaces eight duplicated literals.
        mock_get_pipeline_job.side_effect = [
            make_v1beta1_pipeline_job(
                _TEST_PIPELINE_JOB_NAME,
                gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING,
            )
        ] + [
            make_v1beta1_pipeline_job(
                _TEST_PIPELINE_JOB_NAME,
                gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED,
            )
            for _ in range(8)
        ]

        yield mock_get_pipeline_job


@pytest.fixture
def mock_pipeline_v1_service_batch_cancel():
with patch.object(
Expand Down Expand Up @@ -351,6 +400,7 @@ def make_v1beta1_pipeline_job(name: str, state: v1beta1_pipeline_state.PipelineS
create_time=_TEST_PIPELINE_CREATE_TIME,
service_account=_TEST_SERVICE_ACCOUNT,
network=_TEST_NETWORK,
labels=_TEST_LABELS,
job_detail=v1beta1_pipeline_job.PipelineJobDetail(
pipeline_run_context=v1beta1_context.Context(
name=name,
Expand Down Expand Up @@ -2284,6 +2334,49 @@ def test_submit_v1beta1_pipeline_job_returns_response(

assert mock_pipeline_v1beta1_service_create.call_count == 1

@pytest.mark.usefixtures(
"mock_pipeline_v1beta1_service_create",
"mock_pipeline_v1beta1_service_get",
)
@pytest.mark.parametrize(
"job_spec",
[_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],
)
def test_rerun_v1beta1_pipeline_job_returns_response(
self,
mock_load_yaml_and_json,
job_spec,
mock_pipeline_v1beta1_service_create,
mock_pipeline_v1beta1_service_get,
):
aiplatform.init(
project=_TEST_PROJECT,
staging_bucket=_TEST_GCS_BUCKET_NAME,
credentials=_TEST_CREDENTIALS,
)

job = preview_pipeline_jobs._PipelineJob(
display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
template_path=_TEST_TEMPLATE_PATH,
job_id=_TEST_PIPELINE_JOB_ID,
)

job.submit()

job.rerun(
original_pipelinejob_name=_TEST_PIPELINE_JOB_NAME,
pipeline_task_rerun_configs=[
aiplatform_v1beta1.PipelineTaskRerunConfig(
task_name="task-name",
task_id=100,
)
],
parameter_values={"param-1": "value-1"},
)

assert mock_pipeline_v1beta1_service_get.call_count == 1
assert mock_pipeline_v1beta1_service_create.call_count == 2

@pytest.mark.parametrize(
"job_spec",
[_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],
Expand Down

0 comments on commit 29dec74

Please sign in to comment.