Skip to content

Commit

Permalink
[core] retryable exceptions for method (ray-project#41194)
Browse files Browse the repository at this point in the history
Implements retryable exceptions for methods. Also plumbs direct_actor_task_submitter quite a bit. Behavior:

Added new method-level annotation max_retries that overrides actor's max_task_retries if any.
If user_exceptions=True | List[exception class types] and ((max_retries or max_task_retries) > 0) we may retry the method by issuing another task invocation to the actor.
Both exception-retry and actor-death-retry counts toward max_retries. For example for a max_retries=3 call, and there are 2 actor deaths and 1 exception and 1 return, we can return the value.
For a streaming generator call, if it yielded 4 values then raises an exception, we retry by calling the method again and ignoring the first 4 values and start yielding the 5th value in the second run.
Java and CPP: they still have max_task_retries on actor deaths, but I did not add max_retries or retry_exceptions.
  • Loading branch information
rynewang authored Dec 8, 2023
1 parent 567e574 commit 208e452
Show file tree
Hide file tree
Showing 34 changed files with 748 additions and 98 deletions.
9 changes: 7 additions & 2 deletions cpp/src/ray/runtime/task/local_mode_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,13 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation,
TaskID::ForActorCreationTask(invocation.actor_id);
const ObjectID actor_creation_dummy_object_id =
ObjectID::FromIndex(actor_creation_task_id, 1);
builder.SetActorTaskSpec(
invocation.actor_id, actor_creation_dummy_object_id, invocation.actor_counter);
// NOTE: Ray CPP doesn't support retries and retry_exceptions.
builder.SetActorTaskSpec(invocation.actor_id,
actor_creation_dummy_object_id,
/*max_retries=*/0,
/*retry_exceptions=*/false,
/*serialized_retry_exception_allowlist=*/"",
invocation.actor_counter);
} else {
throw RayException("unknown task type");
}
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/ray/runtime/task/native_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,17 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation,
options.generator_backpressure_num_objects = -1;
std::vector<rpc::ObjectReference> return_refs;
if (invocation.task_type == TaskType::ACTOR_TASK) {
// NOTE: Ray CPP doesn't support per-method max_retries and retry_exceptions
const auto native_actor_handle = core_worker.GetActorHandle(invocation.actor_id);
int max_retries = native_actor_handle->MaxTaskRetries();

auto status = core_worker.SubmitActorTask(invocation.actor_id,
BuildRayFunction(invocation),
invocation.args,
options,
max_retries,
/*retry_exceptions=*/false,
/*serialized_retry_exception_allowlist=*/"",
return_refs);
if (!status.ok()) {
return ObjectID::Nil();
Expand Down
26 changes: 26 additions & 0 deletions doc/source/ray-core/fault_tolerance/actors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,29 @@ terminating <ray-kill-actors>` the actor. You can do this by calling
original handle to the actor.

If ``max_restarts`` is set, you can also allow Ray to automatically restart the actor by passing ``no_restart=False`` to ``ray.kill``.


Actor method exceptions
-----------------------

Sometime you want to retry when an actor method raises exceptions. Use ``max_task_retries`` with ``retry_exceptions`` to retry.

Note that by default, retrying on user raised exceptions is disabled. To enable it, make sure the method is **idempotent**, that is, invoking it multiple times should be equivalent to invoking it only once.

You can set ``retry_exceptions`` in the `@ray.method(retry_exceptions=...)` decorator, or in the `.options(retry_exceptions=...)` in the method call.

Retry behavior depends on the value you set ``retry_exceptions`` to:
- ``retry_exceptions == False`` (default): No retries for user exceptions.
- ``retry_exceptions == True``: Ray retries a method on user exception up to ``max_retries`` times.
- ``retry_exceptions`` is a list of exceptions: Ray retries a method on user exception up to ``max_retries`` times, only if the method raises an exception from these specific classes.

``max_task_retries`` applies to both exceptions and actor crashes. Ray searches for the first non-default value of ``max_task_retries`` in this order:

.. - The method call's value, for example, `actor.method.options(_max_retries=2)`. Ray ignores this value if you didn't set it.
.. - The method definition's value, for example, `@ray.method(_max_retries=2)`. Ray ignores this value if you didn't set it.
- The actor creation call's value, for example, `Actor.options(max_task_retries=2)`. Ray ignores this value if you didn't set it.
- The Actor class definition's value, for example, `@ray.remote(max_task_retries=2)` decorator. Ray ignores this value if you didn't set it.
- The default value,`0`.

For example, if a method sets `max_retries=5` and `retry_exceptions=True`, and the actor sets `max_restarts=2`, Ray executes the method up to 6 times: once for the initial invocation, and 5 additional retries. The 6 invocations may include 2 actor crashes. After the 6th invocation, a `ray.get` call to the result Ray ObjectRef raises the exception raised in the last invocation, or `ray.exceptions.RayActorError` if the actor crashed in the last invocation.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ public List<ObjectId> submitActorTask(
int numReturns,
CallOptions options) {
Preconditions.checkState(actor instanceof NativeActorHandle);
// TODO: Ray Java does not support per-method MaxRetries. It only supports
// setting Actor-level MaxTaskRetries for any method calls.
// See: src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc
List<byte[]> returnIds =
nativeSubmitActorTask(
actor.getId().getBytes(),
Expand Down
6 changes: 0 additions & 6 deletions python/ray/_private/ray_option_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,6 @@ def validate_actor_options(options: Dict[str, Any], in_options: bool):
"Setting 'concurrency_groups' is not supported in '.options()'."
)

if options.get("max_restarts", 0) == 0 and options.get("max_task_retries", 0) != 0:
raise ValueError(
"'max_task_retries' cannot be set if 'max_restarts' "
"is 0 or if 'max_restarts' is not set."
)

if options.get("get_if_exists") and not options.get("name"):
raise ValueError("The actor name must be specified to use `get_if_exists`.")

Expand Down
69 changes: 48 additions & 21 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,21 @@ cdef raise_if_dependency_failed(arg):
raise arg


def serialize_retry_exception_allowlist(retry_exception_allowlist, function_descriptor):
try:
return ray_pickle.dumps(retry_exception_allowlist)
except TypeError as e:
msg = (
"Could not serialize the retry exception allowlist"
f"{retry_exception_allowlist} for task {function_descriptor.repr}. "
"See "
"https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting " # noqa
"for more information.")
raise TypeError(msg) from e


cdef c_bool determine_if_retryable(
c_bool should_retry_exceptions,
Exception e,
const c_string serialized_retry_exception_allowlist,
FunctionDescriptor function_descriptor,
Expand All @@ -983,6 +997,8 @@ cdef c_bool determine_if_retryable(
- Deserialization of exception allowlist fails (TypeError)
- Exception allowlist is not None and not a tuple (AssertionError)
"""
if not should_retry_exceptions:
return False
if len(serialized_retry_exception_allowlist) == 0:
# No exception allowlist specified, default to all retryable.
return True
Expand Down Expand Up @@ -1505,12 +1521,13 @@ cdef create_generator_error_object(
CoreWorker core_worker = worker.core_worker

is_retryable_error[0] = determine_if_retryable(
should_retry_exceptions,
e,
serialized_retry_exception_allowlist,
function_descriptor,
)

if is_retryable_error[0] and should_retry_exceptions:
if is_retryable_error[0]:
logger.debug(
"Task failed with retryable exception:"
" {}.".format(task_id), exc_info=True)
Expand Down Expand Up @@ -1571,11 +1588,12 @@ cdef execute_dynamic_generator_and_store_task_outputs(
generator_id)
except Exception as error:
is_retryable_error[0] = determine_if_retryable(
should_retry_exceptions,
error,
serialized_retry_exception_allowlist,
function_descriptor,
)
if is_retryable_error[0] and should_retry_exceptions:
if is_retryable_error[0]:
logger.info("Task failed with retryable exception:"
" {}.".format(
core_worker.get_current_task_id()),
Expand Down Expand Up @@ -1710,7 +1728,7 @@ cdef void execute_task(
raise RayActorError(
ActorDiedErrorContext(
error_message=error_message,
actor_id=core_worker.get_actor_id(),
actor_id=core_worker.get_actor_id().binary(),
class_name=class_name
)
)
Expand Down Expand Up @@ -1872,11 +1890,12 @@ cdef void execute_task(
exit_current_actor_if_asyncio()
except Exception as e:
is_retryable_error[0] = determine_if_retryable(
e,
serialized_retry_exception_allowlist,
function_descriptor,
)
if is_retryable_error[0] and should_retry_exceptions:
should_retry_exceptions,
e,
serialized_retry_exception_allowlist,
function_descriptor,
)
if is_retryable_error[0]:
logger.debug("Task failed with retryable exception:"
" {}.".format(
core_worker.get_current_task_id()),
Expand Down Expand Up @@ -3738,18 +3757,9 @@ cdef class CoreWorker:
self.python_scheduling_strategy_to_c(
scheduling_strategy, &c_scheduling_strategy)

try:
serialized_retry_exception_allowlist = ray_pickle.dumps(
retry_exception_allowlist,
)
except TypeError as e:
msg = (
"Could not serialize the retry exception allowlist"
f"{retry_exception_allowlist} for task {function_descriptor.repr}. "
"Check "
"https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting " # noqa
"for more information.")
raise TypeError(msg) from e
serialized_retry_exception_allowlist = serialize_retry_exception_allowlist(
retry_exception_allowlist,
function_descriptor)

with self.profile_event(b"submit_task"):
prepare_resources(resources, &c_resources)
Expand Down Expand Up @@ -3946,6 +3956,9 @@ cdef class CoreWorker:
args,
c_string name,
int num_returns,
int max_retries,
c_bool retry_exceptions,
retry_exception_allowlist,
double num_method_cpus,
c_string concurrency_group_name,
int64_t generator_backpressure_num_objects):
Expand All @@ -3962,6 +3975,11 @@ cdef class CoreWorker:
# This task id is incorrect if async task is used.
# In this case, we should use task_id_in_async_context
TaskID current_task = self.get_current_task_id()
c_string serialized_retry_exception_allowlist

serialized_retry_exception_allowlist = serialize_retry_exception_allowlist(
retry_exception_allowlist,
function_descriptor)

with self.profile_event(b"submit_task"):
if num_method_cpus > 0:
Expand Down Expand Up @@ -3991,6 +4009,9 @@ cdef class CoreWorker:
c_resources,
concurrency_group_name,
generator_backpressure_num_objects),
max_retries,
retry_exceptions,
serialized_retry_exception_allowlist,
return_refs,
current_c_task_id)
# These arguments were serialized and put into the local object
Expand Down Expand Up @@ -4093,6 +4114,7 @@ cdef class CoreWorker:
dereference(c_actor_handle).ActorLanguage())
actor_creation_function_descriptor = CFunctionDescriptorToPython(
dereference(c_actor_handle).ActorCreationTaskFunctionDescriptor())
max_task_retries = dereference(c_actor_handle).MaxTaskRetries()
if language == Language.PYTHON:
assert isinstance(actor_creation_function_descriptor,
PythonFunctionDescriptor)
Expand All @@ -4106,21 +4128,26 @@ cdef class CoreWorker:
job_id, actor_creation_function_descriptor)
method_meta = ray.actor._ActorClassMethodMetadata.create(
actor_class, actor_creation_function_descriptor)
return ray.actor.ActorHandle(language, actor_id,
return ray.actor.ActorHandle(language, actor_id, max_task_retries,
method_meta.method_is_generator,
method_meta.decorators,
method_meta.signatures,
method_meta.num_returns,
method_meta.max_retries,
method_meta.retry_exceptions,
method_meta.generator_backpressure_num_objects, # noqa
actor_method_cpu,
actor_creation_function_descriptor,
worker.current_session_and_job)
else:
return ray.actor.ActorHandle(language, actor_id,
0, # max_task_retries,
{}, # method is_generator
{}, # method decorators
{}, # method signatures
{}, # method num_returns
{}, # method max_retries
{}, # method retry_exceptions
{}, # generator_backpressure_num_objects
0, # actor method cpu
actor_creation_function_descriptor,
Expand Down
Loading

0 comments on commit 208e452

Please sign in to comment.