Skip to content

Commit

Permalink
[core] Add an internal system concurrency group for executing compil…
Browse files Browse the repository at this point in the history
…ed dag tasks

As part of https://github.com/ray-project/enhancements/pull/48/files, we need the ability to launch system-managed tasks to execute compiled DAG work. These need to run in a separate concurrency group so they do not block other actor tasks.

This is an internal/experimental API only.
  • Loading branch information
ericl authored Dec 10, 2023
1 parent cb5bb4e commit 603d1a1
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 7 deletions.
15 changes: 15 additions & 0 deletions python/ray/tests/test_concurrency_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,21 @@ def f2(self):
assert "ok" == ray.get(async_actor.f2.remote())


def test_system_concurrency_group(ray_start_regular_shared):
@ray.remote
class NormalActor:
def block_forever(self):
time.sleep(9999)
return "never"

def ping(self):
return "pong"

n = NormalActor.remote()
n.block_forever.options(concurrency_group="_ray_system").remote()
print(ray.get(n.ping.remote()))


if __name__ == "__main__":
import os

Expand Down
5 changes: 5 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,11 @@ RAY_CONFIG(std::string, predefined_unit_instance_resources, "GPU")
/// When set it to "neuron_cores,TPU,FPGA", we will also treat FPGA as unit_instance.
RAY_CONFIG(std::string, custom_unit_instance_resources, "neuron_cores,TPU,NPU")

/// The name of the system-created concurrency group for actors. This group is
/// created with 1 thread, and is created lazily. The intended usage is for
/// Ray-internal auxiliary tasks (e.g., accelerated dag workers).
RAY_CONFIG(std::string, system_concurrency_group_name, "_ray_system")

// Maximum size of the batches when broadcasting resources to raylet.
RAY_CONFIG(uint64_t, resource_broadcast_batch_size, 512)

Expand Down
19 changes: 13 additions & 6 deletions src/ray/core_worker/transport/concurrency_group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,21 @@ ConcurrencyGroupManager<ExecutorType>::ConcurrencyGroupManager(
// the thread pools instead of main thread.
if (ExecutorType::NeedDefaultExecutor(max_concurrency_for_default_concurrency_group) ||
!concurrency_groups.empty()) {
defatult_executor_ =
default_executor_ =
std::make_shared<ExecutorType>(max_concurrency_for_default_concurrency_group);
}
}

template <typename ExecutorType>
std::shared_ptr<ExecutorType> ConcurrencyGroupManager<ExecutorType>::GetExecutor(
const std::string &concurrency_group_name, const ray::FunctionDescriptor &fd) {
if (concurrency_group_name == RayConfig::instance().system_concurrency_group_name() &&
name_to_executor_index_.find(concurrency_group_name) ==
name_to_executor_index_.end()) {
auto executor = std::make_shared<ExecutorType>(1);
name_to_executor_index_[concurrency_group_name] = executor;
}

if (!concurrency_group_name.empty()) {
auto it = name_to_executor_index_.find(concurrency_group_name);
/// TODO(qwang): Fail the user task.
Expand All @@ -64,26 +71,26 @@ std::shared_ptr<ExecutorType> ConcurrencyGroupManager<ExecutorType>::GetExecutor
functions_to_executor_index_.end()) {
return functions_to_executor_index_[fd->ToString()];
}
return defatult_executor_;
return default_executor_;
}

/// Get the default executor.
template <typename ExecutorType>
std::shared_ptr<ExecutorType> ConcurrencyGroupManager<ExecutorType>::GetDefaultExecutor()
const {
return defatult_executor_;
return default_executor_;
}

/// Stop and join the executors that the this manager owns.
template <typename ExecutorType>
void ConcurrencyGroupManager<ExecutorType>::Stop() {
if (defatult_executor_) {
if (default_executor_) {
RAY_LOG(DEBUG) << "Default executor is stopping.";
defatult_executor_->Stop();
default_executor_->Stop();
RAY_LOG(INFO) << "Default executor is joining. If the 'Default executor is joined.' "
"message is not printed after this, the worker is probably "
"hanging because the actor task is running an infinite loop.";
defatult_executor_->Join();
default_executor_->Join();
RAY_LOG(INFO) << "Default executor is joined.";
}

Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/transport/concurrency_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ConcurrencyGroupManager final {
functions_to_executor_index_;

// The default concurrency group executor. It's nullptr if its max concurrency is 1.
std::shared_ptr<ExecutorType> defatult_executor_ = nullptr;
std::shared_ptr<ExecutorType> default_executor_ = nullptr;

friend class ConcurrencyGroupManagerTest;
};
Expand Down

0 comments on commit 603d1a1

Please sign in to comment.