Skip to content

Commit

Permalink
[core] Fix gcs member dependency order (#49482)
Browse files Browse the repository at this point in the history
Signed-off-by: dentiny <dentinyhao@gmail.com>
  • Loading branch information
dentiny authored Dec 29, 2024
1 parent af933a7 commit 8c2d945
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 16 deletions.
13 changes: 7 additions & 6 deletions src/ray/gcs/gcs_server/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -458,17 +458,18 @@ void GcsNodeManager::Initialize(const GcsInitData &gcs_init_data) {
sorted_dead_node_list_.emplace_back(node_id, node_info.end_time_ms());
}
}
sorted_dead_node_list_.sort(
[](const std::pair<NodeID, int64_t> &left,
const std::pair<NodeID, int64_t> &right) { return left.second < right.second; });
std::sort(
sorted_dead_node_list_.begin(),
sorted_dead_node_list_.end(),
[](const auto &left, const auto &right) { return left.second < right.second; });
}

void GcsNodeManager::AddDeadNodeToCache(std::shared_ptr<rpc::GcsNodeInfo> node) {
if (dead_nodes_.size() >= RayConfig::instance().maximum_gcs_dead_node_cached_count()) {
const auto &node_id = sorted_dead_node_list_.begin()->first;
const auto &node_id = sorted_dead_node_list_.front().first;
RAY_CHECK_OK(gcs_table_storage_->NodeTable().Delete(node_id, nullptr));
dead_nodes_.erase(sorted_dead_node_list_.begin()->first);
sorted_dead_node_list_.erase(sorted_dead_node_list_.begin());
dead_nodes_.erase(sorted_dead_node_list_.front().first);
sorted_dead_node_list_.pop_front();
}
auto node_id = NodeID::FromBinary(node->node_id());
dead_nodes_.emplace(node_id, node);
Expand Down
11 changes: 5 additions & 6 deletions src/ray/gcs/gcs_server/gcs_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <boost/bimap.hpp>
#include <boost/bimap/unordered_multiset_of.hpp>
#include <boost/bimap/unordered_set_of.hpp>
#include <deque>

#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
Expand All @@ -34,8 +35,7 @@
#include "ray/util/event.h"
#include "src/ray/protobuf/gcs.pb.h"

namespace ray {
namespace gcs {
namespace ray::gcs {

class GcsAutoscalerStateManagerTest;
class GcsStateTest;
Expand Down Expand Up @@ -235,8 +235,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
/// Dead nodes.
absl::flat_hash_map<NodeID, std::shared_ptr<rpc::GcsNodeInfo>> dead_nodes_;
/// The nodes are sorted according to the timestamp, and the oldest is at the head of
/// the list.
std::list<std::pair<NodeID, int64_t>> sorted_dead_node_list_;
/// the deque.
std::deque<std::pair<NodeID, int64_t>> sorted_dead_node_list_;
/// Listeners which monitors the addition of nodes.
std::vector<std::function<void(std::shared_ptr<rpc::GcsNodeInfo>)>>
node_added_listeners_;
Expand Down Expand Up @@ -271,5 +271,4 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
friend GcsStateTest;
};

} // namespace gcs
} // namespace ray
} // namespace ray::gcs
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,8 @@ void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) {
// Initialize by gcs tables data.
gcs_node_manager_->Initialize(gcs_init_data);
// Register service.
node_info_service_.reset(new rpc::NodeInfoGrpcService(
io_context_provider_.GetDefaultIOContext(), *gcs_node_manager_));
node_info_service_ = std::make_unique<rpc::NodeInfoGrpcService>(
io_context_provider_.GetDefaultIOContext(), *gcs_node_manager_);
rpc_server_.RegisterService(*node_info_service_);
}

Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server/gcs_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ class GcsServer {
std::unique_ptr<GcsResourceManager> gcs_resource_manager_;
/// The autoscaler state manager.
std::unique_ptr<GcsAutoscalerStateManager> gcs_autoscaler_state_manager_;
/// A publisher for publishing gcs messages.
std::unique_ptr<GcsPublisher> gcs_publisher_;
/// The gcs node manager.
std::unique_ptr<GcsNodeManager> gcs_node_manager_;
/// The health check manager.
Expand All @@ -254,8 +256,6 @@ class GcsServer {
/// The gcs placement group scheduler.
/// [gcs_placement_group_scheduler_] depends on [raylet_client_pool_].
std::unique_ptr<GcsPlacementGroupScheduler> gcs_placement_group_scheduler_;
/// A publisher for publishing gcs messages.
std::unique_ptr<GcsPublisher> gcs_publisher_;
/// Function table manager.
std::unique_ptr<GcsFunctionManager> function_manager_;
/// Stores references to URIs stored by the GCS for runtime envs.
Expand Down

0 comments on commit 8c2d945

Please sign in to comment.