[core][compiled graphs] Throw unrecoverable actor exceptions at ray.get() #49461

Merged · 9 commits · Jan 7, 2025
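For context, the user-visible effect of this change: when an actor in a compiled graph dies (for example via `sys.exit()` inside a task), `ray.get()` on the resulting `CompiledDAGRef` now raises the underlying error (e.g. `ActorDiedError`, or an application exception wrapped in `RayTaskError`) instead of a generic `RayChannelError("Channel closed.")`. Below is a minimal sketch of that behavior, modeled loosely on `test_dag_fault_tolerance_sys_exit` in this diff; the `Worker` actor and its `exit_or_echo` method are hypothetical stand-ins, and depending on timing the failure may only surface on a later `ray.get()` call.

```python
import sys

import ray
from ray.dag import InputNode
from ray.exceptions import ActorDiedError


@ray.remote
class Worker:  # hypothetical actor, for illustration only
    def exit_or_echo(self, x):
        if x < 0:
            # Simulate an unrecoverable actor failure.
            sys.exit(1)
        return x


ray.init()
worker = Worker.remote()
with InputNode() as inp:
    dag = worker.exit_or_echo.bind(inp)
compiled_dag = dag.experimental_compile()

assert ray.get(compiled_dag.execute(1)) == 1

ref = compiled_dag.execute(-1)  # this task calls sys.exit() inside the actor
try:
    ray.get(ref)
except ActorDiedError as err:
    # Before this PR, a generic RayChannelError("Channel closed.") was raised
    # here; now the root cause of the channel failure is surfaced instead.
    print("actor failure surfaced at ray.get():", err)
```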
python/ray/dag/compiled_dag_node.py (4 changes: 2 additions & 2 deletions)
@@ -214,7 +214,7 @@ def do_profile_tasks(
for operation in schedule:
start_t = time.perf_counter()
task = tasks[operation.exec_task_idx]
-done = tasks[operation.exec_task_idx].exec_operation(
+done = task.exec_operation(
self, operation.type, overlap_gpu_communication
)
end_t = time.perf_counter()
@@ -1898,7 +1898,7 @@ def teardown(self, kill_actors: bool = False):
for cancel_ref in cancel_refs:
try:
ray.get(cancel_ref, timeout=30)
-except ray.exceptions.RayChannelError:
+except RayChannelError:
# Channel error happens when a channel is closed
# or timed out. In this case, do not log.
pass
python/ray/dag/tests/experimental/test_accelerated_dag.py (6 changes: 4 additions & 2 deletions)
@@ -15,7 +15,7 @@


from ray._private.test_utils import run_string_as_driver
-from ray.exceptions import RayChannelError, RayChannelTimeoutError
+from ray.exceptions import ActorDiedError, RayChannelError, RayChannelTimeoutError
import ray
import ray._private
import ray.cluster_utils
@@ -1530,7 +1530,9 @@ def test_dag_fault_tolerance_sys_exit(ray_start_regular, single_fetch):
else:
assert ray.get(refs) == [i + 1] * len(actors)

-with pytest.raises(RayChannelError, match="Channel closed."):
+with pytest.raises(
+    ActorDiedError, match="The actor died unexpectedly before finishing this task."
+):
for i in range(99):
refs = compiled_dag.execute(1)
if single_fetch:
python/ray/dag/tests/experimental/test_mocked_nccl_dag.py (8 changes: 4 additions & 4 deletions)
@@ -7,7 +7,7 @@

import ray
import ray.cluster_utils
-from ray.exceptions import RayChannelError
+from ray.exceptions import RayChannelError, RayTaskError
from ray.experimental.channel.torch_tensor_type import TorchTensorType
from ray.experimental.channel.conftest import (
Barrier,
@@ -197,7 +197,7 @@ def test_p2p_static_shape_error(capsys, ray_start_cluster, send_as_dict):

# Sending wrong shape errors.
ref = compiled_dag.execute(i, shape=(20,), dtype=dtype)
-with pytest.raises(RayChannelError):
+with pytest.raises(RayTaskError):
ray.get(ref)

# Sending correct shape still errors because the DAG has already been torn
@@ -298,7 +298,7 @@ def test_p2p_direct_return_error(capsys, ray_start_cluster):

# Error is thrown if we do not send a tensor.
ref = compiled_dag.execute(shape=shape, dtype=dtype, value=1, send_as_dict=True)
-with pytest.raises(RayChannelError):
+with pytest.raises(RayTaskError):
ray.get(ref)

# Currently the receiver cannot catch the exception so the DAG cannot be
@@ -373,7 +373,7 @@ def test_p2p_static_shape_and_direct_return(
# Error is thrown if we do not send a tensor.
ref = compiled_dag.execute(shape=shape, dtype=dtype, value=1, send_as_dict=True)

-with pytest.raises(RayChannelError):
+with pytest.raises(RayTaskError):
ray.get(ref)

# Currently the receiver cannot catch either kind of
python/ray/dag/tests/experimental/test_torch_tensor_dag.py (63 changes: 56 additions & 7 deletions)
@@ -13,7 +13,7 @@
import time
from ray.air._internal import torch_utils
from ray.dag import InputNode
-from ray.exceptions import RayChannelError
+from ray.exceptions import RayChannelError, RayTaskError
from ray.dag.output_node import MultiOutputNode
from ray.experimental.channel.communicator import (
Communicator,
@@ -64,6 +64,9 @@ def send_or_raise(self, shape, dtype, value: int, raise_exception=False):
raise RuntimeError()
return torch.ones(shape, dtype=dtype, device=self.device) * value

def send_int(self, value: int):
return value

def recv(self, tensor):
# Check that tensor got loaded to the correct device.
assert tensor.device == self.device
@@ -770,7 +773,7 @@ def test_torch_tensor_nccl_static_shape(ray_start_regular):

# Error is thrown if we send the wrong shape. Currently the receiver cannot
# catch the exception so the DAG cannot be used again.
-with pytest.raises(RayChannelError):
+with pytest.raises(RayTaskError):
ref = compiled_dag.execute(i, shape=(20,), dtype=dtype)
ray.get(ref)

@@ -808,7 +811,7 @@ def test_torch_tensor_nccl_direct_return(ray_start_regular):
# Error is thrown if we do not send a tensor. Currently the receiver cannot
# catch the exception so the DAG cannot be used again.
ref = compiled_dag.execute(value=i, shape=shape, dtype=dtype, send_tensor=False)
-with pytest.raises(RayChannelError):
+with pytest.raises(RayTaskError):
ray.get(ref)

with pytest.raises(RayChannelError):
@@ -857,7 +860,7 @@ def test_torch_tensor_exceptions(
ray_start_regular, static_shape, direct_return, overlap_gpu_communication
):
"""
-Test exceptions being thrown by a NCCL sending task.
+Test exceptions being thrown by a NCCL sending task's execution.
"""
if not USE_GPU:
pytest.skip("NCCL tests require GPUs")
@@ -910,10 +913,9 @@
value=i,
raise_exception=True,
)

if static_shape or direct_return:
-with pytest.raises(RayChannelError):
-    # TODO(swang): Ideally return the RuntimeError thrown by the
-    # application instead of a generic RayChannelError.
+with pytest.raises(RuntimeError):
ray.get(ref)

with pytest.raises(RayChannelError):
@@ -940,6 +942,53 @@
assert result == (i, shape, dtype)


@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
def test_torch_tensor_exceptions2(
ray_start_regular,
):
"""
Test exceptions being thrown by a NCCL sending task's write operation.
"""
if not USE_GPU:
pytest.skip("NCCL tests require GPUs")

assert (
sum(node["Resources"].get("GPU", 0) for node in ray.nodes()) > 1
), "This test requires at least 2 GPUs"

actor_cls = TorchTensorWorker.options(num_gpus=1)
sender = actor_cls.remote()
receiver = actor_cls.remote()

with InputNode() as inp:
dag = sender.send_int.bind(inp)
dag = dag.with_type_hint(
TorchTensorType(
transport="nccl",
_direct_return=True,
_static_shape=True,
)
)
dag = receiver.recv.bind(dag)

compiled_dag = dag.experimental_compile()

ref = compiled_dag.execute(1)
with pytest.raises(
ValueError,
match=(
"Task annotated with _direct_return=True must return a "
"CUDA torch.Tensor, instead found value `1`. "
"DAG will shut down."
),
):
ray.get(ref)

with pytest.raises(RayChannelError):
# The DAG is not usable after the exception.
ref = compiled_dag.execute(2)


@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
def test_torch_tensor_nccl_all_reduce(ray_start_regular):
"""
python/ray/experimental/channel/torch_tensor_nccl_channel.py (15 changes: 9 additions & 6 deletions)
@@ -227,8 +227,11 @@ def write(self, value: Any, timeout: Optional[float] = None) -> None:
if isinstance(value, ray.exceptions.RayTaskError):
if self._typ.static_shape or self._typ.direct_return:
# Raise a fatal error to teardown the DAG.
# This error will also be caught from `CompiledDAGRef.get()`
# and raised to the user
# TODO(swang): Write exceptions to the tensor metadata or
-# non-tensor data channel if it is available.
+# non-tensor data channel if it is available to make these
+# exceptions recoverable.
raise value

if self._cpu_data_channel is None:
@@ -237,12 +240,12 @@
# directly without trying to serialize it first.
import torch

# These ValueErrors will also be caught from `CompiledDAGRef.get()`
# and raised to the user
if not isinstance(value, torch.Tensor):
-# TODO(swang): These errors are currently fatal for the DAG
-# because there is no way for the receiver to receive the
-# exception. This could be improved by sending the exception
-# through the gpu_data_channel's CPU-based metadata channel,
-# if one exists.
+# TODO(swang): These errors are currently fatal for the DAG.
+# This could be improved by sending the exception through the
+# gpu_data_channel's CPU-based metadata channel, if one exists.
raise ValueError(
"Task annotated with _direct_return=True must "
"return a CUDA torch.Tensor, instead found value "
python/ray/experimental/compiled_dag_ref.py (47 changes: 43 additions & 4 deletions)
@@ -2,7 +2,12 @@
from typing import Any, List, Optional

import ray
-from ray.exceptions import RayTaskError
+from ray.exceptions import (
+    GetTimeoutError,
+    RayChannelError,
+    RayChannelTimeoutError,
+    RayTaskError,
+)
from ray.util.annotations import PublicAPI


@@ -54,11 +59,15 @@ def __init__(
execution_index: The index of the execution for the DAG.
A DAG can be executed multiple times, and execution index
indicates which execution this CompiledDAGRef corresponds to.
actor_execution_loop_refs: The actor execution loop refs that
are used to execute the DAG. This can be used internally to
check the task execution errors in case of exceptions.
channel_index: The index of the DAG's output channel to fetch
the result from. A DAG can have multiple output channels, and
channel index indicates which channel this CompiledDAGRef
corresponds to. If channel index is not provided, this CompiledDAGRef
wraps the results from all output channels.

"""
self._dag = dag
self._execution_index = execution_index
@@ -102,9 +111,39 @@ def get(self, timeout: Optional[float] = None):
)

self._ray_get_called = True
-return_vals = self._dag._execute_until(
-    self._execution_index, self._channel_index, timeout
-)
+try:
+    return_vals = self._dag._execute_until(
+        self._execution_index, self._channel_index, timeout
+    )
+except RayChannelTimeoutError:
+    raise
+except RayChannelError as channel_error:
+    # If we get a channel error, we'd like to call ray.get() on
+    # the actor execution loop refs to check if this is a result
+    # of task execution error which could not be passed down
+    # (e.g., when a pure NCCL channel is used, it is only
+    # able to send tensors, but not the wrapped exceptions).
+    # In this case, we'd like to raise the task execution error
+    # (which is the actual cause of the channel error) instead
+    # of the channel error itself.
+    # TODO(rui): determine which error to raise if multiple
+    # actor task refs have errors.
+    actor_execution_loop_refs = list(self._dag.worker_task_refs.values())
+    try:
+        ray.get(actor_execution_loop_refs, timeout=10)
+    except GetTimeoutError as timeout_error:
+        raise Exception(
+            "Timed out when getting the actor execution loop exception. "
+            "This should not happen, please file a GitHub issue."
+        ) from timeout_error
+    except Exception as execution_error:
+        # Use 'from None' to suppress the context of the original
+        # channel error, which is not useful to the user.
+        raise execution_error from None
+    else:
+        raise channel_error
+except Exception:
+    raise
return _process_return_vals(return_vals, True)


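Taken together, the `CompiledDAGRef.get()` change above boils down to one fallback: on a channel error, first `ray.get()` the actor execution loop refs, and if one of them failed, raise that failure as the root cause; otherwise re-raise the channel error. The standalone sketch below restates that fallback outside the class for readers skimming the diff; `read_output_or_raise_root_cause` and its parameters are hypothetical names, while the real implementation operates on `self._dag` inside `CompiledDAGRef.get()`.

```python
from typing import Any, Callable, List

import ray
from ray.exceptions import GetTimeoutError, RayChannelError, RayChannelTimeoutError


def read_output_or_raise_root_cause(
    read_output: Callable[[], Any],  # reads the DAG's output channel(s)
    actor_execution_loop_refs: List[ray.ObjectRef],  # one ref per actor loop
) -> Any:
    """Simplified sketch of the fallback added to CompiledDAGRef.get()."""
    try:
        return read_output()
    except RayChannelTimeoutError:
        # Timeouts are recoverable and are surfaced unchanged.
        raise
    except RayChannelError as channel_error:
        # The channel was closed or broken. Check whether an actor execution
        # loop actually failed; if so, raise that failure as the root cause.
        try:
            ray.get(actor_execution_loop_refs, timeout=10)
        except GetTimeoutError as timeout_error:
            raise RuntimeError(
                "Timed out while fetching the actor execution loop exception."
            ) from timeout_error
        except Exception as execution_error:
            # Suppress the channel error's context; the task error is the cause.
            raise execution_error from None
        else:
            # All execution loops are still healthy; the channel error stands.
            raise channel_error
```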