[data] Improve stall detection for StreamingOutputsBackpressurePolicy #41637
Conversation
Let's add unit tests for the backpressure policy itself (no Dataset execution)?
Can you also address the comment about adding a unit test for the policy?
reserved_resources: Amount of reserved resources for non-Ray-Data
    workloads. Ray Data will exclude these resources when scheduling tasks,
    unless resource_limits is manually set.
Suggested change:
-reserved_resources: Amount of reserved resources for non-Ray-Data
+exclude_resources: Amount of reserved resources for non-Ray-Data
 workloads. Ray Data will exclude these resources when scheduling tasks,
 unless resource_limits is manually set.
I just realized reserved_resources sounds a bit like resources reserved for Data, which is the opposite of what we want.
@@ -105,6 +108,10 @@ class ExecutionOptions:
     resource_limits: ExecutionResources = field(default_factory=ExecutionResources)
+    reserved_resources: ExecutionResources = field(
Can we add a check that this is not set if resource_limits is manually specified?
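A minimal sketch of the requested check, assuming ExecutionOptions remains a dataclass as in the hunk above; the simplified ExecutionResources stand-in, the renamed exclude_resources field, and the equality-based "is it set?" test are assumptions, not the real implementation:

```python
from dataclasses import dataclass, field


@dataclass
class ExecutionResources:
    # Simplified stand-in for the real class.
    cpu: float = 0.0
    gpu: float = 0.0
    object_store_memory: float = 0.0


@dataclass
class ExecutionOptions:
    resource_limits: ExecutionResources = field(default_factory=ExecutionResources)
    exclude_resources: ExecutionResources = field(default_factory=ExecutionResources)

    def __post_init__(self):
        # exclude_resources has no effect once resource_limits is set, so
        # reject configurations that specify both. This only catches values
        # passed at construction time, not later attribute assignments.
        default = ExecutionResources()
        if self.resource_limits != default and self.exclude_resources != default:
            raise ValueError(
                "resource_limits and exclude_resources cannot both be set."
            )
```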
"Temporarily unblocking backpressure." | ||
f" Because some tasks of operator {op} have been submitted," | ||
f" but no outputs are generated for {no_output_time} seconds." | ||
" Ignore this warning if your UDF is expected to be slow." | ||
" This may also be because some resources are preempted by" | ||
" non-Ray-Data workloads." | ||
" If this is the case, set `ExecutionOptions.reserved_resources`." | ||
" This message will only be printed once." |
Can we make the error message expose fewer implementation details? I don't think users will necessarily know what "backpressure" means, for example.
Maybe something like:
"Operator {op} is running, but has not produced outputs for {no_output_time}s. Ignore this warning if your UDF is expected to be slow.
Otherwise, this can happen when there are fewer cluster resources available to Ray Data than expected. If you have non-Data tasks or actors running in the cluster, reserve resources for them with ray.data.ExecutionOptions.reserved_resources = {"num_cpus": <CPUs to exclude>}
."
@stephanie-wang thanks. Unit test is added and the other comments are addressed too.
assert res == {
    up_state: 0,
    down_state: self._max_blocks_in_op_output_queue,
}
Nice unit tests!
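For readers without the full diff, a self-contained sketch of what a policy-level test like this looks like; the simplified policy class and method names are illustrative stand-ins, not the real StreamingOutputBackpressurePolicy API:

```python
class SimplifiedOutputBackpressurePolicy:
    """Illustrative stand-in: caps each operator's queued output blocks."""

    def __init__(self, max_blocks_in_op_output_queue: int):
        self._max_blocks_in_op_output_queue = max_blocks_in_op_output_queue

    def calculate_max_blocks_to_read(self, queued_blocks_per_op: dict) -> dict:
        # Each operator may emit only as many blocks as still fit in its
        # output queue; a full queue yields a budget of 0 (backpressure).
        return {
            op: max(0, self._max_blocks_in_op_output_queue - queued)
            for op, queued in queued_blocks_per_op.items()
        }


def test_budgets_without_dataset_execution():
    policy = SimplifiedOutputBackpressurePolicy(max_blocks_in_op_output_queue=4)
    res = policy.calculate_max_blocks_to_read({"up_state": 4, "down_state": 0})
    # The upstream queue is full, so it is throttled to 0; the empty
    # downstream queue gets the full budget, mirroring the assert above.
    assert res == {"up_state": 0, "down_state": 4}
```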
exclude_resources: Amount of resources to exclude from Ray Data.
    Set this if you have other workloads running on the same cluster.
    For Ray Data + Ray Train, this should be automatically set.
    Note for each resource type, resource_limits and exclude_resources cannot both be set.
Just a comment to update the doc here: resources from DataConfig add to exclude_resources instead of overwriting it.
Might be good to add a test for this case also.
yeah, updated the unit test to reflect this change.
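A tiny sketch of the additive behavior being documented and tested here, reusing the simplified ExecutionResources stand-in from the earlier sketch; the combine helper is hypothetical, not the real merging code:

```python
def combine_exclude_resources(
    user_specified: ExecutionResources,
    from_data_config: ExecutionResources,
) -> ExecutionResources:
    # Resources reserved by Train's DataConfig are added to any
    # user-specified exclude_resources rather than overwriting them.
    return ExecutionResources(
        cpu=user_specified.cpu + from_data_config.cpu,
        gpu=user_specified.gpu + from_data_config.gpu,
        object_store_memory=user_specified.object_store_memory
        + from_data_config.object_store_memory,
    )
```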
Why are these changes needed?
When there is non-Data code running in the same cluster, the Data StreamExecutor considers all submitted tasks active, even though they may not actually have the resources to run.
#41603 is an attempt to fix the Data + Train workload by excluding training resources. This PR is a more general fix that covers other workloads as well, with two main changes:
1. Besides detecting active tasks, we also detect whether the downstream has made no progress for a specific interval (a sketch of this follows below).
2. Introduce a new `reserved_resources` option to allow specifying non-Data resources.
This PR alone can also fix #41496.
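A minimal sketch of change (1); the interval constant and bookkeeping below are illustrative, not the actual implementation:

```python
import time

# Assumed stall interval; not the real default.
MAX_OUTPUT_IDLE_TIME_S = 10.0


class StallDetector:
    """Tracks per-operator output progress in addition to active tasks."""

    def __init__(self):
        self._last_output_time = {}

    def on_output(self, op) -> None:
        self._last_output_time[op] = time.time()

    def should_unblock(self, op, has_active_tasks: bool) -> bool:
        # Start the clock the first time we check this operator.
        last = self._last_output_time.setdefault(op, time.time())
        idle_for = time.time() - last
        # Tasks were submitted but produced no outputs for the whole
        # interval: resources may be preempted by non-Data workloads, so
        # temporarily lift backpressure instead of stalling indefinitely.
        return has_active_tasks and idle_for > MAX_OUTPUT_IDLE_TIME_S
```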
Related issue number
Closes #41496
Checks
- I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- If I've added a new method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.