feat(node_framework): Add timeouts for remaining tasks to avoid hangouts (matter-labs#1354)

## What ❔
We need to wrap every task future into a tokio timeout when we are
shutting down.

## Why ❔
Right now, the remaining tasks are awaited without any timeout during shutdown,
so if some task doesn't check the stop signal, it can hang the node.
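
For illustration, here is a minimal standalone sketch of the idea (the constant value, task bodies, and names are made up for the example; this is not the framework code): once a task future is awaited through `tokio::time::timeout`, a task that ignores the stop signal can no longer block shutdown forever.

```rust
use std::time::Duration;

use tokio::{sync::watch, time::timeout};

// Illustrative value; the actual change uses a 30-second shutdown timeout.
const SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(200);

#[tokio::main]
async fn main() {
    let (stop_sender, mut stop_receiver) = watch::channel(false);

    // A well-behaved task: exits once the stop signal arrives.
    let polite = tokio::spawn(async move {
        let _ = stop_receiver.changed().await;
    });

    // A misbehaving task: never checks the stop signal and never finishes.
    let stubborn = tokio::spawn(async {
        std::future::pending::<()>().await;
    });

    // Send the stop signal, then await each task through a timeout instead of a bare `.await`.
    stop_sender.send(true).ok();
    assert!(timeout(SHUTDOWN_TIMEOUT, polite).await.is_ok());
    // Without the timeout, awaiting the stubborn task would hang shutdown indefinitely.
    assert!(timeout(SHUTDOWN_TIMEOUT, stubborn).await.is_err());
}
```

In the node framework itself the same wrapping is applied to every remaining task's join handle, as the diff below shows.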


## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
AnastasiiaVashchuk authored Mar 7, 2024
1 parent 1e431a6 commit 8108dbd
Showing 1 changed file with 26 additions and 9 deletions.
core/node/node_framework/src/service/mod.rs (35 changes: 26 additions & 9 deletions)
@@ -1,4 +1,4 @@
-use std::{collections::HashMap, fmt};
+use std::{collections::HashMap, fmt, time::Duration};
 
 use futures::{future::BoxFuture, FutureExt};
 use tokio::{runtime::Runtime, sync::watch};
@@ -15,6 +15,9 @@ mod stop_receiver;
 #[cfg(test)]
 mod tests;
 
+// A reasonable amount of time for any task to finish the shutdown process
+const TASK_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
+
 /// "Manager" class for a set of tasks. Collects all the resources and tasks,
 /// then runs tasks until completion.
 ///
@@ -145,30 +148,44 @@ impl ZkStackService {
             .collect();
 
         // Run the tasks until one of them exits.
-        // TODO (QIT-24): wrap every task into a timeout to prevent hanging.
-        let (resolved, idx, remaining) = self
+        let (resolved, resolved_idx, remaining) = self
             .runtime
             .block_on(futures::future::select_all(join_handles));
-        let task_name = tasks[idx].name.clone();
+        let resolved_task_name = tasks[resolved_idx].name.clone();
         let failure = match resolved {
             Ok(Ok(())) => {
-                tracing::info!("Task {task_name} completed");
+                tracing::info!("Task {resolved_task_name} completed");
                 false
             }
             Ok(Err(err)) => {
-                tracing::error!("Task {task_name} exited with an error: {err}");
+                tracing::error!("Task {resolved_task_name} exited with an error: {err}");
                 true
             }
             Err(_) => {
-                tracing::error!("Task {task_name} panicked");
+                tracing::error!("Task {resolved_task_name} panicked");
                 true
             }
         };
 
+        let remaining_tasks_with_timeout: Vec<_> = remaining
+            .into_iter()
+            .map(|task| async { tokio::time::timeout(TASK_SHUTDOWN_TIMEOUT, task).await })
+            .collect();
+
         // Send stop signal to remaining tasks and wait for them to finish.
         // Given that we are shutting down, we do not really care about returned values.
         self.stop_sender.send(true).ok();
-        self.runtime.block_on(futures::future::join_all(remaining));
+        let execution_results = self
+            .runtime
+            .block_on(futures::future::join_all(remaining_tasks_with_timeout));
+        let execution_timeouts_count = execution_results.iter().filter(|&r| r.is_err()).count();
+        if execution_timeouts_count > 0 {
+            tracing::warn!(
+                "{execution_timeouts_count} tasks didn't finish in {TASK_SHUTDOWN_TIMEOUT:?} and were dropped"
+            );
+        } else {
+            tracing::info!("Remaining tasks finished without reaching timeouts");
+        }
+
         // Call after_node_shutdown hooks.
         let local_set = tokio::task::LocalSet::new();
@@ -180,7 +197,7 @@ impl ZkStackService {
         local_set.block_on(&self.runtime, futures::future::join_all(join_handles));
 
         if failure {
-            anyhow::bail!("Task {task_name} failed");
+            anyhow::bail!("Task {resolved_task_name} failed");
         } else {
             Ok(())
         }

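For completeness, a hedged sketch of the aggregate pattern this commit introduces, shown in isolation: wrap each remaining join handle in a timeout, wait for all of them, and count the ones that did not finish in time. The timeout value and the dummy tasks are illustrative, not the framework's own; the sketch uses the `futures` crate for `join_all`, just like the framework code.

```rust
use std::time::Duration;

use tokio::time::timeout;

// Illustrative value; the framework constant is 30 seconds.
const TASK_SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(100);

#[tokio::main]
async fn main() {
    // Stand-ins for the join handles of tasks that are still running at shutdown.
    let remaining = vec![
        tokio::spawn(async { /* finishes immediately */ }),
        // Stands in for a task that ignores the stop signal.
        tokio::spawn(async { std::future::pending::<()>().await }),
    ];

    // Wrap every remaining task into a timeout, mirroring the change above.
    let remaining_tasks_with_timeout: Vec<_> = remaining
        .into_iter()
        .map(|task| async move { timeout(TASK_SHUTDOWN_TIMEOUT, task).await })
        .collect();

    let execution_results = futures::future::join_all(remaining_tasks_with_timeout).await;
    let execution_timeouts_count = execution_results.iter().filter(|r| r.is_err()).count();
    println!(
        "{} task(s) didn't finish in {:?} and were dropped",
        execution_timeouts_count, TASK_SHUTDOWN_TIMEOUT
    );
}
```

Note that `tokio::time::timeout` only stops waiting on the `JoinHandle`; dropping the handle detaches the spawned task rather than aborting it.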