Skip to content

Commit

Permalink
[Core]create internal module for thridparty system compatible building (
Browse files Browse the repository at this point in the history
  • Loading branch information
ashione authored Mar 28, 2024
1 parent bccf140 commit 5e7b3c3
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 15 deletions.
83 changes: 68 additions & 15 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,8 @@ ray_cc_library(
linkopts = PLASMA_LINKOPTS,
deps = [
":plasma_client",
"//src/ray/common:network",
":stats_lib",
"//src/ray/common:network",
],
)

Expand Down Expand Up @@ -772,6 +772,26 @@ ray_cc_library(
],
)

# This header is used to wrap some internal codes so we can reduce suspicious
# symbols export.
ray_cc_library(
name = "exported_internal",
srcs =
[
"src/ray/internal/internal.cc",
],
hdrs =
[
"src/ray/internal/internal.h",
],
copts = COPTS,
strip_include_prefix = "src",
deps = [
":core_worker_lib",
],
alwayslink = 1,
)

ray_cc_test(
name = "core_worker_resubmit_queue_test",
size = "small",
Expand Down Expand Up @@ -1210,17 +1230,20 @@ ray_cc_test(
srcs = [
"src/ray/object_manager/plasma/test/mutable_object_test.cc",
],
tags = ["team:core", "no_windows"],
tags = [
"no_windows",
"team:core",
],
target_compatible_with = [
"@platforms//os:linux",
],
deps = [
":core_worker_lib",
":plasma_store_server_lib",
"@com_google_absl//absl/random",
"@com_google_absl//absl/strings:str_format",
"@com_google_googletest//:gtest_main",
],
target_compatible_with = [
"@platforms//os:linux",
],
)

ray_cc_test(
Expand Down Expand Up @@ -1282,7 +1305,10 @@ ray_cc_test(
name = "worker_pool_test",
size = "small",
srcs = ["src/ray/raylet/worker_pool_test.cc"],
tags = ["team:core", "no_tsan"],
tags = [
"no_tsan",
"team:core",
],
deps = [
":raylet_lib",
"@com_google_googletest//:gtest_main",
Expand Down Expand Up @@ -1440,9 +1466,9 @@ ray_cc_test(
size = "small",
srcs = ["src/ray/stats/stats_test.cc"],
tags = [
"no_tsan",
"stats",
"team:core",
"no_tsan",
],
deps = [
":stats_lib",
Expand Down Expand Up @@ -1517,7 +1543,11 @@ ray_cc_test(
"//:redis-cli",
"//:redis-server",
],
tags = ["team:core", "no_windows", "no_tsan"],
tags = [
"no_tsan",
"no_windows",
"team:core",
],
deps = [
":gcs_server_lib",
":gcs_test_util_lib",
Expand Down Expand Up @@ -1563,7 +1593,10 @@ ray_cc_test(
srcs = [
"src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc",
],
tags = ["team:core", "no_windows"],
tags = [
"no_windows",
"team:core",
],
deps = [
":gcs_server_lib",
"@com_google_googletest//:gtest_main",
Expand Down Expand Up @@ -1624,7 +1657,10 @@ ray_cc_test(
srcs = [
"src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc",
],
tags = ["team:core", "no_tsan"],
tags = [
"no_tsan",
"team:core",
],
deps = [
":gcs_server_lib",
":gcs_server_test_util",
Expand All @@ -1640,7 +1676,10 @@ ray_cc_test(
srcs = [
"src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc",
],
tags = ["team:core", "no_tsan"],
tags = [
"no_tsan",
"team:core",
],
deps = [
":gcs_server_lib",
":gcs_server_test_util",
Expand Down Expand Up @@ -1687,7 +1726,10 @@ ray_cc_test(
srcs = [
"src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc",
],
tags = ["team:core", "no_tsan"],
tags = [
"no_tsan",
"team:core",
],
deps = [
":gcs_server_lib",
":gcs_server_test_util",
Expand Down Expand Up @@ -1908,7 +1950,10 @@ ray_cc_test(
"//:redis-cli",
"//:redis-server",
],
tags = ["team:core", "no_tsan"],
tags = [
"no_tsan",
"team:core",
],
deps = [
":gcs_client_lib",
":gcs_server_lib",
Expand All @@ -1930,7 +1975,10 @@ ray_cc_test(
"//:redis-cli",
"//:redis-server",
],
tags = ["team:core", "no_windows"],
tags = [
"no_windows",
"team:core",
],
deps = [
":gcs_client_lib",
":gcs_server_lib",
Expand Down Expand Up @@ -2177,7 +2225,10 @@ ray_cc_test(
"//:redis-server",
],
env = {"REDIS_CHAOS": "1"},
tags = ["team:core", "no_windows"],
tags = [
"no_windows",
"team:core",
],
target_compatible_with = [
"@platforms//os:linux",
],
Expand Down Expand Up @@ -2308,6 +2359,7 @@ pyx_library(
),
deps = [
"//:core_worker_lib",
"//:exported_internal",
"//:gcs_server_lib",
"//:global_state_accessor_lib",
"//:raylet_lib",
Expand Down Expand Up @@ -2342,6 +2394,7 @@ ray_cc_binary(
visibility = ["//java:__subpackages__"],
deps = [
"//:core_worker_lib",
"//:exported_internal",
"//:global_state_accessor_lib",
"//:src/ray/ray_exported_symbols.lds",
"//:src/ray/ray_version_script.lds",
Expand Down
85 changes: 85 additions & 0 deletions src/ray/internal/internal.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2020 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ray/internal/internal.h"

#include "ray/core_worker/core_worker.h"

namespace ray {
namespace internal {
// NOTE(lingxuan.zlx): This internal module is designed to export ray symbols
// to other thirdparty outside project, which makes they can access internal
// function of core worker library or native function and reduce symbols racing.

using ray::core::CoreWorkerProcess;
using ray::core::TaskOptions;

std::vector<rpc::ObjectReference> SendInternal(
const ActorID &peer_actor_id,
std::shared_ptr<LocalMemoryBuffer> buffer,
RayFunction &function,
int return_num,
int max_retries,
bool retry_exceptions,
std::string serialized_retry_exception_allowlist) {
std::unordered_map<std::string, double> resources;
std::string name = function.GetFunctionDescriptor()->DefaultTaskName();
TaskOptions options{name, return_num, resources};

char meta_data[3] = {'R', 'A', 'W'};
std::shared_ptr<LocalMemoryBuffer> meta =
std::make_shared<LocalMemoryBuffer>((uint8_t *)meta_data, 3, true);

std::vector<std::unique_ptr<TaskArg>> args;
if (function.GetLanguage() == Language::PYTHON) {
auto dummy = "__RAY_DUMMY__";
std::shared_ptr<LocalMemoryBuffer> dummyBuffer =
std::make_shared<LocalMemoryBuffer>((uint8_t *)dummy, 13, true);
args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
std::move(dummyBuffer), meta, std::vector<rpc::ObjectReference>(), true)));
}
args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
std::move(buffer), meta, std::vector<rpc::ObjectReference>(), true)));

std::vector<std::shared_ptr<RayObject>> results;
std::vector<rpc::ObjectReference> return_refs;
auto result = CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
peer_actor_id,
function,
args,
options,
max_retries,
retry_exceptions,
serialized_retry_exception_allowlist,
return_refs);
if (!result.ok()) {
RAY_CHECK(false) << "Back pressure should not be enabled.";
}
return return_refs;
}

const ray::stats::TagKeyType TagRegister(const std::string tag_name) {
return ray::stats::TagKeyType::Register(tag_name);
}

const std::string TagKeyName(stats::TagKeyType &tagkey) { return tagkey.name(); }

const ActorID &GetCurrentActorID() {
return CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentActorID();
}

bool IsInitialized() { return CoreWorkerProcess::IsInitialized(); }

} // namespace internal
} // namespace ray
55 changes: 55 additions & 0 deletions src/ray/internal/internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2020 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include "ray/common/buffer.h"
#include "ray/common/id.h"
#include "ray/core_worker/common.h"
#include "ray/stats/metric.h"

// This header is used to warp some internal code so we can reduce suspicious
// symbols export.
namespace ray {
namespace internal {

using ray::core::RayFunction;

/// Send buffer internal
/// \param[in] buffer buffer to be sent.
/// \param[in] function the function descriptor of peer's function.
/// \param[in] return_num return value number of the call.
/// \param[in] max_retirs task retries time.
/// \param[in] retry_execptions whether retry if execptions found.
/// \param[in] serialized_retry_exception_allowlist specificed allowed exceptions.
/// \param[out] return_ids return ids from SubmitActorTask.
std::vector<rpc::ObjectReference> SendInternal(
const ActorID &peer_actor_id,
std::shared_ptr<LocalMemoryBuffer> buffer,
RayFunction &function,
int return_num,
int max_retries = -1,
bool retry_exceptions = false,
std::string serialized_retry_exception_allowlist = "");

const stats::TagKeyType TagRegister(const std::string tag_name);

const std::string TagKeyName(stats::TagKeyType &tagkey);

/// Get current actor id via internal.
const ActorID &GetCurrentActorID();

/// Get core worker initialization flag via internal.
bool IsInitialized();
} // namespace internal
} // namespace ray
2 changes: 2 additions & 0 deletions src/ray/ray_exported_symbols.lds
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@
*PyInit__raylet*
*Java_io_ray*
_JNI_On*
*aligned_free*
*aligned_malloc*

0 comments on commit 5e7b3c3

Please sign in to comment.