Skip to content

Commit

Permalink
fix: asio thread executor (AimRT#129)
Browse files Browse the repository at this point in the history
  • Loading branch information
wtudio authored Dec 14, 2024
1 parent 16c922f commit 0c7686b
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 56 deletions.
3 changes: 0 additions & 3 deletions document/sphinx-cn/tutorials/cfg/executor.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,13 @@ aimrt:
| thread_sched_policy | string | 可选 | "" | 线程调度策略 |
| thread_bind_cpu | unsigned int array | 可选 | [] | 绑核配置 |
| timeout_alarm_threshold_us | unsigned int | 可选 | 1000000 | 调度超时告警阈值,单位:微秒 |
| queue_threshold | unsigned int | 可选 | 10000 | 队列任务上限 |
| use_system_clock | bool | 可选 | false | 是否使用 std::system_clock,默认使用 std::steady_clock |


使用注意点如下:
- `thread_num`配置了线程数,默认为 1。当线程数配置为 1 时为线程安全执行器,否则是线程不安全的。
- `thread_sched_policy`和`thread_bind_cpu`参考[Common Information](./common.md)中线程绑核配置的说明。
- `timeout_alarm_threshold_us`配置了一个调度超时告警的阈值。当进行定时调度时,如果 CPU 负载太重、或队列中任务太多,导致超过设定的时间才调度到,则会打印一个告警日志。
- `queue_threshold`配置了队列任务上限,当已经有超过此阈值的任务在队列中时,新任务将投递失败。
- `use_system_clock`配置是否使用 std::system_clock 作为时间系统,默认为 false,使用 std::steady_clock。注意使用 std::system_clock 时,执行器的时间将与系统同步,可能会受到外部调节。

以下是一个简单的示例:
Expand All @@ -97,7 +95,6 @@ aimrt:
thread_sched_policy: SCHED_FIFO:80
thread_bind_cpu: [0, 1]
timeout_alarm_threshold_us: 1000
queue_threshold: 10000
use_system_clock: false
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ aimrt:
options:
thread_num: 2
- name: thread_safe_executor
type: asio_thread
type: asio_strand
options:
thread_num: 1
bind_asio_thread_executor_name: work_executor
- name: time_schedule_executor
type: asio_thread
options:
Expand Down
18 changes: 16 additions & 2 deletions src/runtime/core/executor/asio_strand_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,17 @@ void AsioStrandExecutor::Shutdown() {
}

void AsioStrandExecutor::Execute(aimrt::executor::Task&& task) noexcept {
if (state_.load() != State::kInit && state_.load() != State::kStart) [[unlikely]] {
fprintf(stderr,
"Asio strand executor '%s' can only execute task when state is 'Init' or 'Start'.\n",
name_.c_str());
return;
}

try {
asio::post(*strand_ptr_, std::move(task));
} catch (const std::exception& e) {
AIMRT_ERROR("{}", e.what());
fprintf(stderr, "Asio strand executor '%s' execute Task get exception: %s\n", name_.c_str(), e.what());
}
}

Expand All @@ -103,6 +110,13 @@ std::chrono::system_clock::time_point AsioStrandExecutor::Now() const noexcept {

void AsioStrandExecutor::ExecuteAt(
std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept {
if (state_.load() != State::kInit && state_.load() != State::kStart) [[unlikely]] {
fprintf(stderr,
"Asio strand executor '%s' can only execute task when state is 'Init' or 'Start'.\n",
name_.c_str());
return;
}

try {
if (!options_.use_system_clock) {
auto timer_ptr = std::make_shared<asio::steady_timer>(*strand_ptr_);
Expand Down Expand Up @@ -150,7 +164,7 @@ void AsioStrandExecutor::ExecuteAt(
});
}
} catch (const std::exception& e) {
AIMRT_ERROR("{}", e.what());
fprintf(stderr, "Asio strand executor '%s' execute Task get exception: %s\n", name_.c_str(), e.what());
}
}

Expand Down
42 changes: 1 addition & 41 deletions src/runtime/core/executor/asio_thread_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ struct convert<aimrt::runtime::core::executor::AsioThreadExecutor::Options> {
std::chrono::duration_cast<std::chrono::microseconds>(
rhs.timeout_alarm_threshold_us)
.count());
node["queue_threshold"] = rhs.queue_threshold;
node["use_system_clock"] = rhs.use_system_clock;

return node;
Expand All @@ -36,8 +35,6 @@ struct convert<aimrt::runtime::core::executor::AsioThreadExecutor::Options> {
if (node["timeout_alarm_threshold_us"])
rhs.timeout_alarm_threshold_us = std::chrono::microseconds(
node["timeout_alarm_threshold_us"].as<uint64_t>());
if (node["queue_threshold"])
rhs.queue_threshold = node["queue_threshold"].as<uint32_t>();
if (node["use_system_clock"])
rhs.use_system_clock = node["use_system_clock"].as<bool>();

Expand All @@ -62,9 +59,6 @@ void AsioThreadExecutor::Initialize(std::string_view name,
start_sys_tp_ = std::chrono::system_clock::now();
start_std_tp_ = std::chrono::steady_clock::now();

queue_threshold_ = options_.queue_threshold;
queue_warn_threshold_ = queue_threshold_ * 0.95;

AIMRT_CHECK_ERROR_THROW(
options_.thread_num > 0,
"Invalide asio thread executor options, thread num is zero.");
Expand Down Expand Up @@ -94,9 +88,7 @@ void AsioThreadExecutor::Initialize(std::string_view name,
}

try {
while (io_ptr_->run_one()) {
--queue_task_num_;
}
io_ptr_->run();
} catch (const std::exception& e) {
AIMRT_FATAL("Asio thread executor '{}' run loop get exception, {}",
Name(), e.what());
Expand Down Expand Up @@ -145,22 +137,6 @@ void AsioThreadExecutor::Execute(aimrt::executor::Task&& task) noexcept {
return;
}

uint32_t cur_queue_task_num = ++queue_task_num_;

if (cur_queue_task_num > queue_threshold_) [[unlikely]] {
fprintf(stderr,
"The number of tasks in the asio thread executor '%s' has reached the threshold '%u', the task will not be delivered.\n",
name_.c_str(), queue_threshold_);
--queue_task_num_;
return;
}

if (cur_queue_task_num > queue_warn_threshold_) [[unlikely]] {
fprintf(stderr,
"The number of tasks in the asio thread executor '%s' is about to reach the threshold: '%u / %u'.\n",
name_.c_str(), cur_queue_task_num, queue_threshold_);
}

try {
asio::post(*io_ptr_, std::move(task));
} catch (const std::exception& e) {
Expand All @@ -187,22 +163,6 @@ void AsioThreadExecutor::ExecuteAt(
return;
}

uint32_t cur_queue_task_num = ++queue_task_num_;

if (cur_queue_task_num > queue_threshold_) [[unlikely]] {
fprintf(stderr,
"The number of tasks in the asio thread executor '%s' has reached the threshold '%u', the task will not be delivered.\n",
name_.c_str(), queue_threshold_);
--queue_task_num_;
return;
}

if (cur_queue_task_num > queue_warn_threshold_) [[unlikely]] {
fprintf(stderr,
"The number of tasks in the asio thread executor '%s' is about to reach the threshold: '%u / %u'.\n",
name_.c_str(), cur_queue_task_num, queue_threshold_);
}

try {
if (!options_.use_system_clock) {
auto timer_ptr = std::make_shared<asio::steady_timer>(*io_ptr_);
Expand Down
7 changes: 0 additions & 7 deletions src/runtime/core/executor/asio_thread_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ class AsioThreadExecutor : public ExecutorBase {
std::string thread_sched_policy;
std::vector<uint32_t> thread_bind_cpu;
std::chrono::nanoseconds timeout_alarm_threshold_us = std::chrono::seconds(1);
uint32_t queue_threshold = 10000;
bool use_system_clock = false;
};

Expand Down Expand Up @@ -57,8 +56,6 @@ class AsioThreadExecutor : public ExecutorBase {
std::chrono::system_clock::time_point Now() const noexcept override;
void ExecuteAt(std::chrono::system_clock::time_point tp, aimrt::executor::Task&& task) noexcept override;

size_t CurrentTaskNum() noexcept override { return queue_task_num_.load(); }

State GetState() const { return state_.load(); }

void SetLogger(const std::shared_ptr<aimrt::common::util::LoggerWrapper>& logger_ptr) { logger_ptr_ = logger_ptr; }
Expand All @@ -75,10 +72,6 @@ class AsioThreadExecutor : public ExecutorBase {
std::chrono::system_clock::time_point start_sys_tp_;
std::chrono::steady_clock::time_point start_std_tp_;

uint32_t queue_threshold_;
uint32_t queue_warn_threshold_;
std::atomic_uint32_t queue_task_num_ = 0;

std::unique_ptr<asio::io_context> io_ptr_;
std::unique_ptr<
asio::executor_work_guard<asio::io_context::executor_type>>
Expand Down
4 changes: 3 additions & 1 deletion src/runtime/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ target_link_libraries(
aimrt::runtime::core)

if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang")
set_target_properties(${CUR_TARGET_NAME} PROPERTIES LINK_FLAGS "-s")
if(CMAKE_BUILD_TYPE STREQUAL "Release")
set_target_properties(${CUR_TARGET_NAME} PROPERTIES LINK_FLAGS "-s")
endif()
endif()

# Add -Werror option
Expand Down

0 comments on commit 0c7686b

Please sign in to comment.