Skip to content

Commit

Permalink
feat: Support "reservedIpRanges" parameter in PipelineJob run() and s…
Browse files Browse the repository at this point in the history
…ubmit() methods.

PiperOrigin-RevId: 592024354
  • Loading branch information
vertex-sdk-bot authored and copybara-github committed Dec 18, 2023
1 parent 17dc9b7 commit ab99e00
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 0 deletions.
4 changes: 4 additions & 0 deletions google/cloud/aiplatform/pipeline_job_schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ def __init__(
create_pipeline_job_request["pipeline_job"][
"encryption_spec"
] = pipeline_job._gca_resource.encryption_spec
if "reserved_ip_ranges" in pipeline_job._gca_resource:
create_pipeline_job_request["pipeline_job"][
"reserved_ip_ranges"
] = pipeline_job._gca_resource.reserved_ip_ranges
pipeline_job_schedule_args = {
"display_name": display_name,
"create_pipeline_job_request": create_pipeline_job_request,
Expand Down
20 changes: 20 additions & 0 deletions google/cloud/aiplatform/pipeline_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ def run(
self,
service_account: Optional[str] = None,
network: Optional[str] = None,
reserved_ip_ranges: Optional[List[str]] = None,
sync: Optional[bool] = True,
create_request_timeout: Optional[float] = None,
) -> None:
Expand All @@ -309,6 +310,9 @@ def run(
Private services access must already be configured for the network.
If left unspecified, the network set in aiplatform.init will be used.
Otherwise, the job is not peered with any network.
reserved_ip_ranges (List[str]):
Optional. A list of names for the reserved IP ranges under the VPC network that can be used for this PipelineJob's workload.
For example: ['vertex-ai-ip-range'].
sync (bool):
Optional. Whether to execute this method synchronously. If False, this method will unblock and it will be executed in a concurrent Future.
create_request_timeout (float):
Expand All @@ -319,6 +323,7 @@ def run(
self._run(
service_account=service_account,
network=network,
reserved_ip_ranges=reserved_ip_ranges,
sync=sync,
create_request_timeout=create_request_timeout,
)
Expand All @@ -328,6 +333,7 @@ def _run(
self,
service_account: Optional[str] = None,
network: Optional[str] = None,
reserved_ip_ranges: Optional[List[str]] = None,
sync: Optional[bool] = True,
create_request_timeout: Optional[float] = None,
) -> None:
Expand All @@ -342,6 +348,9 @@ def _run(
Optional. The full name of the Compute Engine network to which the job
should be peered. For example, projects/12345/global/networks/myVPC.
Private services access must already be configured for the network.
reserved_ip_ranges (List[str]):
Optional. A list of names for the reserved IP ranges under the VPC network that can be used for this PipelineJob's workload.
For example: ['vertex-ai-ip-range'].
sync (bool):
Optional. Whether to execute this method synchronously. If False, this method will unblock and it will be executed in a concurrent Future.
create_request_timeout (float):
Expand All @@ -350,6 +359,7 @@ def _run(
self.submit(
service_account=service_account,
network=network,
reserved_ip_ranges=reserved_ip_ranges,
create_request_timeout=create_request_timeout,
)

Expand All @@ -359,6 +369,7 @@ def submit(
self,
service_account: Optional[str] = None,
network: Optional[str] = None,
reserved_ip_ranges: Optional[List[str]] = None,
create_request_timeout: Optional[float] = None,
*,
experiment: Optional[Union[str, experiment_resources.Experiment]] = None,
Expand All @@ -376,6 +387,12 @@ def submit(
Private services access must already be configured for the network.
If left unspecified, the network set in aiplatform.init will be used.
Otherwise, the job is not peered with any network.
reserved_ip_ranges (List[str]):
Optional. A list of names for the reserved IP ranges under the VPC
network that can be used for this PipelineJob's workload. For example: ['vertex-ai-ip-range'].
If left unspecified, the job will be deployed to any IP ranges under
the provided VPC network.
create_request_timeout (float):
Optional. The timeout for the create request in seconds.
experiment (Union[str, experiments_resource.Experiment]):
Expand All @@ -396,6 +413,9 @@ def submit(
if network:
self._gca_resource.network = network

if reserved_ip_ranges:
self._gca_resource.reserved_ip_ranges = reserved_ip_ranges

try:
output_artifacts_gcs_dir = (
self._gca_resource.runtime_config.gcs_output_directory
Expand Down
87 changes: 87 additions & 0 deletions tests/unit/aiplatform/test_pipeline_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
_TEST_HTTPS_TEMPLATE_PATH = "https://raw.githubusercontent.com/repo/pipeline.json"
_TEST_PARENT = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}"
_TEST_NETWORK = f"projects/{_TEST_PROJECT}/global/networks/{_TEST_PIPELINE_JOB_ID}"
_TEST_RESERVED_IP_RANGES = ["vertex-ai-ip-range"]

_TEST_PIPELINE_JOB_NAME = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}/pipelineJobs/{_TEST_PIPELINE_JOB_ID}"
_TEST_PIPELINE_JOB_LIST_READ_MASK = field_mask.FieldMask(
Expand Down Expand Up @@ -231,6 +232,7 @@ def mock_pipeline_service_create():
create_time=_TEST_PIPELINE_CREATE_TIME,
service_account=_TEST_SERVICE_ACCOUNT,
network=_TEST_NETWORK,
reserved_ip_ranges=_TEST_RESERVED_IP_RANGES,
)
yield mock_create_pipeline_job

Expand Down Expand Up @@ -267,6 +269,7 @@ def make_pipeline_job(state):
create_time=_TEST_PIPELINE_CREATE_TIME,
service_account=_TEST_SERVICE_ACCOUNT,
network=_TEST_NETWORK,
reserved_ip_ranges=_TEST_RESERVED_IP_RANGES,
job_detail=gca_pipeline_job.PipelineJobDetail(
pipeline_run_context=gca_context.Context(
name=_TEST_PIPELINE_JOB_NAME,
Expand Down Expand Up @@ -548,6 +551,90 @@ def test_run_call_pipeline_service_create(
gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
)

@pytest.mark.parametrize(
"job_spec",
[_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],
)
@pytest.mark.parametrize("sync", [True, False])
def test_run_call_pipeline_service_run_with_reserved_ip_ranges(
self,
mock_pipeline_service_create,
mock_pipeline_service_get,
mock_pipeline_bucket_exists,
job_spec,
mock_load_yaml_and_json,
sync,
):
import yaml

aiplatform.init(
project=_TEST_PROJECT,
staging_bucket=_TEST_GCS_BUCKET_NAME,
location=_TEST_LOCATION,
credentials=_TEST_CREDENTIALS,
service_account=_TEST_SERVICE_ACCOUNT,
network=_TEST_NETWORK,
)

job = pipeline_jobs.PipelineJob(
display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
template_path=_TEST_TEMPLATE_PATH,
job_id=_TEST_PIPELINE_JOB_ID,
parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,
input_artifacts=_TEST_PIPELINE_INPUT_ARTIFACTS,
enable_caching=True,
)

job.run(
reserved_ip_ranges=_TEST_RESERVED_IP_RANGES,
sync=sync,
create_request_timeout=None,
)

if not sync:
job.wait()

expected_runtime_config_dict = {
"gcsOutputDirectory": _TEST_GCS_BUCKET_NAME,
"parameterValues": _TEST_PIPELINE_PARAMETER_VALUES,
"inputArtifacts": {"vertex_model": {"artifactId": "456"}},
}
runtime_config = gca_pipeline_job.PipelineJob.RuntimeConfig()._pb
json_format.ParseDict(expected_runtime_config_dict, runtime_config)

job_spec = yaml.safe_load(job_spec)
pipeline_spec = job_spec.get("pipelineSpec") or job_spec

# Construct expected request
expected_gapic_pipeline_job = gca_pipeline_job.PipelineJob(
display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
pipeline_spec={
"components": {},
"pipelineInfo": pipeline_spec["pipelineInfo"],
"root": pipeline_spec["root"],
"schemaVersion": "2.1.0",
},
runtime_config=runtime_config,
service_account=_TEST_SERVICE_ACCOUNT,
network=_TEST_NETWORK,
reserved_ip_ranges=_TEST_RESERVED_IP_RANGES,
)

mock_pipeline_service_create.assert_called_once_with(
parent=_TEST_PARENT,
pipeline_job=expected_gapic_pipeline_job,
pipeline_job_id=_TEST_PIPELINE_JOB_ID,
timeout=None,
)

mock_pipeline_service_get.assert_called_with(
name=_TEST_PIPELINE_JOB_NAME, retry=base._DEFAULT_RETRY
)

assert job._gca_resource == make_pipeline_job(
gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
)

@pytest.mark.parametrize(
"job_spec",
[_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],
Expand Down

0 comments on commit ab99e00

Please sign in to comment.