Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rpc_module): RpcModule::raw_json_request -> String #1287

Merged
merged 2 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub type MaxResponseSize = usize;
/// A tuple containing:
/// - Call result as a `String`,
/// - a [`mpsc::UnboundedReceiver<String>`] to receive future subscription results
pub type RawRpcResponse = (MethodResponse, mpsc::Receiver<String>);
pub type RawRpcResponse = (String, mpsc::Receiver<String>);

/// The error that can occur when [`Methods::call`] or [`Methods::subscribe`] is invoked.
#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -290,8 +290,9 @@ impl Methods {
let params = params.to_rpc_params()?;
let req = Request::new(method.into(), params.as_ref().map(|p| p.as_ref()), Id::Number(0));
tracing::trace!(target: LOG_TARGET, "[Methods::call] Method: {:?}, params: {:?}", method, params);
let (resp, _) = self.inner_call(req, 1, mock_subscription_permit()).await;
let rp = serde_json::from_str::<Response<T>>(resp.as_result())?;
let (rp, _) = self.inner_call(req, 1, mock_subscription_permit()).await;

let rp = serde_json::from_str::<Response<T>>(&rp)?;
ResponseSuccess::try_from(rp).map(|s| s.result).map_err(|e| MethodsError::JsonRpc(e.into_owned()))
}

Expand All @@ -318,7 +319,8 @@ impl Methods {
/// Ok(())
/// }).unwrap();
/// let (resp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"hi","id":0}"#, 1).await.unwrap();
/// let resp: Success<u64> = serde_json::from_str::<Response<u64>>(&resp.as_result()).unwrap().try_into().unwrap();
/// // If the response is an error converting it to `Success` will fail.
/// let resp: Success<u64> = serde_json::from_str::<Response<u64>>(&resp).unwrap().try_into().unwrap();
/// let sub_resp = stream.recv().await.unwrap();
/// assert_eq!(
/// format!(r#"{{"jsonrpc":"2.0","method":"hi","params":{{"subscription":{},"result":"one answer"}}}}"#, resp.result),
Expand All @@ -330,7 +332,7 @@ impl Methods {
&self,
request: &str,
buf_size: usize,
) -> Result<(MethodResponse, mpsc::Receiver<String>), serde_json::Error> {
) -> Result<(String, mpsc::Receiver<String>), serde_json::Error> {
tracing::trace!("[Methods::raw_json_request] Request: {:?}", request);
let req: Request = serde_json::from_str(request)?;
let (resp, rx) = self.inner_call(req, buf_size, mock_subscription_permit()).await;
Expand Down Expand Up @@ -369,9 +371,16 @@ impl Methods {
Some(MethodCallback::Unsubscription(cb)) => (cb)(id, params, 0, usize::MAX),
};

tracing::trace!(target: LOG_TARGET, "[Methods::inner_call] Method: {}, response: {:?}", req.method, response);
let is_success = response.is_success();
let (rp, notif) = response.into_parts();

if let Some(n) = notif {
n.notify(is_success);
}

tracing::trace!(target: LOG_TARGET, "[Methods::inner_call] Method: {}, response: {}", req.method, rp);

(response, rx)
(rp, rx)
}

/// Helper to create a subscription on the `RPC module` without having to spin up a server.
Expand Down Expand Up @@ -426,10 +435,9 @@ impl Methods {
let (resp, rx) = self.inner_call(req, buf_size, mock_subscription_permit()).await;

// TODO: hack around the lifetime on the `SubscriptionId` by deserialize first to serde_json::Value.
let as_success: ResponseSuccess<serde_json::Value> =
serde_json::from_str::<Response<_>>(resp.as_result())?.try_into()?;
let as_success: ResponseSuccess<serde_json::Value> = serde_json::from_str::<Response<_>>(&resp)?.try_into()?;

let sub_id = as_success.result.try_into().map_err(|_| MethodsError::InvalidSubscriptionId(resp.to_result()))?;
let sub_id = as_success.result.try_into().map_err(|_| MethodsError::InvalidSubscriptionId(resp.clone()))?;

Ok(Subscription { sub_id, rx })
}
Expand Down
5 changes: 1 addition & 4 deletions tests/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,15 +303,12 @@ pub type Receiver = tokio::sync::mpsc::UnboundedReceiver<Result<(), MethodRespon
pub async fn run_test_notify_test(
module: &NotifyRpcModule,
server_rx: &mut Receiver,
is_success: bool,
kind: Notify,
) -> Result<(), MethodResponseError> {
use jsonrpsee_test_utils::mocks::Id;

let req = jsonrpsee_test_utils::helpers::call("hey", vec![kind], Id::Num(1));
let (rp, _) = module.raw_json_request(&req, 1).await.unwrap();
let (_, notify_rx) = rp.into_parts();
notify_rx.unwrap().notify(is_success);
let _ = module.raw_json_request(&req, 1).await.unwrap();
server_rx.recv().await.expect("Channel is not dropped")
}

Expand Down
6 changes: 3 additions & 3 deletions tests/tests/proc_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ async fn macro_optional_param_parsing() {
.raw_json_request(r#"{"jsonrpc":"2.0","method":"foo_optional_params","params":{"a":22,"c":50},"id":0}"#, 1)
.await
.unwrap();
assert_eq!(resp.into_result(), r#"{"jsonrpc":"2.0","result":"Called with: 22, None, Some(50)","id":0}"#);
assert_eq!(resp, r#"{"jsonrpc":"2.0","result":"Called with: 22, None, Some(50)","id":0}"#);
}

#[tokio::test]
Expand All @@ -290,14 +290,14 @@ async fn macro_zero_copy_cow() {
.unwrap();

// std::borrow::Cow<str> always deserialized to owned variant here
assert_eq!(resp.into_result(), r#"{"jsonrpc":"2.0","result":"Zero copy params: false, true","id":0}"#);
assert_eq!(resp, r#"{"jsonrpc":"2.0","result":"Zero copy params: false, true","id":0}"#);

// serde_json will have to allocate a new string to replace `\t` with byte 0x09 (tab)
let (resp, _) = module
.raw_json_request(r#"{"jsonrpc":"2.0","method":"foo_zero_copy_cow","params":["\tfoo", "\tbar"],"id":0}"#, 1)
.await
.unwrap();
assert_eq!(resp.into_result(), r#"{"jsonrpc":"2.0","result":"Zero copy params: false, false","id":0}"#);
assert_eq!(resp, r#"{"jsonrpc":"2.0","result":"Zero copy params: false, false","id":0}"#);
}

// Disabled on MacOS as GH CI timings on Mac vary wildly (~100ms) making this test fail.
Expand Down
36 changes: 7 additions & 29 deletions tests/tests/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use jsonrpsee::core::{server::*, RpcResult};
use jsonrpsee::types::error::{ErrorCode, ErrorObject, INVALID_PARAMS_MSG, PARSE_ERROR_CODE};
use jsonrpsee::types::{ErrorObjectOwned, Params, Response, ResponsePayload};
use jsonrpsee::SubscriptionMessage;
use jsonrpsee_test_utils::mocks::Id;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio::time::interval;
Expand Down Expand Up @@ -387,13 +386,13 @@ async fn subscribe_unsubscribe_without_server() {
let unsub_req = format!("{{\"jsonrpc\":\"2.0\",\"method\":\"my_unsub\",\"params\":[{}],\"id\":1}}", ser_id);
let (resp, _) = module.raw_json_request(&unsub_req, 1).await.unwrap();

assert_eq!(resp.into_result(), r#"{"jsonrpc":"2.0","result":true,"id":1}"#);
assert_eq!(resp, r#"{"jsonrpc":"2.0","result":true,"id":1}"#);

// Unsubscribe already performed; should be error.
let unsub_req = format!("{{\"jsonrpc\":\"2.0\",\"method\":\"my_unsub\",\"params\":[{}],\"id\":1}}", ser_id);
let (resp, _) = module.raw_json_request(&unsub_req, 2).await.unwrap();

assert_eq!(resp.into_result(), r#"{"jsonrpc":"2.0","result":false,"id":1}"#);
assert_eq!(resp, r#"{"jsonrpc":"2.0","result":false,"id":1}"#);
}

let sub1 = subscribe_and_assert(&module);
Expand Down Expand Up @@ -433,7 +432,7 @@ async fn reject_works() {
.unwrap();

let (rp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"my_sub","id":0}"#, 1).await.unwrap();
assert_eq!(rp.into_result(), r#"{"jsonrpc":"2.0","error":{"code":-32700,"message":"rejected"},"id":0}"#);
assert_eq!(rp, r#"{"jsonrpc":"2.0","error":{"code":-32700,"message":"rejected"},"id":0}"#);
assert!(stream.recv().await.is_none());
}

Expand Down Expand Up @@ -524,7 +523,7 @@ async fn serialize_sub_error_adds_extra_string_quotes() {
.unwrap();

let (rp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"my_sub","id":0}"#, 1).await.unwrap();
let resp = serde_json::from_str::<Response<u64>>(rp.as_result()).unwrap();
let resp = serde_json::from_str::<Response<u64>>(&rp).unwrap();
let sub_resp = stream.recv().await.unwrap();

let resp = match resp.payload {
Expand Down Expand Up @@ -569,7 +568,7 @@ async fn subscription_close_response_works() {
{
let (rp, mut stream) =
module.raw_json_request(r#"{"jsonrpc":"2.0","method":"my_sub","params":[1],"id":0}"#, 1).await.unwrap();
let resp = serde_json::from_str::<Response<u64>>(rp.as_result()).unwrap();
let resp = serde_json::from_str::<Response<u64>>(&rp).unwrap();

let sub_id = match resp.payload {
ResponsePayload::Success(val) => val,
Expand Down Expand Up @@ -598,33 +597,12 @@ async fn method_response_notify_on_completion() {
let module = rpc_module_notify_on_response(tx);

assert!(
run_test_notify_test(&module, &mut rx, true, Notify::Success).await.is_ok(),
run_test_notify_test(&module, &mut rx, Notify::Success).await.is_ok(),
"Successful response should be notified"
);
assert!(matches!(
run_test_notify_test(&module, &mut rx, false, Notify::Success).await,
Err(MethodResponseError::JsonRpcError),
));

assert!(matches!(
run_test_notify_test(&module, &mut rx, false, Notify::Error).await,
run_test_notify_test(&module, &mut rx, Notify::Error).await,
Err(MethodResponseError::JsonRpcError),
));
}

#[tokio::test]
async fn method_response_dropped() {
init_logger();

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let module = rpc_module_notify_on_response(tx);

let req = jsonrpsee_test_utils::helpers::call("hey", vec![Notify::Success], Id::Num(1));

// Make a call and drop the method response including its "notify sender"
// This could happen if the connection is closed.
let (rp, _) = module.raw_json_request(&req, 1).await.unwrap();
drop(rp);

assert!(matches!(rx.recv().await, Some(Err(MethodResponseError::Closed))));
}
Loading