Skip to content

Commit

Permalink
feat: add proxy plugin to transfer messages from one backend to multi…
Browse files Browse the repository at this point in the history
…ple backends (AimRT#108)

* feat(plugins): add proxy plugin

* feat: add proxy action

* feat: del the state in proxy action

* feat: format

* fix : add check not to pub same topic and msg_type

* fix: remove TimerSchedule  executor check

* docs: add docs

* doc: change docs

* fix: remove some unnessary code

* refactor(proxy_plugin): migrate TopicMetaKey to core/util directory

* fix: simplify the code

* format

* fix: struct bind

* perf(proxy): use action_raw_ptr capture rather than reference capture local variable

* docs: update proxy_plugin documentation and add example configuration

* fix: remove necessary check in echo plugin

* doc: add proxy plugin example docs

* fix: migrate recordplayback plugin and echo plugin's topic_meta_key into util

* fix: format

* fix: remove necessary code

* perf: remove unnecessary code
  • Loading branch information
yglsaltfish authored Nov 23, 2024
1 parent df5d21e commit 18d45db
Show file tree
Hide file tree
Showing 42 changed files with 1,132 additions and 71 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ cmake_dependent_option(AIMRT_BUILD_TIME_MANIPULATOR_PLUGIN "AimRT build time man
cmake_dependent_option(AIMRT_BUILD_PARAMETER_PLUGIN "AimRT build parameter plugin." OFF "AIMRT_BUILD_RUNTIME;AIMRT_BUILD_WITH_PROTOBUF" OFF)
cmake_dependent_option(AIMRT_BUILD_LOG_CONTROL_PLUGIN "AimRT build log control plugin." OFF "AIMRT_BUILD_RUNTIME;AIMRT_BUILD_WITH_PROTOBUF" OFF)
cmake_dependent_option(AIMRT_BUILD_GRPC_PLUGIN "AimRT build grpc plugin." OFF "AIMRT_BUILD_RUNTIME" OFF)
cmake_dependent_option(AIMRT_BUILD_PROXY_PLUGIN "AimRT build proxy plugin." OFF "AIMRT_BUILD_RUNTIME" OFF)

option(AIMRT_INSTALL "Enable installation of AimRT." ON)

Expand Down
1 change: 1 addition & 0 deletions build.bat
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ cmake -B build ^
-DAIMRT_BUILD_OPENTELEMETRY_PLUGIN=OFF ^
-DAIMRT_BUILD_GRPC_PLUGIN=OFF ^
-DAIMRT_BUILD_ECHO_PLUGIN=OFF ^
-DAIMRT_BUILD_PROXY_PLUGIN=ON ^
-DAIMRT_BUILD_PYTHON_PACKAGE=ON ^
%*

Expand Down
1 change: 1 addition & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ cmake -B build \
-DAIMRT_BUILD_OPENTELEMETRY_PLUGIN=ON \
-DAIMRT_BUILD_GRPC_PLUGIN=ON \
-DAIMRT_BUILD_ECHO_PLUGIN=ON \
-DAIMRT_BUILD_PROXY_PLUGIN=ON \
-DAIMRT_BUILD_PYTHON_PACKAGE=ON \
$@

Expand Down
1 change: 1 addition & 0 deletions document/sphinx-cn/release_notes/v0_9_0.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- 新增 Echo 插件,用于回显消息;
- 新增了基于执行器的定时器,方便执行定时任务;
- aimrt_py channel 和 rpc 支持 ros2 消息类型;
- 新增了 Proxy 插件,用于转发消息;

**次要修改**
- 缩短了一些 examples 的文件路径长度;
Expand Down
1 change: 1 addition & 0 deletions document/sphinx-cn/tutorials/examples/examples_plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ AimRT 提供了以下插件使用示例:
- {{ '[time_manipulator_plugin]({}/src/examples/plugins/time_manipulator_plugin)'.format(code_site_root_path_url) }}
- {{ '[zenoh_plugin]({}/src/examples/plugins/zenoh_plugin)'.format(code_site_root_path_url) }}
- {{ '[echo_plugin]({}/src/examples/plugins/echo_plugin)'.format(code_site_root_path_url) }}
- {{ '[proxy_plugin]({}/src/examples/plugins/proxy_plugin)'.format(code_site_root_path_url) }}

关于这些示例的说明:
- 每个示例都有自己独立的 readme 文档,详情请点击示例链接进入后查看;
Expand Down
1 change: 1 addition & 0 deletions document/sphinx-cn/tutorials/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ plugins/zenoh_plugin.md
plugins/iceoryx_plugin.md
plugins/grpc_plugin.md
plugins/echo_plugin.md
plugins/proxy_plugin.md
```

如果开发者想定制开发自己的插件,可以参考以下文档。
Expand Down
85 changes: 85 additions & 0 deletions document/sphinx-cn/tutorials/plugins/proxy_plugin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@

# echo插件

## 相关链接

参考示例:
- {{ '[proxy_plugin]({}/src/examples/plugins/proxy_plugin)'.format(code_site_root_path_url) }}

## 插件概述

**proxy_plugin**用于对 Channel 中的消息进行代理转发,插件支持独立的 type_support_pkg,并支持指定执行器, 其中执行器需要线程安全,在使用时,插件会根据配置注册一个或多个 Channel Subscriber 或 Publisher。

插件的配置项如下:

| 节点 | 类型 | 是否可选| 默认值 | 作用 |
| ---- | ---- | ---- | ---- | ---- |
| type_support_pkgs | array | 必选 | [] | type support 包配置 |
| type_support_pkgs[i].path | string | 必选 | "" | type support 包的路径 |
| proxy_actions | array | 必选 | [] | 代理转发配置 |
| proxy_actions[i].name | string | 必选 | "" | 代理转发名称 |
| proxy_actions[i].options | object | 必选 | {} | 代理转发配置 |
| proxy_actions[i].options.executor| string | 必选 | "" | 代理转发执行器 |
| proxy_actions[i].options.topic_meta_list | array | 必选 | [] | 要代理转发的 topic 和类型 |
| proxy_actions[i].options.topic_meta_list[j].topic_name | string | 必选 | "" | 要代理转发的 topic |
| proxy_actions[i].options.topic_meta_list[j].msg_type | string | 必选 | "" | 要代理转发的消息类型 |
| proxy_actions[i].options.topic_meta_list[j].pub_topic_name | array | 必选 | [] | 代理转发后的 topic |


请注意,**proxy_plugin**中是以`action`为单元管理代理转发动作的,每个代理转发`action`可以有自己的执行器、topic 等参数,使用时可以根据数据实际大小和频率,为每个 action 分配合理的资源。


### 代理转发的简单示例配置

以下是将一个以 http 为后端的 topic 消息代理转发到两个以 zenoh 和 ros2 为后端的 topic 的简单示例配置,对于 proxy_plugin 需要为每个 action 指定执行器,并且在 channel 处需要为每个订阅的 topic 和转发的 topic 指定后端,其他相关插件的配置请参考[net_plugin](./net_plugin.md), [zenoh_plugin](./zenoh_plugin.md)[ros2_plugin](./ros2_plugin.md);

```yaml
aimrt:
plugin:
plugins:
- name: proxy_plugin
path: ./libaimrt_proxy_plugin.so
options:
type_support_pkgs:
- path: ./libexample_event_ts_pkg.so
proxy_actions:
- name: my_proxy
options:
executor: proxy_plugin_executor
topic_meta_list:
- sub_topic_name: test_topic_http
pub_topic_name: [test_topic_zenoh, test_topic_ros2]
msg_type: pb:aimrt.protocols.example.ExampleEventMsg
- name: zenoh_plugin
path: ./libaimrt_zenoh_plugin.so
- name: ros2_plugin
path: ./libaimrt_ros2_plugin.so
options:
node_name: example_ros2_pb_chn_publisher_node
executor_type: MultiThreaded # SingleThreaded/StaticSingleThreaded/MultiThreaded
executor_thread_num: 2
- name: net_plugin
path: ./libaimrt_net_plugin.so
options:
thread_num: 4
http_options:
listen_ip: 127.0.0.1
listen_port: 50081
channel:
backends:
- type: http
- type: zenoh
- type: ros2
sub_topics_options:
- topic_name: test_topic_http
enable_backends: [http]
pub_topics_options:
- topic_name: test_topic_zenoh
enable_backends: [zenoh]
- topic_name: test_topic_ros2
enable_backends: [ros2]
# ...
```



7 changes: 7 additions & 0 deletions src/examples/plugins/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,10 @@ endif()
if(AIMRT_BUILD_WITH_PROTOBUF AND AIMRT_BUILD_GRPC_PLUGIN)
add_subdirectory(grpc_plugin)
endif()

if(AIMRT_BUILD_PROXY_PLUGIN
AND AIMRT_BUILD_WITH_PROTOBUF
AND AIMRT_BUILD_NET_PLUGIN
AND AIMRT_BUILD_ZENOH_PLUGIN)
add_subdirectory(proxy_plugin)
endif()
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#!/bin/bash

source install/share/example_ros2/local_setup.bash

./aimrt_main --cfg_file_path=./cfg/examples_plugins_echo_plugin_ros2_cfg.yaml
29 changes: 29 additions & 0 deletions src/examples/plugins/proxy_plugin/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.

# Get the current folder name
string(REGEX REPLACE ".*/\(.*\)" "\\1" CUR_DIR ${CMAKE_CURRENT_SOURCE_DIR})

# Set namespace
set_namespace()

# type_support_pkg
add_subdirectory(example_event_ts_pkg)

# install
if(CMAKE_SYSTEM_NAME MATCHES "Linux")
set(CUR_INSTALL_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/install/linux)
elseif(CMAKE_SYSTEM_NAME MATCHES "Windows")
set(CUR_INSTALL_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/install/win)
else()
message(FATAL_ERROR "Unsupport os")
endif()

# build all
get_namespace(CUR_SUPERIOR_NAMESPACE)
string(REPLACE "::" "_" CUR_SUPERIOR_NAMESPACE_UNDERLINE ${CUR_SUPERIOR_NAMESPACE})
add_custom_target(
${CUR_SUPERIOR_NAMESPACE_UNDERLINE}_${CUR_DIR}_build_all ALL
COMMAND ${CMAKE_COMMAND} -E copy_directory ${CUR_INSTALL_SOURCE_DIR}/bin ${CMAKE_BINARY_DIR}
DEPENDS aimrt::runtime::main #
aimrt::examples::cpp::pb_chn::pb_chn_pub_pkg)
30 changes: 30 additions & 0 deletions src/examples/plugins/proxy_plugin/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# proxy plugin examples

一个基于 **proxy_plugin** 的代理转发示例,演示内容包括:
- 如何在启动时加载 **proxy_plugin**
- 如何配置消息转发的 topic;

核心代码:
- [event.proto](../../../protocols/example/event.proto)
- [normal_publisher_module.cc](../../cpp/pb_chn/module/normal_publisher_module/normal_publisher_module.cc)
- [type_support_pkg_main.cc](./example_event_ts_pkg/type_support_pkg_main.cc)


配置文件:
- [examples_plugins_proxy_plugin_http_pub_cfg.yaml](./install/linux/bin/cfg/examples_plugins_proxy_plugin_http_pub_cfg.yaml)
- [examples_plugins_proxy_plugin_zenoh_sub_cfg.yaml](./install/linux/bin/cfg/examples_plugins_proxy_plugin_zenoh_sub_cfg.yaml)
- [examples_plugins_proxy_plugin_cfg.yaml](./install/linux/bin/cfg/examples_plugins_proxy_plugin_cfg.yaml)


运行方式(linux):
- 开启 `AIMRT_BUILD_EXAMPLES``AIMRT_BUILD_WITH_PROTOBUF``AIMRT_BUILD_WITH_PROXY_PLUGIN``AIMRT_BUILD_WITH_ZENOH_PLUGIN``AIMRT_BUILD_WITH_NET_PLUGIN` 选项编译 AimRT;
- 分别运行 build 目录下 `start_examples_plugins_proxy_plugin_zenoh_sub.sh``start_examples_plugins_proxy_plugin.sh``start_examples_plugins_proxy_plugin_http_pub.sh` 脚本启动进程;
- 键入`ctrl-c`停止进程;


说明:
- 此示例创建了三个进程,分别为 http 发布消息进程、zenoh 订阅消息进程和 proxy 转发进程:
- `http_pub`,会以配置的频率,发布 `ExampleEventMsg` 消息到 `test_topic_http`
- `proxy`,会使用 `proxy_plugin_executor` 执行器,执行代理消息转发操作;
- `zenoh_sub`,会订阅 `test_topic_zenoh`
- 请注意,proxy 插件的原理是向 AimRT 订阅指定的 Topic,因此需要在 channel 配置中为该 topic 设置合适的后端,以保证插件能接收到数据,在此示例中,`test_topic_http` 配置了 `http` 后端,`test_topic_zenoh` 配置了 `zenoh` 后端;
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.

# Get the current folder name
string(REGEX REPLACE ".*/\(.*\)" "\\1" CUR_DIR ${CMAKE_CURRENT_SOURCE_DIR})

# Get namespace
get_namespace(CUR_SUPERIOR_NAMESPACE)
string(REPLACE "::" "_" CUR_SUPERIOR_NAMESPACE_UNDERLINE ${CUR_SUPERIOR_NAMESPACE})

# Set target name
set(CUR_TARGET_NAME ${CUR_SUPERIOR_NAMESPACE_UNDERLINE}_${CUR_DIR})
set(CUR_TARGET_ALIAS_NAME ${CUR_SUPERIOR_NAMESPACE}::${CUR_DIR})

# Set file collection
file(GLOB_RECURSE src ${CMAKE_CURRENT_SOURCE_DIR}/*.cc)

# Add target
add_library(${CUR_TARGET_NAME} SHARED)
add_library(${CUR_TARGET_ALIAS_NAME} ALIAS ${CUR_TARGET_NAME})

# Set source file of target
target_sources(${CUR_TARGET_NAME} PRIVATE ${src})

# Set link libraries of target
target_link_libraries(
${CUR_TARGET_NAME}
PRIVATE aimrt::interface::aimrt_type_support_pkg_c_interface
aimrt::interface::aimrt_module_protobuf_interface
aimrt::protocols::example_pb_gencode
aimrt::interface::aimrt_module_ros2_interface
example_ros2::example_ros2__rosidl_generator_cpp
example_ros2::example_ros2__rosidl_typesupport_cpp)
# Set misc of target
set_target_properties(${CUR_TARGET_NAME} PROPERTIES OUTPUT_NAME ${CUR_DIR})
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2023, AgiBot Inc.
// All rights reserved.

#include "aimrt_type_support_pkg_c_interface/type_support_pkg_main.h"

#include "aimrt_module_protobuf_interface/util/protobuf_type_support.h"

#include "aimrt_module_ros2_interface/util/ros2_type_support.h"

#include "example_ros2/msg/ros_test_msg.hpp"

#include "event.pb.h"

static const aimrt_type_support_base_t* type_support_array[]{
aimrt::GetProtobufMessageTypeSupport<aimrt::protocols::example::ExampleEventMsg>(),
aimrt::GetRos2MessageTypeSupport<example_ros2::msg::RosTestMsg>()};

extern "C" {

size_t AimRTDynlibGetTypeSupportArrayLength() {
return sizeof(type_support_array) / sizeof(type_support_array[0]);
}

const aimrt_type_support_base_t** AimRTDynlibGetTypeSupportArray() {
return type_support_array;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.

aimrt:
plugin:
plugins:
- name: proxy_plugin
path: ./libaimrt_proxy_plugin.so
options:
type_support_pkgs:
- path: ./libexample_event_ts_pkg.so
proxy_actions:
- name: my_proxy
options:
executor: proxy_plugin_executor
topic_meta_list:
- sub_topic_name: test_topic_http
pub_topic_name: [test_topic_zenoh]
msg_type: pb:aimrt.protocols.example.ExampleEventMsg
- name: zenoh_plugin
path: ./libaimrt_zenoh_plugin.so
- name: net_plugin
path: ./libaimrt_net_plugin.so
options:
thread_num: 4
http_options:
listen_ip: 127.0.0.1
listen_port: 50080
log:
core_lvl: Info # Trace/Debug/Info/Warn/Error/Fatal/Off
backends:
- type: console

executor:
executors:
- name: work_thread_pool
type: asio_thread
options:
thread_num: 4
- name: proxy_plugin_executor
type: simple_thread

channel:
backends:
- type: zenoh
- type: http
sub_topics_options:
- topic_name: test_topic_http
enable_backends: [http]
pub_topics_options:
- topic_name: test_topic_zenoh
enable_backends: [zenoh]
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.

aimrt:
plugin:
plugins:
- name: net_plugin
path: ./libaimrt_net_plugin.so
options:
thread_num: 4
http_options:
listen_ip: 127.0.0.1
listen_port: 50081
log:
core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
backends:
- type: console
executor:
executors:
- name: work_thread_pool
type: asio_thread
options:
thread_num: 2
channel:
backends:
- type: http
options:
pub_topics_options:
- topic_name: "(.*)"
server_url_list: ["127.0.0.1:50080"]
pub_topics_options:
- topic_name: "(.*)"
enable_backends: [http]
module:
pkgs:
- path: ./libpb_chn_pub_pkg.so
enable_modules: [NormalPublisherModule]
modules:
- name: NormalPublisherModule
log_lvl: INFO

# Module custom configuration
NormalPublisherModule:
topic_name: test_topic_http
channel_frq: 0.5
Loading

0 comments on commit 18d45db

Please sign in to comment.