Skip to content

Commit

Permalink
fix: MetaGrpcClient deadlock when drop
Browse files Browse the repository at this point in the history
Move the reference to the dedicated runtime `rt` from `MetaGrpcClient`
to `ClientHandle`.

`rt` is a reference to the dedicated runtime for running
`MetaGrpcClient`.

If all ClientHandle are dropped, the runtime will be destroyed.

This `rt` previously is stored in `MetaGrpcClient`, which leads to a deadlock:
- When all `ClientHandle` are dropped, the two workers `worker_loop()` and `auto_sync_interval()`
  will quit.
- These two futures both held a reference to `MetaGrpcClient`.
- The last of these(say, `F`) two will drop `MetaGrpcClient.rt` and `Runtime::_dropper`
  will block waiting for the runtime to shut down.
- But `F` is still held, deadlock occurs.

Other changes:

- `Runtime::try_spawn` and several other spawn methods now accept a name
  argument for display in async backtrace.

- Add async-backtrace to `MetaGrpcClient` methods
  • Loading branch information
drmingdrmer committed Oct 30, 2024
1 parent ee8fb25 commit cab7489
Show file tree
Hide file tree
Showing 12 changed files with 329 additions and 199 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/base/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub use runtime::execute_futures_in_parallel;
pub use runtime::spawn;
pub use runtime::spawn_blocking;
pub use runtime::spawn_local;
pub use runtime::spawn_named;
pub use runtime::try_block_on;
pub use runtime::try_spawn_blocking;
pub use runtime::Dropper;
Expand Down
95 changes: 73 additions & 22 deletions src/common/base/src/runtime/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ use databend_common_exception::Result;
use databend_common_exception::ResultExt;
use futures::future;
use futures::FutureExt;
use log::info;
use log::warn;
use tokio::runtime::Builder;
use tokio::runtime::Handle;
use tokio::sync::oneshot;
use tokio::sync::OwnedSemaphorePermit;
use tokio::sync::Semaphore;

// use tokio::task::JoinHandle;
use crate::runtime::catch_unwind::CatchUnwindFuture;
use crate::runtime::drop_guard;
use crate::runtime::memory::MemStat;
Expand Down Expand Up @@ -88,7 +88,7 @@ pub trait TrySpawn {
///
/// It allows to return an error before spawning the task.
#[track_caller]
fn try_spawn<T>(&self, task: T) -> Result<JoinHandle<T::Output>>
fn try_spawn<T>(&self, task: T, name: Option<String>) -> Result<JoinHandle<T::Output>>
where
T: Future + Send + 'static,
T::Output: Send + 'static;
Expand All @@ -102,18 +102,18 @@ pub trait TrySpawn {
T: Future + Send + 'static,
T::Output: Send + 'static,
{
self.try_spawn(task).unwrap()
self.try_spawn(task, None).unwrap()
}
}

impl<S: TrySpawn> TrySpawn for Arc<S> {
#[track_caller]
fn try_spawn<T>(&self, task: T) -> Result<JoinHandle<T::Output>>
fn try_spawn<T>(&self, task: T, name: Option<String>) -> Result<JoinHandle<T::Output>>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
self.as_ref().try_spawn(task)
self.as_ref().try_spawn(task, name)
}

#[track_caller]
Expand Down Expand Up @@ -149,10 +149,14 @@ impl Runtime {

let handle = runtime.handle().clone();

let n = name.clone();
// Block the runtime to shutdown.
let join_handler = Thread::spawn(move || {
// We ignore channel is closed.
let _ = runtime.block_on(recv_stop);
let _ = recv_stop.blocking_recv();
info!(
"Runtime({:?}) received shutdown signal, start to shut down",
n
);

match !cfg!(debug_assertions) {
true => false,
Expand Down Expand Up @@ -257,7 +261,11 @@ impl Runtime {
#[allow(clippy::disallowed_methods)]
tokio::task::block_in_place(|| {
self.handle
.block_on(location_future(future, std::panic::Location::caller()))
.block_on(location_future(
future,
std::panic::Location::caller(),
None,
))
.with_context(|| "failed to block on future".to_string())
.flatten()
})
Expand Down Expand Up @@ -348,20 +356,28 @@ impl Runtime {

impl TrySpawn for Runtime {
#[track_caller]
fn try_spawn<T>(&self, task: T) -> Result<JoinHandle<T::Output>>
fn try_spawn<T>(&self, task: T, name: Option<String>) -> Result<JoinHandle<T::Output>>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let task = ThreadTracker::tracking_future(task);
let task = match ThreadTracker::query_id() {
None => async_backtrace::location!(String::from(GLOBAL_TASK_DESC)).frame(task),
Some(query_id) => {
async_backtrace::location!(format!("Running query {} spawn task", query_id))
.frame(task)

let location_name = {
if let Some(name) = name {
name
} else {
match ThreadTracker::query_id() {
None => String::from(GLOBAL_TASK_DESC),
Some(query_id) => {
format!("Running query {} spawn task", query_id)
}
}
}
};

let task = async_backtrace::location!(location_name).frame(task);

#[expect(clippy::disallowed_methods)]
Ok(JoinHandle::create(self.handle.spawn(task)))
}
Expand All @@ -380,6 +396,7 @@ impl Drop for Dropper {
// Send a signal to say i am dropping.
if let Some(close_sender) = self.close.take() {
if close_sender.send(()).is_ok() {
info!("close_sender to shutdown Runtime is sent");
match self.join_handler.take().unwrap().join() {
Err(e) => warn!("Runtime dropper panic, {:?}", e),
Ok(true) => {
Expand Down Expand Up @@ -436,7 +453,25 @@ where
F::Output: Send + 'static,
{
#[expect(clippy::disallowed_methods)]
tokio::spawn(location_future(future, std::panic::Location::caller()))
tokio::spawn(location_future(
future,
std::panic::Location::caller(),
None,
))
}

#[track_caller]
pub fn spawn_named<F>(future: F, name: String) -> tokio::task::JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
#[expect(clippy::disallowed_methods)]
tokio::spawn(location_future(
future,
std::panic::Location::caller(),
Some(name),
))
}

#[track_caller]
Expand All @@ -446,7 +481,11 @@ where
F::Output: Send + 'static,
{
#[expect(clippy::disallowed_methods)]
tokio::task::spawn_local(location_future(future, std::panic::Location::caller()))
tokio::task::spawn_local(location_future(
future,
std::panic::Location::caller(),
None,
))
}

#[track_caller]
Expand Down Expand Up @@ -476,8 +515,11 @@ where
pub fn block_on<F: Future>(future: F) -> F::Output {
#[expect(clippy::disallowed_methods)]
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current()
.block_on(location_future(future, std::panic::Location::caller()))
tokio::runtime::Handle::current().block_on(location_future(
future,
std::panic::Location::caller(),
None,
))
})
}

Expand All @@ -487,14 +529,19 @@ pub fn try_block_on<F: Future>(future: F) -> std::result::Result<F::Output, F> {
Err(_) => Err(future),
#[expect(clippy::disallowed_methods)]
Ok(handler) => Ok(tokio::task::block_in_place(|| {
handler.block_on(location_future(future, std::panic::Location::caller()))
handler.block_on(location_future(
future,
std::panic::Location::caller(),
None,
))
})),
}
}

fn location_future<F>(
future: F,
frame_location: &'static Location,
frame_name: Option<String>,
) -> impl Future<Output = F::Output>
where
F: Future,
Expand All @@ -506,9 +553,13 @@ where
// TODO: tracking payload
let future = ThreadTracker::tracking_future(future);

let frame_name = std::any::type_name::<F>()
.trim_end_matches("::{{closure}}")
.to_string();
let frame_name = if let Some(n) = frame_name {
n
} else {
std::any::type_name::<F>()
.trim_end_matches("::{{closure}}")
.to_string()
};

async_backtrace::location!(
frame_name,
Expand Down
1 change: 1 addition & 0 deletions src/meta/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ test = true

[dependencies]
anyerror = { workspace = true }
async-backtrace = { workspace = true }
databend-common-arrow = { workspace = true }
databend-common-base = { workspace = true }
databend-common-grpc = { workspace = true }
Expand Down
9 changes: 9 additions & 0 deletions src/meta/client/src/established_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,15 @@ impl EstablishedClient {
self.error.lock().take()
}

#[async_backtrace::framed]
pub async fn kv_api(
&mut self,
request: impl tonic::IntoRequest<RaftRequest>,
) -> Result<Response<RaftReply>, Status> {
self.client.kv_api(request).await.update_client(self)
}

#[async_backtrace::framed]
pub async fn kv_read_v1(
&mut self,
request: impl tonic::IntoRequest<RaftRequest>,
Expand All @@ -162,41 +164,47 @@ impl EstablishedClient {
resp.update_client(self)
}

#[async_backtrace::framed]
pub async fn export(
&mut self,
request: impl tonic::IntoRequest<Empty>,
) -> Result<Response<Streaming<ExportedChunk>>, Status> {
self.client.export(request).await.update_client(self)
}

#[async_backtrace::framed]
pub async fn export_v1(
&mut self,
request: impl tonic::IntoRequest<pb::ExportRequest>,
) -> Result<Response<Streaming<ExportedChunk>>, Status> {
self.client.export_v1(request).await.update_client(self)
}

#[async_backtrace::framed]
pub async fn watch(
&mut self,
request: impl tonic::IntoRequest<WatchRequest>,
) -> Result<Response<Streaming<WatchResponse>>, Status> {
self.client.watch(request).await.update_client(self)
}

#[async_backtrace::framed]
pub async fn transaction(
&mut self,
request: impl tonic::IntoRequest<TxnRequest>,
) -> Result<Response<TxnReply>, Status> {
self.client.transaction(request).await.update_client(self)
}

#[async_backtrace::framed]
pub async fn member_list(
&mut self,
request: impl tonic::IntoRequest<MemberListRequest>,
) -> Result<Response<MemberListReply>, Status> {
self.client.member_list(request).await.update_client(self)
}

#[async_backtrace::framed]
pub async fn get_cluster_status(
&mut self,
request: impl tonic::IntoRequest<Empty>,
Expand All @@ -207,6 +215,7 @@ impl EstablishedClient {
.update_client(self)
}

#[async_backtrace::framed]
pub async fn get_client_info(
&mut self,
request: impl tonic::IntoRequest<Empty>,
Expand Down
Loading

0 comments on commit cab7489

Please sign in to comment.