Skip to content

Commit

Permalink
feat: Enable Ray cluster creation and registering TensorFlow checkpoi…
Browse files Browse the repository at this point in the history
…nt to Vertex with Ray version 2.9

PiperOrigin-RevId: 611334973
  • Loading branch information
yinghsienwu authored and copybara-github committed Feb 29, 2024
1 parent d947304 commit ff148cd
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 62 deletions.
16 changes: 11 additions & 5 deletions google/cloud/aiplatform/preview/vertex_ray/cluster_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,13 @@ def create_ray_cluster(

local_ray_verion = _validation_utils.get_local_ray_version()
if ray_version != local_ray_verion:
install_ray_version = ".".join(ray_version.split("_"))
logging.info(
f"[Ray on Vertex]: Local runtime has Ray version {local_ray_verion}"
+ f", but the requested cluster runtime has {ray_version}. Please "
+ "ensure that the Ray versions match for client connectivity."
+ "ensure that the Ray versions match for client connectivity. You may "
+ f'"pip install --user --force-reinstall ray[default]=={install_ray_version}"'
+ " and restart runtime before cluster connection."
)

if cluster_name is None:
Expand Down Expand Up @@ -162,8 +165,12 @@ def create_ray_cluster(
ray_version, python_version, enable_cuda
)
if custom_images is not None:
if not (custom_images.head is None or custom_images.worker is None):
image_uri = custom_images.head
if custom_images.head is None or custom_images.worker is None:
raise ValueError(
"[Ray on Vertex AI]: custom_images.head and custom_images.worker must be specified when custom_images is set."
)
image_uri = custom_images.head

resource_pool_images[resource_pool_0.id] = image_uri

worker_pools = []
Expand Down Expand Up @@ -207,8 +214,7 @@ def create_ray_cluster(
ray_version, python_version, enable_cuda
)
if custom_images is not None:
if not (custom_images.head is None or custom_images.worker is None):
image_uri = custom_images.worker
image_uri = custom_images.worker
resource_pool_images[resource_pool.id] = image_uri

i += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import logging
import os
import pickle
import ray
import tempfile
from typing import Optional, TYPE_CHECKING

Expand Down Expand Up @@ -117,15 +118,23 @@ def _get_estimator_from(
Raises:
ValueError: Invalid Argument.
"""
if not isinstance(checkpoint, ray_sklearn.SklearnCheckpoint):
raise ValueError(
"[Ray on Vertex AI]: arg checkpoint should be a"
" ray.train.sklearn.SklearnCheckpoint instance"
)
if checkpoint.get_preprocessor() is not None:
logging.warning(
"Checkpoint contains preprocessor. However, converting from a Ray"
" Checkpoint to framework specific model does NOT support"
" preprocessing. The model will be exported without preprocessors."
)
return checkpoint.get_estimator()
ray_version = ray.__version__
if ray_version == "2.4.0":
if not isinstance(checkpoint, ray_sklearn.SklearnCheckpoint):
raise ValueError(
"[Ray on Vertex AI]: arg checkpoint should be a"
" ray.train.sklearn.SklearnCheckpoint instance"
)
if checkpoint.get_preprocessor() is not None:
logging.warning(
"Checkpoint contains preprocessor. However, converting from a Ray"
" Checkpoint to framework specific model does NOT support"
" preprocessing. The model will be exported without preprocessors."
)
return checkpoint.get_estimator()

# get_model() signature changed in future versions
try:
return checkpoint.get_estimator()
except AttributeError:
raise RuntimeError("Unsupported Ray version.")
Original file line number Diff line number Diff line change
Expand Up @@ -123,22 +123,29 @@ def _get_tensorflow_model_from(
Raises:
ValueError: Invalid Argument.
"""
if not isinstance(checkpoint, ray_tensorflow.TensorflowCheckpoint):
raise ValueError(
"[Ray on Vertex AI]: arg checkpoint should be a"
" ray.train.tensorflow.TensorflowCheckpoint instance"
)
if checkpoint.get_preprocessor() is not None:
logging.warning(
"Checkpoint contains preprocessor. However, converting from a Ray"
" Checkpoint to framework specific model does NOT support"
" preprocessing. The model will be exported without preprocessors."
)
if ray.__version__ == "2.4.0":
ray_version = ray.__version__
if ray_version == "2.4.0":
if not isinstance(checkpoint, ray_tensorflow.TensorflowCheckpoint):
raise ValueError(
"[Ray on Vertex AI]: arg checkpoint should be a"
" ray.train.tensorflow.TensorflowCheckpoint instance"
)
if checkpoint.get_preprocessor() is not None:
logging.warning(
"Checkpoint contains preprocessor. However, converting from a Ray"
" Checkpoint to framework specific model does NOT support"
" preprocessing. The model will be exported without preprocessors."
)

return checkpoint.get_model(model)

# get_model() signature changed in future versions
try:
return checkpoint.get_model()
except AttributeError:
raise RuntimeError("Unsupported Ray version.")
from tensorflow import keras

try:
return keras.models.load_model(checkpoint.path)
except OSError:
return keras.models.load_model("gs://" + checkpoint.path)
except ImportError:
logging.warning("TensorFlow must be installed to load the trained model.")
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import logging
from typing import Optional
import ray

try:
from ray.train import torch as ray_torch
Expand Down Expand Up @@ -51,15 +52,23 @@ def get_pytorch_model_from(
Raises:
ValueError: Invalid Argument.
"""
if not isinstance(checkpoint, ray_torch.TorchCheckpoint):
raise ValueError(
"[Ray on Vertex AI]: arg checkpoint should be a"
" ray.train.torch.TorchCheckpoint instance"
)
if checkpoint.get_preprocessor() is not None:
logging.warning(
"Checkpoint contains preprocessor. However, converting from a Ray"
" Checkpoint to framework specific model does NOT support"
" preprocessing. The model will be exported without preprocessors."
)
return checkpoint.get_model(model=model)
ray_version = ray.__version__
if ray_version == "2.4.0":
if not isinstance(checkpoint, ray_torch.TorchCheckpoint):
raise ValueError(
"[Ray on Vertex AI]: arg checkpoint should be a"
" ray.train.torch.TorchCheckpoint instance"
)
if checkpoint.get_preprocessor() is not None:
logging.warning(
"Checkpoint contains preprocessor. However, converting from a Ray"
" Checkpoint to framework specific model does NOT support"
" preprocessing. The model will be exported without preprocessors."
)
return checkpoint.get_model(model=model)

# get_model() signature changed in future versions
try:
return checkpoint.get_model()
except AttributeError:
raise RuntimeError("Unsupported Ray version.")
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import logging
import os
import pickle
import ray
import tempfile
from typing import Optional, TYPE_CHECKING

Expand Down Expand Up @@ -121,15 +122,23 @@ def _get_xgboost_model_from(
Raises:
ValueError: Invalid Argument.
"""
if not isinstance(checkpoint, ray_xgboost.XGBoostCheckpoint):
raise ValueError(
"[Ray on Vertex AI]: arg checkpoint should be a"
" ray.train.xgboost.XGBoostCheckpoint instance"
)
if checkpoint.get_preprocessor() is not None:
logging.warning(
"Checkpoint contains preprocessor. However, converting from a Ray"
" Checkpoint to framework specific model does NOT support"
" preprocessing. The model will be exported without preprocessors."
)
return checkpoint.get_model()
ray_version = ray.__version__
if ray_version == "2.4.0":
if not isinstance(checkpoint, ray_xgboost.XGBoostCheckpoint):
raise ValueError(
"[Ray on Vertex AI]: arg checkpoint should be a"
" ray.train.xgboost.XGBoostCheckpoint instance"
)
if checkpoint.get_preprocessor() is not None:
logging.warning(
"Checkpoint contains preprocessor. However, converting from a Ray"
" Checkpoint to framework specific model does NOT support"
" preprocessing. The model will be exported without preprocessors."
)
return checkpoint.get_model()

# get_model() signature changed in future versions
try:
return checkpoint.get_model()
except AttributeError:
raise RuntimeError("Unsupported Ray version.")
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,23 @@ def get_local_ray_version():

def get_image_uri(ray_version, python_version, enable_cuda):
"""Image uri for a given ray version and python version."""
if ray_version not in ["2_4"]:
raise ValueError("[Ray on Vertex AI]: The supported Ray version is 2_4.")
if ray_version not in ["2_4", "2_9"]:
raise ValueError(
"[Ray on Vertex AI]: The supported Ray versions are 2_4 (2.4.0) and 2_9 (2.9.3)."
)
if python_version not in ["3_10"]:
raise ValueError("[Ray on Vertex AI]: The supported Python version is 3_10.")

location = initializer.global_config.location
region = location.split("-")[0]
if region not in _AVAILABLE_REGIONS:
region = _DEFAULT_REGION

ray_version = ray_version.replace("_", "-")
if enable_cuda:
# TODO(b/292003337) update eligible image uris
return f"{region}-docker.pkg.dev/vertex-ai/training/ray-gpu.2-4.py310:latest"
return f"{region}-docker.pkg.dev/vertex-ai/training/ray-gpu.{ray_version}.py310:latest"
else:
return f"{region}-docker.pkg.dev/vertex-ai/training/ray-cpu.2-4.py310:latest"
return f"{region}-docker.pkg.dev/vertex-ai/training/ray-cpu.{ray_version}.py310:latest"


def get_versions_from_image_uri(image_uri):
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/vertex_ray/test_cluster_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ def test_create_ray_cluster_ray_version_error(self):
network=tc.ProjectConstants._TEST_VPC_NETWORK,
ray_version="2_1",
)
e.match(regexp=r"The supported Ray version is 2_4.")
e.match(regexp=r"The supported Ray versions are 2_4 ")

@pytest.mark.usefixtures("create_persistent_resource_exception_mock")
def test_create_ray_cluster_state_error(self):
Expand Down

0 comments on commit ff148cd

Please sign in to comment.