Skip to content

Commit

Permalink
Test: Add a test case for chained RPC calls (AimRT#111)
Browse files Browse the repository at this point in the history
* add new option to rename service_name

* Modify the logic

* add zenoh proxy server

* opt code

---------

Co-authored-by: hanjun <hanjun@agibot.com>
  • Loading branch information
owny990312 and hanjun authored Dec 3, 2024
1 parent dca7399 commit 4bfe31f
Show file tree
Hide file tree
Showing 36 changed files with 755 additions and 24 deletions.
1 change: 1 addition & 0 deletions src/examples/cpp/pb_rpc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ set_namespace()

# module
add_subdirectory(module/benchmark_rpc_client_module)
add_subdirectory(module/proxy_rpc_co_module)
add_subdirectory(module/normal_rpc_co_client_module)
add_subdirectory(module/normal_rpc_sync_client_module)
add_subdirectory(module/normal_rpc_async_client_module)
Expand Down
41 changes: 41 additions & 0 deletions src/examples/cpp/pb_rpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,44 @@

说明:
- 此示例与 **protobuf rpc co** 示例基本一致,唯一的区别是将 `NormalRpcCoClientModule``NormalRpcCoServerModule` 集成到 `pb_rpc_pkg` 一个 Pkg 中;

## protobuf proxy rpc co


一个基于 protobuf 协议、协程型接口与 local 后端的 rpc 示例,演示内容包括:
- 如何使用 protobuf 协议作为 rpc 服务协议;
- 如何基于 Module 方式使用 Executor、协程型 Rpc client 和 server 接口;
- 如何使用协程形式的 Rpc filter 功能;
- 如何使用 local 类型的 rpc 后端;
- 如何以 Pkg 模式集成 Module 并启动;
- 如何构建链式 rpc 调用;



核心代码:
- [rpc.proto](../../../protocols/example/rpc.proto)
- [proxy_rpc_co_module.cc](./module/proxy_rpc_co_module/proxy_rpc_co_module.cc)
- [service.cc](./module/proxy_rpc_co_module/service.cc)
- [pb_rpc_client_pkg/pkg_main.cc](./pkg/pb_rpc_client_pkg/pkg_main.cc)
- [pb_rpc_server_pkg/pkg_main.cc](./pkg/pb_rpc_server_pkg/pkg_main.cc)


配置文件:
- [examples_cpp_pb_proxy_rpc_co_cfg.yaml](./install/linux/bin/cfg/examples_cpp_pb_proxy_rpc_co_cfg.yaml)



运行方式(linux):
- 开启 `AIMRT_BUILD_EXAMPLES``AIMRT_BUILD_WITH_PROTOBUF` 选项编译 AimRT;
- 直接运行 build 目录下`start_examples_cpp_pb_rpc_co.sh`脚本启动进程;
- 键入`ctrl-c`停止进程;


说明:
- 此示例创建了以下三个模块, 以构成 RPC 的链式调用:
- `NormalRpcCoClientModule`:会基于 `work_thread_pool` 执行器,以配置的频率,通过协程 Client 接口,向 `ExampleService_1` 发起 RPC 请求;
- `ProxyRpcCoModule`:会注册 `ExampleService_1` 服务端,通过协程 Server 接口,提供 echo 功能, 并向 `ExampleService_2` 发起 RPC 请求;
- `NormalRpcCoServerModule`:会注册 `ExampleService_2` 服务端,通过协程 Server 接口,提供 echo 功能;
- 此示例将 `NormalRpcCoClientModule` 集成到 `pb_rpc_client_pkg`, 将 `ProxyRpcCoModule``NormalRpcCoServerModule` 集成到 `pb_rpc_server_pkg` Pkg 中,并在配置文件中加载这两个 Pkg 到一个 AimRT 进程中;
- 此示例在 Rpc Client 端和 Server 端分别注册了两个 Filter 用于打印请求日志和计算耗时;
- 此示例使用 local 类型的 rpc 后端进行通信,并配置了 `timeout_handle` 执行器作为超时执行器;
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.

aimrt:
log:
core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
backends:
- type: console
executor:
executors:
- name: work_thread_pool
type: asio_thread
options:
thread_num: 4
- name: timeout_handle
type: time_wheel
options:
bind_executor: work_thread_pool
rpc:
backends:
- type: local
options:
timeout_executor: timeout_handle
clients_options:
- func_name: "(.*)"
enable_backends: [local]
servers_options:
- func_name: "(.*)"
enable_backends: [local]
module:
pkgs:
- path: ./libpb_rpc_client_pkg.so
enable_modules: [NormalRpcCoClientModule]
- path: ./libpb_rpc_server_pkg.so
enable_modules: [ProxyRpcCoModule,NormalRpcCoServerModule]
modules:
- name: NormalRpcCoClientModule
log_lvl: INFO
- name: ProxyRpcCoModule
log_lvl: INFO
- name: NormalRpcCoServerModule
log_lvl: INFO

# Module custom configuration
NormalRpcCoClientModule:
rpc_frq: 0.5
service_name: "ExampleService_1"

ProxyRpcCoModule:
service_name_for_client: "ExampleService_2"
service_name_for_server: "ExampleService_1"

NormalRpcCoServerModule:
service_name: "ExampleService_2"
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash

./aimrt_main --cfg_file_path=./cfg/examples_cpp_pb_proxy_rpc_co_cfg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ bool BenchmarkRpcClientModule::Initialize(aimrt::CoreRef core) {
YAML::Node cfg_node = YAML::LoadFile(std::string(file_path));
max_parallel_ = cfg_node["max_parallel"].as<uint32_t>();

if (cfg_node["service_name"]) {
service_name_ = cfg_node["service_name"].as<std::string>();
}

if (cfg_node["bench_plans"] && cfg_node["bench_plans"].IsSequence()) {
for (const auto& bench_plan_node : cfg_node["bench_plans"]) {
BenchPlan bench_plan;
Expand Down Expand Up @@ -83,12 +87,22 @@ bool BenchmarkRpcClientModule::Initialize(aimrt::CoreRef core) {
AIMRT_CHECK_ERROR_THROW(rpc_handle, "Get rpc handle failed.");

// Register rpc client
bool ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);
bool ret = false;
if (service_name_.empty()) {
ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);
} else {
ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle, service_name_);
}

AIMRT_CHECK_ERROR_THROW(ret, "Register client failed.");

// Create rpc proxy
proxy_ = std::make_shared<aimrt::protocols::example::ExampleServiceCoProxy>(rpc_handle);

if (!service_name_.empty()) {
proxy_->SetServiceName(service_name_);
}

// Check executor
client_statistics_executor_ = core_.GetExecutorManager().GetExecutor("client_statistics_executor");
AIMRT_CHECK_ERROR_THROW(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class BenchmarkRpcClientModule : public aimrt::ModuleBase {
// cfg
uint32_t max_parallel_;
std::vector<BenchPlan> bench_plans_;
std::string service_name_;
};

} // namespace aimrt::examples::cpp::pb_rpc::benchmark_rpc_client_module
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ bool NormalRpcAsyncClientModule::Initialize(aimrt::CoreRef core) {
if (!file_path.empty()) {
YAML::Node cfg_node = YAML::LoadFile(file_path);
rpc_frq_ = cfg_node["rpc_frq"].as<double>();

if (cfg_node["service_name"]) {
service_name_ = cfg_node["service_name"].as<std::string>();
}
}

// Get executor handle
Expand All @@ -29,9 +33,22 @@ bool NormalRpcAsyncClientModule::Initialize(aimrt::CoreRef core) {
AIMRT_CHECK_ERROR_THROW(rpc_handle, "Get rpc handle failed.");

// Register rpc client
bool ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);
bool ret = false;
if (service_name_.empty()) {
ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);
} else {
ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle, service_name_);
}

AIMRT_CHECK_ERROR_THROW(ret, "Register client failed.");

// Create rpc proxy
proxy_ = std::make_shared<aimrt::protocols::example::ExampleServiceAsyncProxy>(rpc_handle);

if (!service_name_.empty()) {
proxy_->SetServiceName(service_name_);
}

} catch (const std::exception& e) {
AIMRT_ERROR("Init failed, {}", e.what());
return false;
Expand Down Expand Up @@ -73,22 +90,19 @@ void NormalRpcAsyncClientModule::MainLoopFunc() {
count_++;
AIMRT_INFO("Loop count : {} -------------------------", count_);

// Create proxy
aimrt::protocols::example::ExampleServiceAsyncProxy proxy(core_.GetRpcHandle());

// Create req and rsp
auto req_ptr = std::make_shared<aimrt::protocols::example::GetFooDataReq>();
auto rsp_ptr = std::make_shared<aimrt::protocols::example::GetFooDataRsp>();
req_ptr->set_msg("hello world foo, count " + std::to_string(count_));

// Create ctx
auto ctx_ptr = proxy.NewContextSharedPtr();
auto ctx_ptr = proxy_->NewContextSharedPtr();
ctx_ptr->SetTimeout(std::chrono::seconds(3));

AIMRT_INFO("Client start new rpc call. req: {}", aimrt::Pb2CompactJson(*req_ptr));

// Call rpc
proxy.GetFooData(
proxy_->GetFooData(
ctx_ptr, *req_ptr, *rsp_ptr,
[this, ctx_ptr, req_ptr, rsp_ptr](aimrt::rpc::Status status) {
// Check result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ class NormalRpcAsyncClientModule : public aimrt::ModuleBase {
std::promise<void> stop_sig_;

double rpc_frq_ = 1.0;
std::string service_name_;

std::shared_ptr<aimrt::protocols::example::ExampleServiceAsyncProxy> proxy_;
};

} // namespace aimrt::examples::cpp::pb_rpc::normal_rpc_async_client_module
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include "normal_rpc_async_server_module/normal_rpc_async_server_module.h"
#include "normal_rpc_async_server_module/global.h"

#include "yaml-cpp/yaml.h"

namespace aimrt::examples::cpp::pb_rpc::normal_rpc_async_server_module {

bool NormalRpcAsyncServerModule::Initialize(aimrt::CoreRef core) {
Expand All @@ -12,11 +14,26 @@ bool NormalRpcAsyncServerModule::Initialize(aimrt::CoreRef core) {
SetLogger(core_.GetLogger());

try {
// Read cfg
std::string file_path = std::string(core_.GetConfigurator().GetConfigFilePath());
if (!file_path.empty()) {
YAML::Node cfg_node = YAML::LoadFile(file_path);
if (cfg_node["service_name"]) {
service_name_ = cfg_node["service_name"].as<std::string>();
}
}

// Create service
service_ptr_ = std::make_shared<ExampleServiceAsyncServiceImpl>();

// Register service
bool ret = core_.GetRpcHandle().RegisterService(service_ptr_.get());
bool ret = false;
if (service_name_.empty()) {
ret = core_.GetRpcHandle().RegisterService(service_ptr_.get());
} else {
ret = core_.GetRpcHandle().RegisterService(service_name_, service_ptr_.get());
}

AIMRT_CHECK_ERROR_THROW(ret, "Register service failed.");

AIMRT_INFO("Register service succeeded.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class NormalRpcAsyncServerModule : public aimrt::ModuleBase {
private:
aimrt::CoreRef core_;
std::shared_ptr<ExampleServiceAsyncServiceImpl> service_ptr_;

std::string service_name_;
};

} // namespace aimrt::examples::cpp::pb_rpc::normal_rpc_async_server_module
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ bool NormalRpcCoClientModule::Initialize(aimrt::CoreRef core) {
if (!file_path.empty()) {
YAML::Node cfg_node = YAML::LoadFile(file_path);
rpc_frq_ = cfg_node["rpc_frq"].as<double>();

if (cfg_node["service_name"]) {
service_name_ = cfg_node["service_name"].as<std::string>();
}
}

// Get executor handle
Expand All @@ -34,12 +38,22 @@ bool NormalRpcCoClientModule::Initialize(aimrt::CoreRef core) {
AIMRT_CHECK_ERROR_THROW(rpc_handle, "Get rpc handle failed.");

// Register rpc client
bool ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);
bool ret = false;
if (service_name_.empty()) {
ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle);
} else {
ret = aimrt::protocols::example::RegisterExampleServiceClientFunc(rpc_handle, service_name_);
}

AIMRT_CHECK_ERROR_THROW(ret, "Register client failed.");

// Create rpc proxy
proxy_ = std::make_shared<aimrt::protocols::example::ExampleServiceCoProxy>(rpc_handle);

if (!service_name_.empty()) {
proxy_->SetServiceName(service_name_);
}

// Register filter
proxy_->RegisterFilter([this](aimrt::rpc::ContextRef ctx,
const void* req_ptr, void* rsp_ptr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class NormalRpcCoClientModule : public aimrt::ModuleBase {
std::atomic_bool run_flag_ = true;

double rpc_frq_ = 1.0;
std::string service_name_;

std::shared_ptr<aimrt::protocols::example::ExampleServiceCoProxy> proxy_;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include "normal_rpc_co_server_module/filter.h"
#include "normal_rpc_co_server_module/global.h"

#include "yaml-cpp/yaml.h"

namespace aimrt::examples::cpp::pb_rpc::normal_rpc_co_server_module {

bool NormalRpcCoServerModule::Initialize(aimrt::CoreRef core) {
Expand All @@ -13,6 +15,15 @@ bool NormalRpcCoServerModule::Initialize(aimrt::CoreRef core) {
SetLogger(core_.GetLogger());

try {
// Read cfg
std::string file_path = std::string(core_.GetConfigurator().GetConfigFilePath());
if (!file_path.empty()) {
YAML::Node cfg_node = YAML::LoadFile(file_path);
if (cfg_node["service_name"]) {
service_name_ = cfg_node["service_name"].as<std::string>();
}
}

// Create service
service_ptr_ = std::make_shared<ExampleServiceImpl>();

Expand All @@ -21,7 +32,13 @@ bool NormalRpcCoServerModule::Initialize(aimrt::CoreRef core) {
service_ptr_->RegisterFilter(TimeCostLogServerFilter);

// Register service
bool ret = core_.GetRpcHandle().RegisterService(service_ptr_.get());
bool ret = false;
if (service_name_.empty()) {
ret = core_.GetRpcHandle().RegisterService(service_ptr_.get());
} else {
ret = core_.GetRpcHandle().RegisterService(service_name_, service_ptr_.get());
}

AIMRT_CHECK_ERROR_THROW(ret, "Register service failed.");

AIMRT_INFO("Register service succeeded.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class NormalRpcCoServerModule : public aimrt::ModuleBase {
private:
aimrt::CoreRef core_;
std::shared_ptr<ExampleServiceImpl> service_ptr_;

std::string service_name_;
};

} // namespace aimrt::examples::cpp::pb_rpc::normal_rpc_co_server_module
Loading

0 comments on commit 4bfe31f

Please sign in to comment.