Skip to content

Commit

Permalink
feat: Asynchronous connection to mqtt broker (AimRT#85)
Browse files Browse the repository at this point in the history
* feat: Asynchronous connection to mqtt  broker

* add new option: client_key_password

* add example for mqtt plugin with ssl/tls

* add new info at release notes

* Make minor formatting adjustments.

* Improve  mqtt_plugin's README documentation

---------

Co-authored-by: hanjun <hanjun@agibot.com>
  • Loading branch information
owny990312 and hanjun authored Nov 8, 2024
1 parent e013333 commit 5b01f4e
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 50 deletions.
3 changes: 2 additions & 1 deletion document/sphinx-cn/release_notes/v0_9_0.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
- 更新 zenohc 库至 1.0.0.11 版本;
- 添加了 zenoh rpc 后端;
- 现在可以传入 zenoh 原生配置;
- mqtt 新增配置项以支持加密传输;
- 新增了第三方库 asio,runtime::core 不再引用 boost,改为引用独立的 asio 库,以减轻依赖;
- 新增 aimrt_cli trans 命令,用于将 使用 aimrt record_playback 插件录制的 bag 文件转换为 ros2 的 bag 文件;
- 新增 Echo 插件,用于回显消息;
Expand All @@ -27,3 +26,5 @@
- 删除一些未使用的协议;
- 支持日志自定义输出格式;
- grpc 插件支持 ros2 消息以及 json 序列化格式;
- mqtt 新增配置项以支持 ssl/tls 加密传输;
- mqtt 插件在broker未启动时,会自动重试异步连接;
21 changes: 13 additions & 8 deletions document/sphinx-cn/tutorials/plugins/mqtt_plugin.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,24 @@

插件的配置项如下:

| 节点 | 类型 | 是否可选 | 默认值 | 作用 |
| -------------- | ------ | -------- | ------ | ----------------------- |
| broker_addr | string | 必选 | "" | mqtt broker 的地址 |
| client_id | string | 必选 | "" | 本节点的 mqtt client id |
| max_pkg_size_k | int | 可选 | 1024 | 最大包尺寸,单位:KB |
| truststore | string | 可选 | "" | broker的CA证书路径 |

| 节点 | 类型 | 是否可选 | 默认值 | 作用 |
| ------------------- | ------ | -------- | ------ | ----------------------- |
| broker_addr | string | 必选 | "" | mqtt broker 的地址 |
| client_id | string | 必选 | "" | 本节点的 mqtt client id |
| max_pkg_size_k | int | 可选 | 1024 | 最大包尺寸,单位:KB |
| truststore | string | 可选 | "" | CA证书路径 |
| client_cert | string | 可选 | "" | 客户端证书路径 |
| client_key | string | 可选 | "" | 客户端私钥路径 |
| client_key_password | string | 可选 | "" | 客户端私钥设置的密码 |

关于**mqtt_plugin**的配置,使用注意点如下:
- `broker_addr`表示 mqtt broker 的地址,使用者必须保证有 mqtt 的 broker 运行在该地址,否则启动会失败。
- `client_id`表示本节点连接 mqtt broker 时的 client id。
- `max_pkg_size_k`表示传输数据时的最大包尺寸,默认 1 MB。注意,必须 broker 也要支持该尺寸才行。
- `truststore`表示 broker 的 CA 证书路径,例如`/etc/emqx/certs/cacert.pem` 。当`broker_addr`的协议被配置为`ssl`或者`mqtts`时,该选项生效,用于指定 CA 证书路径,否则自动忽略该选项。
- `truststore`表示 broker 的 CA 证书路径,例如`/etc/emqx/certs/cacert.pem` 。当`broker_addr`的协议被配置为`ssl`或者`mqtts`时,该选项生效,用于指定 CA 证书路径,否则自动忽略该选项, 请注意若只配置该选项则视为单向认证。
- `client_cert`表示客户端证书路径,例如`/etc/emqx/certs/client-cert.pem`。当需要双向认证时使用,与`client_key`配合使用。如果 broker_addr 使用非加密协议,该选项将被忽略。
- `client_key`表示客户端私钥路径,例如`/etc/emqx/certs/client-key.pem`。当需要双向认证时使用,与`client_cert`配合使用。如果 broker_addr 使用非加密协议,该选项将被忽略。
- `client_key_password`表示客户端私钥设置的密码,如果私钥设置了密码,则需要设置该选项。如果 broker_addr 使用非加密协议,该选项将被忽略。

**mqtt_plugin**插件基于[paho.mqtt.c](https://github.com/eclipse/paho.mqtt.c)封装,在使用时,Channel 订阅回调、RPC Server 处理方法、RPC Client 返回时,使用的都是**paho.mqtt.c**提供的线程,当使用者在回调中阻塞了线程时,有可能导致无法继续接收/发送消息。正如 Module 接口文档中所述,一般来说,如果回调中的任务非常轻量,那就可以直接在回调里处理;但如果回调中的任务比较重,那最好调度到其他专门执行任务的执行器里处理。

Expand Down
77 changes: 77 additions & 0 deletions src/examples/plugins/mqtt_plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,80 @@

说明:
- 本示例与 **protobuf rpc** 示例基本一致,除了业务层使用的是 ros2 msg 形式的协议;


## protobuf channel with SSL/TLS

一个基于 protobuf 协议与 mqtt 后端的 channel 示例,演示内容包括:
- 如何在配置文件中加载**mqtt_plugin**
- 如何使用 mqtt 类型的 channel 后端;
- 如何配置 SSL/TLS 加密;



核心代码:
- [event.proto](../../../protocols/example/event.proto)
- [normal_publisher_module.cc](../../cpp/pb_chn/module/normal_publisher_module/normal_publisher_module.cc)
- [normal_subscriber_module.cc](../../cpp/pb_chn/module/normal_subscriber_module/normal_subscriber_module.cc)


配置文件:
- [examples_plugins_mqtt_plugin_pb_chn_pub_with_ssl_cfg.yaml](./install/linux/bin/cfg/examples_plugins_mqtt_plugin_pb_chn_pub_with_ssl_cfg.yaml)
- [examples_plugins_mqtt_plugin_pb_chn_sub_with_ssl_cfg.yaml](./install/linux/bin/cfg/examples_plugins_mqtt_plugin_pb_chn_sub_with_ssl_cfg.yaml)

运行方式(linux):
- 开启 `AIMRT_BUILD_EXAMPLES``AIMRT_BUILD_MQTT_PLUGIN` 选项编译 AimRT;
- 生成加密通信所需的证书和密钥文件(主要需要的是ca_crt.pem、server_crt.pem、server_key.pem、client_crt.pem、client_key.pem 这五个文件),以下是一个简单的生成示例:

```shell
# 生成 CA 私钥
openssl genpkey -algorithm RSA -out ca_key.pem

# 生成 CA 自签名证书
openssl req -x509 -new -key ca_key.pem -sha256 -days 3650 -out ca_crt.pem
```

```shell
# 生成服务器私钥
openssl genpkey -algorithm RSA -out server_key.pem

# 生成服务器证书签名请求 (CSR)
openssl req -new -key server_key.pem -out server_csr.pem

# 使用 CA 签署服务器证书
openssl x509 -req -in server_csr.pem -CA ca_crt.pem -CAkey ca_key.pem -CAcreateserial -out server_crt.pem -days 365 -sha256
```

```shell
# 生成客户端私钥
openssl genpkey -algorithm RSA -out client_key.pem

# 生成客户端证书签名请求 (CSR)
openssl req -new -key client_key.pem -out client_csr.pem

# 使用 CA 签署客户端证书
openssl x509 -req -in client_csr.pem -CA ca_crt.pem -CAkey ca_key.pem -CAcreateserial -out client_crt.pem -days 365 -sha256
```


```shell
# [可选] 使用 OpenSSL 的 openssl pkcs8 命令将 client 的私钥加密
openssl pkcs8 -topk8 -inform PEM -outform PEM -in client_key.pem -out client_key_encrypted.pem -v2 aes-256-cbc
```

- 将生成的 `ca_crt.pem``server_crt.pem``server_key.pem` 文件的路径复制到 broker 配置文件中对应的位置, 并配置地址(默认 0.0.0.0:8883),以及是否需要双向认证(默认不开启);
- 将生成的 `ca_crt.pem``client_crt.pem``client_key.pem` 文件的路径依次复制到客户端配置文件中对应的 `truststore``client_cert``client_key`, 如果设置客户端私钥文件被加密,则将设置的密码配置在`client_key_password`中;
- 在本地启动一个 mqtt broker,也可以使用其他 IP 地址的 mqtt broker,但需要修改示例配置中的 `broker_addr`
- 在终端运行 build 目录下 `start_examples_plugins_mqtt_plugin_pb_chn_sub_with_ssl.sh` 脚本启动订阅端( sub 进程);
- 再开启一个新的终端窗口运行 `start_examples_plugins_mqtt_plugin_pb_chn_pub_with_ssl.sh` 脚本启动发布端( pub 进程);
- 分别在两个终端键入 `ctrl-c`停止对应进程;


说明:
- 此示例创建了以下两个模块:
- `NormalPublisherModule`:会基于 `work_thread_pool` 执行器,以配置的频率、向配置的 topic 中发布 `ExampleEventMsg` 类型的消息;
- `NormalSubscriberModule`:会订阅配置的 topic 下的 `ExampleEventMsg` 类型的消息;
- 此示例将 `NormalPublisherModule``NormalSubscriberModule` 分别集成到 `pb_chn_pub_pkg``pb_chn_sub_pkg` 两个 Pkg 中,并在两个配置文件中分别加载对应的 Pkg 到 pub 和 sub 进程中;
- 此示例加载了**mqtt_plugin**,并使用 mqtt 类型的 channel 后端进行通信,配置 `ssl://127.0.0.1:8883` 作为 broker 地址, 默认该端口用于 SSL/TLS 加密通信;

- `直接运行给定的示例不能正常工作,需要修改配置文件中证书和密钥的路径为实际生成位置`
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.

aimrt:
plugin:
plugins:
- name: mqtt_plugin
path: ./libaimrt_mqtt_plugin.so
options:
broker_addr: ssl://127.0.0.1:8883
client_id: example_pb_chn_publisher
max_pkg_size_k: 1024
truststore: /XX/YY/ZZ/cacert.pem # replace with your own truststore path
client_cert: /XX/YY/ZZ/client-cert.pem # replace with your own client certificate path
client_key: /XX/YY/ZZ/client-key.pem # replace with your own client key path
log:
core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
backends:
- type: console
executor:
executors:
- name: work_thread_pool
type: asio_thread
options:
thread_num: 2
channel:
backends:
- type: mqtt
pub_topics_options:
- topic_name: "(.*)"
enable_backends: [mqtt]
module:
pkgs:
- path: ./libpb_chn_pub_pkg.so
enable_modules: [NormalPublisherModule]
modules:
- name: NormalPublisherModule
log_lvl: INFO

# Module custom configuration
NormalPublisherModule:
topic_name: test_topic
channel_frq: 0.5
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.

aimrt:
plugin:
plugins:
- name: mqtt_plugin
path: ./libaimrt_mqtt_plugin.so
options:
broker_addr: ssl://127.0.0.1:8883
client_id: example_pb_chn_subscriber
max_pkg_size_k: 1024
truststore: /XX/YY/ZZ/cacert.pem # replace with your own truststore path
client_cert: /XX/YY/ZZ/client-cert.pem # replace with your own client certificate path
client_key: /XX/YY/ZZ/client-key.pem # replace with your own client key path
log:
core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Off
backends:
- type: console
executor:
channel:
backends:
- type: mqtt
sub_topics_options:
- topic_name: "(.*)"
enable_backends: [mqtt]
module:
pkgs:
- path: ./libpb_chn_sub_pkg.so
enable_modules: [NormalSubscriberModule]
modules:
- name: NormalSubscriberModule
log_lvl: INFO

# Module custom configuration
NormalSubscriberModule:
topic_name: test_topic
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash

./aimrt_main --cfg_file_path=./cfg/examples_plugins_mqtt_plugin_pb_chn_pub_with_ssl_cfg.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash

./aimrt_main --cfg_file_path=./cfg/examples_plugins_mqtt_plugin_pb_chn_sub_with_ssl_cfg.yaml
2 changes: 0 additions & 2 deletions src/plugins/mqtt_plugin/mqtt_channel_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ void MqttChannelBackend::Start() {
AIMRT_CHECK_ERROR_THROW(
std::atomic_exchange(&state_, State::kStart) == State::kInit,
"Method can only be called when state is 'Init'.");

SubscribeMqttTopic();
}

void MqttChannelBackend::Shutdown() {
Expand Down
Loading

0 comments on commit 5b01f4e

Please sign in to comment.