Skip to content

Commit

Permalink
Turn info into a streaming call
Browse files Browse the repository at this point in the history
  • Loading branch information
bugadani committed Jan 19, 2025
1 parent b79eb83 commit d398450
Showing 1 changed file with 83 additions and 99 deletions.
182 changes: 83 additions & 99 deletions probe-rs-tools/src/bin/probe-rs/cmd/remote/client.rs
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ use axum::http::Uri;
use futures_util::StreamExt as _;
use postcard_rpc::{
header::{VarSeq, VarSeqKind},
host_client::{HostClient, HostErr, IoClosed, MultiSubscription},
host_client::{HostClient, HostErr, IoClosed, MultiSubRxError, MultiSubscription},
Topic,
};
use postcard_schema::Schema;
@@ -178,21 +178,34 @@ impl RpcClient {
}
}

async fn subscribe_multi<T: Topic>(
&self,
depth: usize,
) -> Result<MultiSubscription<T::Message>, IoClosed>
pub async fn publish<T: Topic>(&self, message: &T::Message) -> Result<(), IoClosed>
where
T::Message: DeserializeOwned,
T::Message: Serialize,
{
self.client.subscribe_multi::<T>(depth).await
self.client.publish::<T>(VarSeq::Seq2(0), message).await
}

pub async fn publish<T: Topic>(&self, message: &T::Message) -> Result<(), IoClosed>
async fn send_and_read_stream<E, T, R>(
&self,
req: &E::Request,
on_msg: impl FnMut(T::Message),
) -> anyhow::Result<R>
where
T::Message: Serialize,
E: postcard_rpc::Endpoint<Response = RpcResult<R>>,
E::Request: Serialize + Schema,
E::Response: DeserializeOwned + Schema,
T: Topic,
T::Message: DeserializeOwned,
{
self.client.publish::<T>(VarSeq::Seq2(0), message).await
let Ok(mut stream) = self.client.subscribe_multi::<T>(64).await else {
anyhow::bail!("Failed to subscribe to '{}': Connection closed", T::PATH);
};

tokio::select! {
biased;
_ = read_stream(&mut stream, on_msg) => anyhow::bail!("Topic reader returned unexpectedly"),
r = self.send_resp::<E, R>(req) => r,
}
}

pub async fn upload_file(&self, src_path: &Path) -> anyhow::Result<PathBuf> {
@@ -249,28 +262,10 @@ impl RpcClient {
pub async fn info(
&self,
request: TargetInfoRequest,
mut on_msg: impl FnMut(InfoEvent),
on_msg: impl FnMut(InfoEvent),
) -> anyhow::Result<()> {
let Ok(mut stream) = self.subscribe_multi::<TargetInfoDataTopic>(4).await else {
anyhow::bail!("Failed to subscribe: Connection closed");
};

let trigger = self.send_resp::<TargetInfoEndpoint, _>(&request);

let read_stream = async {
while let Ok(message) = stream.recv().await {
on_msg(message);
}
tracing::warn!("Failed to read topic");
futures_util::future::pending().await
};
tokio::select! {
biased;
_ = read_stream => {},
_ = trigger => {},
}

Ok(())
self.send_and_read_stream::<TargetInfoEndpoint, TargetInfoDataTopic, _>(&request, on_msg)
.await
}

pub async fn load_chip_family(
@@ -330,36 +325,6 @@ impl SessionInterface {
.await
}

async fn send_and_read_stream<E, T, R>(
&self,
req: &E::Request,
mut on_msg: impl FnMut(T::Message),
) -> anyhow::Result<R>
where
E: postcard_rpc::Endpoint<Response = RpcResult<R>>,
E::Request: Serialize + Schema,
E::Response: DeserializeOwned + Schema,
T: Topic,
T::Message: DeserializeOwned,
{
let Ok(mut stream) = self.client.subscribe_multi::<T>(4).await else {
anyhow::bail!("Failed to subscribe to '{}': Connection closed", T::PATH);
};

let read_stream = async {
while let Ok(message) = stream.recv().await {
on_msg(message);
}
tracing::warn!("Failed to read topic");
futures_util::future::pending().await
};

tokio::select! {
r = self.client.send_resp::<E, R>(req) => r,
_ = read_stream => anyhow::bail!("Failed to read event stream"),
}
}

pub async fn flash(
&self,
mut path: PathBuf,
@@ -378,17 +343,18 @@ impl SessionInterface {
*idf_partition_table = self.client.upload_file(&*idf_partition_table).await?;
}

self.send_and_read_stream::<FlashEndpoint, ProgressEventTopic, _>(
&FlashRequest {
sessid: self.sessid,
path,
format,
options,
rtt_client,
},
on_msg,
)
.await
self.client
.send_and_read_stream::<FlashEndpoint, ProgressEventTopic, _>(
&FlashRequest {
sessid: self.sessid,
path,
format,
options,
rtt_client,
},
on_msg,
)
.await
}

pub async fn monitor(
@@ -397,15 +363,16 @@ impl SessionInterface {
options: MonitorOptions,
on_msg: impl FnMut(MonitorEvent),
) -> anyhow::Result<()> {
self.send_and_read_stream::<MonitorEndpoint, MonitorTopic, _>(
&MonitorRequest {
sessid: self.sessid,
mode,
options,
},
on_msg,
)
.await
self.client
.send_and_read_stream::<MonitorEndpoint, MonitorTopic, _>(
&MonitorRequest {
sessid: self.sessid,
mode,
options,
},
on_msg,
)
.await
}

pub async fn list_tests(
@@ -414,15 +381,16 @@ impl SessionInterface {
rtt_client: Option<Key<RttClient>>,
on_msg: impl FnMut(MonitorEvent),
) -> anyhow::Result<Tests> {
self.send_and_read_stream::<ListTestsEndpoint, MonitorTopic, _>(
&ListTestsRequest {
sessid: self.sessid,
boot_info,
rtt_client,
},
on_msg,
)
.await
self.client
.send_and_read_stream::<ListTestsEndpoint, MonitorTopic, _>(
&ListTestsRequest {
sessid: self.sessid,
boot_info,
rtt_client,
},
on_msg,
)
.await
}

pub async fn run_test(
@@ -431,15 +399,16 @@ impl SessionInterface {
rtt_client: Option<Key<RttClient>>,
on_msg: impl FnMut(MonitorEvent),
) -> anyhow::Result<TestResult> {
self.send_and_read_stream::<RunTestEndpoint, MonitorTopic, _>(
&RunTestRequest {
sessid: self.sessid,
test,
rtt_client,
},
on_msg,
)
.await
self.client
.send_and_read_stream::<RunTestEndpoint, MonitorTopic, _>(
&RunTestRequest {
sessid: self.sessid,
test,
rtt_client,
},
on_msg,
)
.await
}

pub async fn create_rtt_client(
@@ -571,3 +540,18 @@ impl CoreInterface {
.await
}
}

async fn read_stream<T>(stream: &mut MultiSubscription<T>, mut on_msg: impl FnMut(T))
where
T: DeserializeOwned,
{
loop {
match stream.recv().await {
Ok(message) => on_msg(message),
Err(MultiSubRxError::Lagged(n)) => tracing::warn!("Lagged by {} messages", n),
Err(MultiSubRxError::IoClosed) => break,
}
}
tracing::warn!("Failed to read topic");
futures_util::future::pending().await
}

0 comments on commit d398450

Please sign in to comment.