Skip to content

Commit

Permalink
feat: zenoh plugin with shm (AimRT#96)
Browse files Browse the repository at this point in the history
* add zenoh-shm API

* use zenoh shm-api for channel and rpc

* set z_alloc_result as a local variable.

* add zenoh_buffer_array_allocator

* avoid copy opreation when pub data with shm

* if shm pool size is not enough, use net buffer instead

* add z_pub_shm_size_map_ to store topic-loan_size

* little fix

* remove client send data 's copying

* remove server send data 's copy

* add doc

* change benchamrk  item

* minor modification

---------

Co-authored-by: hanjun <hanjun@agibot.com>
  • Loading branch information
owny990312 and hanjun authored Nov 15, 2024
1 parent 5c0e63c commit 062ffad
Show file tree
Hide file tree
Showing 29 changed files with 1,378 additions and 97 deletions.
6 changes: 6 additions & 0 deletions cmake/GetZenoh.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ endif()
function(get_zenohc)
FetchContent_GetProperties(zenohc)
if(NOT zenohc_POPULATED)
set(ZENOHC_BUILD_WITH_UNSTABLE_API
TRUE
CACHE BOOL "Enable unstable API")
set(ZENOHC_BUILD_WITH_SHARED_MEMORY
TRUE
CACHE BOOL "Enable shared memory")
FetchContent_MakeAvailable(zenohc)
endif()
endfunction()
Expand Down
9 changes: 5 additions & 4 deletions document/sphinx-cn/release_notes/v0_9_0.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

**重要修改**
- 优化了 zenoh 插件:
- 更新 zenohc 库至 1.0.0.11 版本;
- 添加了 zenoh rpc 后端;
- 现在可以传入 zenoh 原生配置;
- zenoh 插件支持网络通信和共享内存两种通信方式;
- 可以传入 zenoh 原生配置进行更丰富的配置;
- 新增了第三方库 asio,runtime::core 不再引用 boost,改为引用独立的 asio 库,以减轻依赖;
- 新增 aimrt_cli trans 命令,用于将 使用 aimrt record_playback 插件录制的 bag 文件转换为 ros2 的 bag 文件;
- 新增 Echo 插件,用于回显消息;
Expand All @@ -27,6 +27,7 @@
- 支持日志自定义输出格式;
- 支持日志定期主动落盘操作;
- grpc 插件支持 ros2 消息以及 json 序列化格式;
- mqtt 新增配置项以支持 ssl/tls 加密传输;
- mqtt 插件在broker未启动时,会自动重试异步连接;
- mqtt 新增配置项以支持 ssl/tls 单向认证/双向认证的加密传输;
- mqtt 插件在broker未启动时,会自动重试异步连接, 并提供重连间隔配置项;
- ros2 插件支持自定义 rpc 服务名称;
- asio thread/strand 执行器现在支持是否使用 system clock;
56 changes: 45 additions & 11 deletions document/sphinx-cn/tutorials/plugins/zenoh_plugin.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,26 @@
- `服务发现`机制的通信系统;
- 灵活的网络拓扑结构;
- 低延迟、高吞吐量的网络通信和数据传输;

- SHM 和 非 SHM 两种传输模式;
-
此插件为 AimRT 提供以下组件:
- `zenoh` 类型 Rpc 后端
- `zenoh` 类型 Channel 后端



插件的配置项如下:

| 节点 | 类型 | 是否可选 | 默认值 | 作用 |
| :-------------: | :----: | :------: | :----: | :-------------------------: |
| native_cfg_path | string | 可选 | "" | 使用zenoh提供的原生配置文件 |
| limit_domain | string | 可选 | "" | 对插件的通信域进行限制 |
| 节点 | 类型 | 是否可选 | 默认值 | 作用 |
| :----------------: | :----: | :------: | :----: | :-----------------------------: |
| shm_pool_size | int | 可选 | 10 MB | 共享内存池的大小 , 单位:B |
| shm_init_loan_size | int | 可选 | 1 KB | 共享内存初始借用大小 , 单位:B |
| native_cfg_path | string | 可选 | "" | 使用zenoh提供的原生配置文件 |
| limit_domain | string | 可选 | "" | 对插件的通信域进行限制 |


关于**zenoh_plugin**的配置,使用注意点如下:
- `shm_pool_size` 表示共享内存池的大小,单位:B,默认值是 10 MB,可以根据实际情况调整, 如果不使用共享内存,可忽略该配置项。 如果剩余共享内存不足以满足数据传输需求,则会自动切换到非共享内存传输模式 。
- `shm_init_loan_size` 表示向共享内存池初始借用大小,单位:B,默认值是 1 KB,可以根据实际情况调整, 如果不使用共享内存,可忽略该配置项。
- `native_cfg_path` 表示 zenoh 提供的原生配置文件的路径,可以通过配置该文件来灵活配置 zenoh 的网络结构。如果不填写则默认使用 zenoh 官方提供的默认配置,具体配置内容请参考 zenoh 官方关于[configuration](https://zenoh.io/docs/manual/configuration/)的说明,您也可以直接修改 {{ '[zenoh_native_config.json5]({}/src/examples/plugins/zenoh_plugin/install/linux/bin/cfg/zenoh_native_config.json5)'.format(code_site_root_path_url) }}文件来自定义配置,这里列举几个常用的配置项:

| 配置项 | 作用 | zenoh_native_config中的配置值 |
Expand Down Expand Up @@ -67,9 +72,17 @@ aimrt:
## zenoh 类型 Rpc 后端
`zenoh`类型的 Rpc后端是**zenoh_plugin**中提供的一种 Rpc 后端,主要用来构建请求-响应模型。其所有的配置项如下:

| 节点 | 类型 | 是否可选 | 默认值 | 作用 |
| ---------------- | ------ | -------- | ------ | ------------------------------------ |
| timeout_executor | string | 可选 | "" | Client 端发起 RPC 超时情况下的执行器 |
| 节点 | 类型 | 是否可选 | 默认值 | 作用 |
| ------------------------------ | ------ | -------- | ------ | ------------------------------------ |
| timeout_executor | string | 可选 | "" | Client 端发起 RPC 超时情况下的执行器 |
| clients_options | array | 可选 | [] | Client 端发起 RPC 请求时的规则 |
| clients_options[i].func_name | string | 必选 | "" | RPC Func 名称,支持正则表达式 |
| clients_options[i].shm_enabled | bool | 必选 | false | RPC Func 是否使用共享内存通信 |
| servers_options | array | 可选 | [] | 服务端处理 RPC 请求时的规则 |
| servers_options[i].func_name | string | 必选 | "" | RPC Func 名称,支持正则表达式 |
| servers_options[i].shm_enabled | bool | 必选 | false | RPC Func 是否使用共享内存通信 |

注意: zenoh 支持 SHM 和 非 SHM 的自动转换, 即如果数据离开其所在的SHM 域,则自动切换到非 SHM 通信。 例如,如果节点 A 和 节点 B 都设置的共享内存,但其不再同一机器上,仍可以进行通信,因为数据会自动切换到非共享内存的传输模式。

以下是一个简单的客户端的示例:

Expand All @@ -81,6 +94,7 @@ aimrt:
path: ./libaimrt_zenoh_plugin.so
options:
native_cfg_path: ./cfg/zenoh_native_config.json5
shm_pool_size: 10240
executor:
executors:
- name: timeout_handle
Expand All @@ -92,6 +106,9 @@ aimrt:
- type: zenoh
options:
timeout_executor: timeout_handle
clients_options:
- func_name: "(.*)"
shm_enabled: false
clients_options:
- func_name: "(.*)"
enable_backends: [zenoh]
Expand All @@ -112,6 +129,11 @@ aimrt:
rpc:
backends:
- type: zenoh
options:
timeout_executor: timeout_handle
servers_options:
- func_name: "(.*)"
shm_enabled: true
servers_options:
- func_name: "(.*)"
enable_backends: [zenoh]
Expand Down Expand Up @@ -175,7 +197,15 @@ Server -> Client 的 Zenoh 数据包格式整体分 4 段:

## zenoh 类型 Channel 后端

`zenoh`类型的 Channel 后端是**zenoh_plugin**中提供的一种Channel后端,主要用来构建发布和订阅模型。
`zenoh`类型的 Channel 后端是**zenoh_plugin**中提供的一种Channel后端,主要用来构建发布和订阅模型。其所有的配置项如下:

| 节点 | 类型 | 是否可选 | 默认值 | 作用 |
| --------------------------------- | ------ | -------- | ------ | ------------------------------- |
| pub_topics_options | array | 可选 | [] | 发布 Topic 时的规则 |
| pub_topics_options[i].topic_name | string | 必选 | "" | Topic 名称,支持正则表达式 |
| pub_topics_options[i].shm_enabled | bool | 必选 | false | Publish 端 是否使用共享内存通信 |

注意: zenoh 支持 SHM 和 非 SHM 的自动转换, 即如果数据离开其所在的SHM 域,则自动切换到非 SHM 通信。 例如,如果节点 A 和 节点 B 都设置的共享内存,但其不再同一机器上,仍可以进行通信,因为数据会自动切换到非共享内存的传输模式。

以下是一个简单的发布端的示例:
```yaml
Expand All @@ -185,10 +215,14 @@ aimrt:
- name: zenoh_plugin
path: ./libaimrt_zenoh_plugin.so
options:
native_cfg_path: ./cfg/zenoh_native_config.json5
shm_pool_size: 1024
channel:
backends:
- type: zenoh
options:
pub_topics_options:
- topic_name: "(.*)"
shm_enabled: false
pub_topics_options:
- topic_name: "(.*)"
enable_backends: [zenoh]
Expand Down
114 changes: 113 additions & 1 deletion src/examples/plugins/zenoh_plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,116 @@


说明:
- 本示例与 **protobuf rpc** 示例基本一致,除了业务层使用的是 ros2 msg 形式的协议;
- 本示例与 **protobuf rpc** 示例基本一致,除了业务层使用的是 ros2 msg 形式的协议;

## protobuf channel with shared-memory

一个基于 protobuf 协议与 zenoh 后端的 channel 示例,演示内容包括:
- 如何在配置文件中加载**zenoh_plugin**
- 如何使用 zenoh 类型的 channel 后端;
- 如何使用共享内存进行通信;


核心代码:
- [event.proto](../../../protocols/example/event.proto)
- [normal_publisher_module.cc](../../cpp/pb_chn/module/normal_publisher_module/normal_publisher_module.cc)
- [normal_subscriber_module.cc](../../cpp/pb_chn/module/normal_subscriber_module/normal_subscriber_module.cc)


配置文件:
- [examples_plugins_zenoh_plugin_pb_chn_pub_with_shm_cfg.yaml](./install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_pub_with_shm_cfg.yaml)
- [examples_plugins_zenoh_plugin_pb_chn_sub_with_shm_cfg.yaml](./install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_sub_with_shm_cfg.yaml)


运行方式(linux):
- 开启 `AIMRT_BUILD_EXAMPLES``AIMRT_BUILD_ZENOH_PLUGIN` 选项编译 AimRT(编译时需要提前准备好rust编译环境);
- 在终端输入 `ls -l /dev/shm` 值令 观察此时计算机上共享内存的使用情况, 此时zenoh 未启动,共享内存应该为空;
- 编译成功后,先运行 build 目录下`start_examples_plugins_zenoh_plugin_pb_chn_sub_with_shm.sh`脚本启动订阅端(sub 进程);
- 再开启一个新的终端窗口运行`start_examples_plugins_zenoh_plugin_pb_chn_pub_with_shm.sh`脚本启动发布端(pub 进程);
- 程序启动后在终端再次输入 `ls -l /dev/shm` 值令 观察此时计算机上共享内存的使用情况, 此时zenoh 已启动,可观察到 zenoh 占用的共享内存大小信息;
- 分别在两个终端键入`ctrl-c`停止对应进程;


说明:
- 此示例创建了以下两个模块:
- `NormalPublisherModule`:会基于 `work_thread_pool` 执行器,以配置的频率、向配置的 topic 中发布 `ExampleEventMsg` 类型的消息;
- `NormalSubscriberModule`:会订阅配置的 topic 下的 `ExampleEventMsg` 类型的消息;
- 此示例将 `NormalPublisherModule``NormalSubscriberModule` 分别集成到 `pb_chn_pub_pkg``pb_chn_sub_pkg` 两个 Pkg 中,并在两个配置文件中分别加载对应的 Pkg 到 pub 和 sub 进程中;
- 此示例加载了**zenoh_plugin**,并使用 zenoh 类型的 channel 后端进行通信;


## protobuf rpc with shared-memory

一个基于 protobuf 协议、协程型接口与 zenoh 后端的 rpc 示例,演示内容包括:
- 如何在配置文件中加载**zenoh_plugin**
- 如何使用 zenoh 类型的 rpc 后端;
- 如何使用共享内存进行通信;

核心代码:
- [rpc.proto](../../../protocols/example/rpc.proto)
- [normal_rpc_co_client_module.cc](../../cpp/pb_rpc/module/normal_rpc_co_client_module/normal_rpc_co_client_module.cc)
- [normal_rpc_co_server_module.cc](../../cpp/pb_rpc/module/normal_rpc_co_server_module/normal_rpc_co_server_module.cc)
- [service.cc](../../cpp/pb_rpc/module/normal_rpc_co_server_module/service.cc)


配置文件:
- [examples_plugins_zenoh_plugin_pb_rpc_client_with_shm_cfg.yaml](./install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_client_with_shm_cfg.yaml)
- [examples_plugins_zenoh_plugin_pb_rpc_server_with_shm_cfg.yaml](./install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_rpc_server_with_shm_cfg.yaml)


运行方式(linux):
- 开启 `AIMRT_BUILD_EXAMPLES``AIMRT_BUILD_ZENOH_PLUGIN` 选项编译 AimRT(编译时需要提前准备好rust编译环境);
- 在终端输入 `ls -l /dev/shm` 值令 观察此时计算机上共享内存的使用情况, 此时zenoh 未启动,共享内存应该为空;
- 编译成功后,先运行 build 目录下`start_examples_plugins_zenoh_plugin_pb_rpc_server_with_shm.sh`脚本启动订阅端(srv 进程);
- 再开启一个新的终端窗口运行`start_examples_plugins_zenoh_plugin_pb_rpc_client_with_shm.sh`脚本启动发布端(cli 进程);
- 程序启动后在终端再次输入 `ls -l /dev/shm` 值令 观察此时计算机上共享内存的使用情况, 此时zenoh 已启动,可观察到 zenoh 占用的共享内存大小信息;
- 分别在两个终端键入`ctrl-c`停止对应进程;

说明:
- 此示例创建了以下两个模块:
- `NormalRpcCoClientModule`:会基于 `work_thread_pool` 执行器,以配置的频率,通过协程 Client 接口,向 `ExampleService` 发起 RPC 请求;
- `NormalRpcCoServerModule`:会注册 `ExampleService` 服务端,通过协程 Server 接口,提供 echo 功能;
- 此示例在 Rpc Client 端和 Server 端分别注册了两个 Filter 用于打印请求日志和计算耗时;
- 此示例将 `NormalRpcCoClientModule``NormalRpcCoServerModule` 分别集成到 `pb_rpc_client_pkg``pb_rpc_server_pkg` 两个 Pkg 中,并在两个配置文件中分别加载对应的 Pkg 到 srv 和 cli 进程中;
- 此示例加载了**zenoh_plugin**,并使用 zenoh 类型的 rpc 后端进行通信,此外还在客户端配置了 `timeout_handle` 执行器作为超时执行器;


## protobuf channel with shared-memory and network

一个基于 protobuf 协议与 zenoh 后端的 channel 示例,演示内容包括:
- 如何在配置文件中加载**zenoh_plugin**
- 如何使用 zenoh 类型的 channel 后端;
- 如何使用共享内存和网络进行混合通信;


核心代码:
- [event.proto](../../../protocols/example/event.proto)
- [normal_publisher_module.cc](../../cpp/pb_chn/module/normal_publisher_module/normal_publisher_module.cc)
- [normal_subscriber_module.cc](../../cpp/pb_chn/module/normal_subscriber_module/normal_subscriber_module.cc)


配置文件:
- [examples_plugins_zenoh_plugin_pb_chn_pub_with_shm_cfg.yaml](./install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_pub_with_shm_cfg.yaml)
- [examples_plugins_zenoh_plugin_pb_chn_sub_cfg.yaml](./install/linux/bin/cfg/examples_plugins_zenoh_plugin_pb_chn_sub_cfg.yaml)


运行方式(linux):
- 开启 `AIMRT_BUILD_EXAMPLES``AIMRT_BUILD_ZENOH_PLUGIN` 选项编译 AimRT(编译时需要提前准备好rust编译环境);
- 在终端输入 `ls -l /dev/shm` 值令 观察此时计算机上共享内存的使用情况, 此时zenoh 未启动,共享内存应该为空;
- 编译成功后,先运行 build 目录下`start_examples_plugins_zenoh_plugin_pb_chn_sub.sh`脚本启动订阅端(sub 进程);
- 再开启一个新的终端窗口运行`start_examples_plugins_zenoh_plugin_pb_chn_pub_with_shm.sh`脚本启动发布端(pub 进程);
- 程序启动后在终端再次输入 `ls -l /dev/shm` 值令 观察此时计算机上共享内存的使用情况, 此时zenoh 已启动,可观察到 zenoh 占用的共享内存大小信息;
- 分别在两个终端键入`ctrl-c`停止对应进程;


说明:
- 此示例创建了以下两个模块:
- `NormalPublisherModule`:会基于 `work_thread_pool` 执行器,以配置的频率、向配置的 topic 中发布 `ExampleEventMsg` 类型的消息;
- `NormalSubscriberModule`:会订阅配置的 topic 下的 `ExampleEventMsg` 类型的消息;
- 此示例将 `NormalPublisherModule``NormalSubscriberModule` 分别集成到 `pb_chn_pub_pkg``pb_chn_sub_pkg` 两个 Pkg 中,并在两个配置文件中分别加载对应的 Pkg 到 pub 和 sub 进程中;
- 此示例加载了**zenoh_plugin**,并使用 zenoh 类型的 channel 后端进行通信;
- zenoh 插件配置的共享内存数据一旦超过其作用范围,就会自动切换到网络传输模式。 用户可尝试分别在一台主机和两台主机分别执行上述步骤,观察到不同主机间的通信效果:
- 在一台主机上运行的发布端和订阅端,可观察到共享内存的通信效果;
- 在两台主机上运行的发布端和订阅端,可观察到网络的通信效果, 尽管发布端使用的共享内存方案;


Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.

aimrt:
plugin:
plugins:
- name: zenoh_plugin
path: ./libaimrt_zenoh_plugin.so
options:
native_cfg_path: ./cfg/zenoh_native_config.json5
shm_pool_size: 10240000
log:
core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
backends:
- type: console
executor:
executors:
- name: publish_control_executor
type: simple_thread
- name: publish_executor_0
type: simple_thread
- name: publish_executor_1
type: simple_thread
- name: publish_executor_2
type: simple_thread
- name: publish_executor_3
type: simple_thread
channel:
backends:
- type: zenoh
options:
pub_topics_options:
- topic_name: "(.*)"
shm_enabled: true
pub_topics_options:
- topic_name: "(.*)"
enable_backends: [zenoh]
module:
pkgs:
- path: ./libpb_chn_pub_pkg.so
enable_modules: [BenchmarkPublisherModule]
modules:
- name: BenchmarkPublisherModule
log_lvl: INFO

# Module custom configuration
BenchmarkPublisherModule:
max_topic_number: 4
bench_plans:
- channel_frq: 1000
msg_size: 512
topic_number: 4
msg_count: 5000
- channel_frq: 1000
msg_size: 4096
topic_number: 1
msg_count: 1000
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.

aimrt:
plugin:
plugins:
- name: zenoh_plugin
path: ./libaimrt_zenoh_plugin.so
options:
native_cfg_path: ./cfg/zenoh_native_config.json5
log:
core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
backends:
- type: console
executor:
channel:
backends:
- type: zenoh
sub_topics_options:
- topic_name: "(.*)"
enable_backends: [zenoh]
module:
pkgs:
- path: ./libpb_chn_sub_pkg.so
enable_modules: [BenchmarkSubscriberModule]
modules:
- name: BenchmarkSubscriberModule
log_lvl: INFO

# Module custom configuration
BenchmarkSubscriberModule:
max_topic_number: 4
Loading

0 comments on commit 062ffad

Please sign in to comment.