From a0c2b395147b725f25d755d09e868ab54cd5f53e Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 22 Aug 2022 12:55:55 +0300 Subject: [PATCH 01/12] middleware: Proxy `GET /path` requests to internal methods Signed-off-by: Alexandru Vasile --- http-server/src/lib.rs | 3 + http-server/src/middlewares/mod.rs | 4 + http-server/src/middlewares/proxy_request.rs | 99 ++++++++++++++++++++ 3 files changed, 106 insertions(+) create mode 100644 http-server/src/middlewares/mod.rs create mode 100644 http-server/src/middlewares/proxy_request.rs diff --git a/http-server/src/lib.rs b/http-server/src/lib.rs index fab3147bb8..7490d4fa39 100644 --- a/http-server/src/lib.rs +++ b/http-server/src/lib.rs @@ -35,6 +35,9 @@ mod server; /// Common builders for RPC responses. pub mod response; +/// Common tower middlewares exposed for RPC interaction. +pub mod middlewares; + pub use jsonrpsee_core::server::access_control::{AccessControl, AccessControlBuilder}; pub use jsonrpsee_core::server::rpc_module::RpcModule; pub use jsonrpsee_types as types; diff --git a/http-server/src/middlewares/mod.rs b/http-server/src/middlewares/mod.rs new file mode 100644 index 0000000000..c2edc70a96 --- /dev/null +++ b/http-server/src/middlewares/mod.rs @@ -0,0 +1,4 @@ +//! Various middleware implementations for RPC specific purposes. + +/// Proxy `GET /path` to internal RPC methods. +pub mod proxy_request; diff --git a/http-server/src/middlewares/proxy_request.rs b/http-server/src/middlewares/proxy_request.rs new file mode 100644 index 0000000000..035bb520c7 --- /dev/null +++ b/http-server/src/middlewares/proxy_request.rs @@ -0,0 +1,99 @@ +//! Middleware that proxies requests at a specified URI to internal +//! RPC method calls. + +use hyper::header::{ACCEPT, CONTENT_TYPE}; +use hyper::http::HeaderValue; +use hyper::{Body, Method, Request, Response}; +use std::task::{Context, Poll}; +use tower::{Layer, Service}; + +/// Layer that applies [`ProxyRequest`] which proxies the `GET /path` requests to +/// specific RPC method calls and that strips the response. +/// +/// See [`ProxyRequest`] for more details. +#[derive(Debug, Clone)] +pub struct ProxyRequestLayer { + path: String, + method: String, +} + +impl ProxyRequestLayer { + /// Creates a new [`ProxyRequestLayer`]. + /// + /// See [`ProxyRequest`] for more details. + pub fn new(path: impl Into, method: impl Into) -> Self { + Self { path: path.into(), method: method.into() } + } +} +impl Layer for ProxyRequestLayer { + type Service = ProxyRequest; + + fn layer(&self, inner: S) -> Self::Service { + ProxyRequest::new(inner, self.path.clone(), self.method.clone()) + } +} + +/// Proxy `GET/path` requests to the specified RPC method calls. +/// +/// # Request +/// +/// The `GET /path` requests are modified into valid `POST` requests for +/// calling the RPC method. This middleware adds appropriate headers to the +/// request, and completely modifies the request `BODY`. +/// +/// # Response +/// +/// The response of the RPC method is stripped down to contain only the method's +/// response, removing any RPC 2.0 spec logic regarding the response' body. +#[derive(Debug, Clone)] +pub struct ProxyRequest { + inner: S, + path: String, + method: String, +} + +impl ProxyRequest { + /// Creates a new [`ProxyRequest`]. + /// + /// The request `GET /path` is redirected to the provided method. + pub fn new(inner: S, path: impl Into, method: impl Into) -> Self { + Self { inner, path: path.into(), method: method.into() } + } +} + +impl Service> for ProxyRequest +where + S: Service, Response = Response, Error = hyper::Error>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, mut req: Request) -> Self::Future { + let modify = self.path.as_str() == req.uri() && req.method() == Method::GET; + + // Proxy the request to the appropriate method call. + if modify { + // RPC methods are accessed with `POST`. + *req.method_mut() = Method::POST; + // Precautionary remove the URI. + *req.uri_mut() = "/".parse().unwrap(); + + // Requests must have the following headers: + req.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + req.headers_mut().insert(ACCEPT, HeaderValue::from_static("application/json")); + + // Adjust the body to reflect the method call. + let body = + Body::from(format!("{{\"jsonrpc\":\"2.0\",\"method\":\"{}\",\"params\":null,\"id\":1}}", self.method)); + req = req.map(|_| body); + } + + self.inner.call(req) + } +} From e0c1e5d175dd9ff5cedc73119e4224c89515c53d Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 22 Aug 2022 13:07:51 +0300 Subject: [PATCH 02/12] middleware: Modify the response for proxies Signed-off-by: Alexandru Vasile --- http-server/Cargo.toml | 1 + http-server/src/middlewares/proxy_request.rs | 81 +++++++++++++++++++- 2 files changed, 79 insertions(+), 3 deletions(-) diff --git a/http-server/Cargo.toml b/http-server/Cargo.toml index a3225641d4..3b9496803c 100644 --- a/http-server/Cargo.toml +++ b/http-server/Cargo.toml @@ -21,6 +21,7 @@ serde_json = { version = "1.0", features = ["raw_value"] } serde = "1" tokio = { version = "1.16", features = ["rt-multi-thread", "macros"] } tower = "0.4.13" +pin-project-lite = "0.2.9" [dev-dependencies] tracing-subscriber = { version = "0.3.3", features = ["env-filter"] } diff --git a/http-server/src/middlewares/proxy_request.rs b/http-server/src/middlewares/proxy_request.rs index 035bb520c7..a5c2db9295 100644 --- a/http-server/src/middlewares/proxy_request.rs +++ b/http-server/src/middlewares/proxy_request.rs @@ -1,9 +1,15 @@ //! Middleware that proxies requests at a specified URI to internal //! RPC method calls. +use crate::response; +use futures_util::ready; +use hyper::body::HttpBody; use hyper::header::{ACCEPT, CONTENT_TYPE}; use hyper::http::HeaderValue; use hyper::{Body, Method, Request, Response}; +use pin_project_lite::pin_project; +use std::future::Future; +use std::pin::Pin; use std::task::{Context, Poll}; use tower::{Layer, Service}; @@ -63,11 +69,12 @@ impl ProxyRequest { impl Service> for ProxyRequest where - S: Service, Response = Response, Error = hyper::Error>, + S: Service, Response = Response>, + >>::Error: From, { type Response = S::Response; type Error = S::Error; - type Future = S::Future; + type Future = ResponseFuture; #[inline] fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { @@ -94,6 +101,74 @@ where req = req.map(|_| body); } - self.inner.call(req) + // Depending on `modify` adjust the response. + ResponseFuture::PollFuture { future: self.inner.call(req), modify } + } +} + +pin_project! { + /// Response future for [`ProxyRequest`]. + #[project = ResponseFutureState] + #[allow(missing_docs)] + pub enum ResponseFuture { + /// Poll the response out of the future. + PollFuture { + #[pin] + future: F, + modify: bool, + }, + /// Poll the [`hyper::Body`] response and modify it. + PollBodyData { + body: Body, + body_bytes: Vec, + }, + } +} + +impl Future for ResponseFuture +where + F: Future, E>>, + E: From, +{ + type Output = F::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // The purpose of this loop is to optimise the transition from + // `PollFuture` -> `PollBodyData` state, that would otherwise + // require a `cx.wake().wake_by_ref and return Poll::Pending`. + loop { + match self.as_mut().project() { + ResponseFutureState::PollFuture { future, modify } => { + let res: Response = ready!(future.poll(cx)?); + + // Nothing to modify: return the response as is. + if !*modify { + return Poll::Ready(Ok(res)); + } + + let inner = ResponseFuture::PollBodyData { body: res.into_body(), body_bytes: Vec::new() }; + self.set(inner); + } + ResponseFutureState::PollBodyData { body, body_bytes } => { + while let Some(chunk) = ready!(Pin::new(&mut *body).poll_data(cx)?) { + body_bytes.extend_from_slice(chunk.as_ref()); + } + + #[derive(serde::Deserialize, Debug)] + struct RpcPayload<'a> { + #[serde(borrow)] + result: &'a serde_json::value::RawValue, + } + + let response = if let Ok(payload) = serde_json::from_slice::(body_bytes) { + response::ok_response(payload.result.to_string()) + } else { + response::internal_error() + }; + + return Poll::Ready(Ok(response)); + } + } + } } } From 7600c86e0d507dc63f576b4cc6f99c824d73e28d Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 22 Aug 2022 13:35:41 +0300 Subject: [PATCH 03/12] examples: Add `ProxyRequestLayer` example for URI redirection Signed-off-by: Alexandru Vasile --- examples/examples/http_proxy_middleware.rs | 101 +++++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 examples/examples/http_proxy_middleware.rs diff --git a/examples/examples/http_proxy_middleware.rs b/examples/examples/http_proxy_middleware.rs new file mode 100644 index 0000000000..8e873ad32e --- /dev/null +++ b/examples/examples/http_proxy_middleware.rs @@ -0,0 +1,101 @@ +// Copyright 2019-2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! This example utilizes the `ProxyRequest` layer for redirecting +//! `GET /path` requests to internal RPC methods. +//! +//! The RPC server registers a method named `system_health` which +//! returns `serde_json::Value`. Redirect any `GET /health` +//! requests to the internal method, and return only the method's +//! response in the body (ie, without any jsonRPC 2.0 overhead). +//! +//! # Note +//! +//! This functionality is useful for services which would +//! like to query a certain `URI` path for statistics. + +use hyper::{Body, Client, Request}; +use std::net::SocketAddr; +use std::time::Duration; + +use jsonrpsee::core::client::ClientT; +use jsonrpsee::http_client::HttpClientBuilder; +use jsonrpsee::http_server::middlewares::proxy_request::ProxyRequestLayer; +use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle, RpcModule}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::FmtSubscriber::builder() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init() + .expect("setting default subscriber failed"); + + let (addr, _handler) = run_server().await?; + let url = format!("http://{}", addr); + + // Use RPC client to get the response of `say_hello` method. + let client = HttpClientBuilder::default().build(&url)?; + let response: String = client.request("say_hello", None).await?; + println!("[main]: response: {:?}", response); + + // Use hyper client to manually submit a `GET /health` request. + let http_client = Client::new(); + let uri = format!("http://{}/health", addr); + + let req = Request::builder().method("GET").uri(&uri).body(Body::empty())?; + println!("[main]: Submit proxy request: {:?}", req); + let res = http_client.request(req).await?; + println!("[main]: Received proxy response: {:?}", res); + + // Interpret the response as String. + let bytes = hyper::body::to_bytes(res.into_body()).await.unwrap(); + let out = String::from_utf8(bytes.to_vec()).unwrap(); + println!("[main]: Interpret proxy response: {:?}", out); + assert_eq!(out.as_str(), "{\"health\":true}"); + + Ok(()) +} + +async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> { + // Custom tower service to handle the RPC requests + let service_builder = tower::ServiceBuilder::new() + // Proxy `GET /health` requests to internal `system_health` method. + .layer(ProxyRequestLayer::new("/health", "system_health")) + .timeout(Duration::from_secs(2)); + + let server = + HttpServerBuilder::new().set_middleware(service_builder).build("127.0.0.1:0".parse::()?).await?; + + let addr = server.local_addr()?; + + let mut module = RpcModule::new(()); + module.register_method("say_hello", |_, _| Ok("lo")).unwrap(); + module.register_method("system_health", |_, _| Ok(serde_json::json!({ "health": true }))).unwrap(); + + let handler = server.start(module)?; + + Ok((addr, handler)) +} From f7737f656be982c76d6f3704de66fb2d725412fd Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 22 Aug 2022 13:48:32 +0300 Subject: [PATCH 04/12] http: Remove internal Health API Signed-off-by: Alexandru Vasile --- http-server/src/server.rs | 100 +------------------------------------- tests/tests/helpers.rs | 9 ++-- 2 files changed, 7 insertions(+), 102 deletions(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 5f818d6fab..d74f157107 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -46,7 +46,7 @@ use jsonrpsee_core::server::helpers::{prepare_error, MethodResponse}; use jsonrpsee_core::server::helpers::{BatchResponse, BatchResponseBuilder}; use jsonrpsee_core::server::resource_limiting::Resources; use jsonrpsee_core::server::rpc_module::{MethodKind, Methods}; -use jsonrpsee_core::tracing::{rx_log_from_json, rx_log_from_str, tx_log_from_str, RpcTracing}; +use jsonrpsee_core::tracing::{rx_log_from_json, tx_log_from_str, RpcTracing}; use jsonrpsee_core::TEN_MB_SIZE_BYTES; use jsonrpsee_types::error::{ErrorCode, ErrorObject, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG}; use jsonrpsee_types::{Id, Notification, Params, Request}; @@ -72,7 +72,6 @@ pub struct Builder { tokio_runtime: Option, logger: L, max_log_length: u32, - health_api: Option, service_builder: tower::ServiceBuilder, } @@ -87,7 +86,6 @@ impl Default for Builder { tokio_runtime: None, logger: (), max_log_length: 4096, - health_api: None, service_builder: tower::ServiceBuilder::new(), } } @@ -154,7 +152,6 @@ impl Builder { tokio_runtime: self.tokio_runtime, logger, max_log_length: self.max_log_length, - health_api: self.health_api, service_builder: self.service_builder, } } @@ -203,23 +200,6 @@ impl Builder { self } - /// Enable health endpoint. - /// Allows you to expose one of the methods under GET / The method will be invoked with no parameters. - /// Error returned from the method will be converted to status 500 response. - /// Expects a tuple with (, ). - /// - /// Fails if the path is missing `/`. - pub fn health_api(mut self, path: impl Into, method: impl Into) -> Result { - let path = path.into(); - - if !path.starts_with('/') { - return Err(Error::Custom(format!("Health endpoint path must start with `/` to work, got: {}", path))); - } - - self.health_api = Some(HealthApi { path, method: method.into() }); - Ok(self) - } - /// Configure a custom [`tower::ServiceBuilder`] middleware for composing layers to be applied to the RPC service. /// /// Default: No tower layers are applied to the RPC service. @@ -254,7 +234,6 @@ impl Builder { tokio_runtime: self.tokio_runtime, logger: self.logger, max_log_length: self.max_log_length, - health_api: self.health_api, service_builder, } } @@ -309,7 +288,6 @@ impl Builder { tokio_runtime: self.tokio_runtime, logger: self.logger, max_log_length: self.max_log_length, - health_api: self.health_api, service_builder: self.service_builder, }) } @@ -355,7 +333,6 @@ impl Builder { tokio_runtime: self.tokio_runtime, logger: self.logger, max_log_length: self.max_log_length, - health_api: self.health_api, service_builder: self.service_builder, }) } @@ -392,18 +369,11 @@ impl Builder { tokio_runtime: self.tokio_runtime, logger: self.logger, max_log_length: self.max_log_length, - health_api: self.health_api, service_builder: self.service_builder, }) } } -#[derive(Debug, Clone)] -struct HealthApi { - path: String, - method: String, -} - /// Handle used to run or stop the server. #[derive(Debug)] pub struct ServerHandle { @@ -448,8 +418,6 @@ struct ServiceData { resources: Resources, /// User provided logger. logger: L, - /// Health API. - health_api: Option, /// Max request body size. max_request_body_size: u32, /// Max response body size. @@ -471,7 +439,6 @@ impl ServiceData { acl, resources, logger, - health_api, max_request_body_size, max_response_body_size, max_log_length, @@ -512,20 +479,6 @@ impl ServiceData { }) .await } - Method::GET => match health_api.as_ref() { - Some(health) if health.path.as_str() == request.uri().path() => { - process_health_request( - health, - logger, - methods, - max_response_body_size, - request_start, - max_log_length, - ) - .await - } - _ => response::method_not_allowed(), - }, // Error scenarios: Method::POST => response::unsupported_content_type(), _ => response::method_not_allowed(), @@ -587,7 +540,6 @@ pub struct Server { /// Custom tokio runtime to run the server on. tokio_runtime: Option, logger: L, - health_api: Option, service_builder: tower::ServiceBuilder, } @@ -626,7 +578,6 @@ where let logger = self.logger; let batch_requests_supported = self.batch_requests_supported; let methods = methods.into().initialize_resources(&resources)?; - let health_api = self.health_api; let make_service = make_service_fn(move |conn: &AddrStream| { let service = TowerService { @@ -636,7 +587,6 @@ where acl: acl.clone(), resources: resources.clone(), logger: logger.clone(), - health_api: health_api.clone(), max_request_body_size, max_response_body_size, max_log_length, @@ -766,54 +716,6 @@ async fn process_validated_request(input: ProcessValidatedRequest) } } -async fn process_health_request( - health_api: &HealthApi, - logger: L, - methods: Methods, - max_response_body_size: u32, - request_start: L::Instant, - max_log_length: u32, -) -> hyper::Response { - let trace = RpcTracing::method_call(&health_api.method); - async { - tx_log_from_str("HTTP health API", max_log_length); - let response = match methods.method_with_name(&health_api.method) { - None => MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::MethodNotFound)), - Some((_name, method_callback)) => match method_callback.inner() { - MethodKind::Sync(callback) => { - (callback)(Id::Number(0), Params::new(None), max_response_body_size as usize) - } - MethodKind::Async(callback) => { - (callback)(Id::Number(0), Params::new(None), 0, max_response_body_size as usize, None).await - } - MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => { - MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InternalError)) - } - }, - }; - - rx_log_from_str(&response.result, max_log_length); - logger.on_result(&health_api.method, response.success, request_start); - logger.on_response(&response.result, request_start); - - if response.success { - #[derive(serde::Deserialize)] - struct RpcPayload<'a> { - #[serde(borrow)] - result: &'a serde_json::value::RawValue, - } - - let payload: RpcPayload = serde_json::from_str(&response.result) - .expect("valid JSON-RPC response must have a result field and be valid JSON; qed"); - response::ok_response(payload.result.to_string()) - } else { - response::internal_error() - } - } - .instrument(trace.into_span()) - .await -} - #[derive(Debug, Clone)] struct Batch<'a, L: Logger> { data: Vec, diff --git a/tests/tests/helpers.rs b/tests/tests/helpers.rs index 180014e6c9..9aae7387ed 100644 --- a/tests/tests/helpers.rs +++ b/tests/tests/helpers.rs @@ -30,6 +30,7 @@ use std::time::Duration; use futures::{SinkExt, StreamExt}; use jsonrpsee::core::error::SubscriptionClosed; use jsonrpsee::core::server::access_control::{AccessControl, AccessControlBuilder}; +use jsonrpsee::http_server::middlewares::proxy_request::ProxyRequestLayer; use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle}; use jsonrpsee::types::error::{ErrorObject, SUBSCRIPTION_CLOSED_WITH_ERROR}; use jsonrpsee::ws_server::{WsServerBuilder, WsServerHandle}; @@ -223,13 +224,15 @@ pub async fn http_server() -> (SocketAddr, HttpServerHandle) { } pub async fn http_server_with_access_control(acl: AccessControl, cors: CorsLayer) -> (SocketAddr, HttpServerHandle) { - let middleware = tower::ServiceBuilder::new().layer(cors); + let middleware = tower::ServiceBuilder::new() + // Proxy `GET /health` requests to internal `system_health` method. + .layer(ProxyRequestLayer::new("/health", "system_health")) + // Add `CORS` layer. + .layer(cors); let server = HttpServerBuilder::default() .set_access_control(acl) .set_middleware(middleware) - .health_api("/health", "system_health") - .unwrap() .build("127.0.0.1:0") .await .unwrap(); From cb1694713e22dc21c4b0881ed5e79dd1c543d92e Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 22 Aug 2022 16:41:10 +0300 Subject: [PATCH 05/12] middleware: Replace `ResponseFuture` with pinning Signed-off-by: Alexandru Vasile --- http-server/src/middlewares/proxy_request.rs | 108 ++++++------------- 1 file changed, 35 insertions(+), 73 deletions(-) diff --git a/http-server/src/middlewares/proxy_request.rs b/http-server/src/middlewares/proxy_request.rs index a5c2db9295..e7fcab0bdf 100644 --- a/http-server/src/middlewares/proxy_request.rs +++ b/http-server/src/middlewares/proxy_request.rs @@ -2,12 +2,10 @@ //! RPC method calls. use crate::response; -use futures_util::ready; -use hyper::body::HttpBody; use hyper::header::{ACCEPT, CONTENT_TYPE}; use hyper::http::HeaderValue; use hyper::{Body, Method, Request, Response}; -use pin_project_lite::pin_project; +use std::error::Error; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -70,15 +68,17 @@ impl ProxyRequest { impl Service> for ProxyRequest where S: Service, Response = Response>, - >>::Error: From, + S::Response: 'static, + S::Error: Into> + 'static, + S::Future: Send + 'static, { type Response = S::Response; - type Error = S::Error; - type Future = ResponseFuture; + type Error = Box; + type Future = Pin> + Send + 'static>>; #[inline] fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(cx) + self.inner.poll_ready(cx).map_err(Into::into) } fn call(&mut self, mut req: Request) -> Self::Future { @@ -101,74 +101,36 @@ where req = req.map(|_| body); } - // Depending on `modify` adjust the response. - ResponseFuture::PollFuture { future: self.inner.call(req), modify } - } -} + // Call the inner service and get a future that resolves to the response. + let fut = self.inner.call(req); -pin_project! { - /// Response future for [`ProxyRequest`]. - #[project = ResponseFutureState] - #[allow(missing_docs)] - pub enum ResponseFuture { - /// Poll the response out of the future. - PollFuture { - #[pin] - future: F, - modify: bool, - }, - /// Poll the [`hyper::Body`] response and modify it. - PollBodyData { - body: Body, - body_bytes: Vec, - }, - } -} + // Adjust the response if needed. + let res_fut = async move { + let res = fut.await.map_err(|err| err.into())?; -impl Future for ResponseFuture -where - F: Future, E>>, - E: From, -{ - type Output = F::Output; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // The purpose of this loop is to optimise the transition from - // `PollFuture` -> `PollBodyData` state, that would otherwise - // require a `cx.wake().wake_by_ref and return Poll::Pending`. - loop { - match self.as_mut().project() { - ResponseFutureState::PollFuture { future, modify } => { - let res: Response = ready!(future.poll(cx)?); - - // Nothing to modify: return the response as is. - if !*modify { - return Poll::Ready(Ok(res)); - } - - let inner = ResponseFuture::PollBodyData { body: res.into_body(), body_bytes: Vec::new() }; - self.set(inner); - } - ResponseFutureState::PollBodyData { body, body_bytes } => { - while let Some(chunk) = ready!(Pin::new(&mut *body).poll_data(cx)?) { - body_bytes.extend_from_slice(chunk.as_ref()); - } - - #[derive(serde::Deserialize, Debug)] - struct RpcPayload<'a> { - #[serde(borrow)] - result: &'a serde_json::value::RawValue, - } - - let response = if let Ok(payload) = serde_json::from_slice::(body_bytes) { - response::ok_response(payload.result.to_string()) - } else { - response::internal_error() - }; - - return Poll::Ready(Ok(response)); - } + // Nothing to modify: return the response as is. + if !modify { + return Ok(res); } - } + + let body = res.into_body(); + let bytes = hyper::body::to_bytes(body).await?; + + #[derive(serde::Deserialize, Debug)] + struct RpcPayload<'a> { + #[serde(borrow)] + result: &'a serde_json::value::RawValue, + } + + let response = if let Ok(payload) = serde_json::from_slice::(&bytes) { + response::ok_response(payload.result.to_string()) + } else { + response::internal_error() + }; + + Ok(response) + }; + + Box::pin(res_fut) } } From 37354532aaed77433796a650b375b89c4f5856ba Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 22 Aug 2022 17:09:02 +0300 Subject: [PATCH 06/12] middleware: Use `Uri::from_static` and `RequestSer` for body message Signed-off-by: Alexandru Vasile --- http-server/src/middlewares/proxy_request.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/http-server/src/middlewares/proxy_request.rs b/http-server/src/middlewares/proxy_request.rs index e7fcab0bdf..cac1a2fbf1 100644 --- a/http-server/src/middlewares/proxy_request.rs +++ b/http-server/src/middlewares/proxy_request.rs @@ -4,7 +4,8 @@ use crate::response; use hyper::header::{ACCEPT, CONTENT_TYPE}; use hyper::http::HeaderValue; -use hyper::{Body, Method, Request, Response}; +use hyper::{Body, Method, Request, Response, Uri}; +use jsonrpsee_types::{Id, RequestSer}; use std::error::Error; use std::future::Future; use std::pin::Pin; @@ -89,15 +90,17 @@ where // RPC methods are accessed with `POST`. *req.method_mut() = Method::POST; // Precautionary remove the URI. - *req.uri_mut() = "/".parse().unwrap(); + *req.uri_mut() = Uri::from_static("/"); // Requests must have the following headers: req.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); req.headers_mut().insert(ACCEPT, HeaderValue::from_static("application/json")); // Adjust the body to reflect the method call. - let body = - Body::from(format!("{{\"jsonrpc\":\"2.0\",\"method\":\"{}\",\"params\":null,\"id\":1}}", self.method)); + let body = Body::from( + serde_json::to_string(&RequestSer::new(&Id::Number(0), self.method.as_str(), None)) + .expect("Valid request; qed"), + ); req = req.map(|_| body); } From c71758bec128da30a386ddfda9024c943a7b40a5 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 22 Aug 2022 17:21:03 +0300 Subject: [PATCH 07/12] middleware: Use `Arc` instead of `String` Signed-off-by: Alexandru Vasile --- http-server/src/middlewares/proxy_request.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/http-server/src/middlewares/proxy_request.rs b/http-server/src/middlewares/proxy_request.rs index cac1a2fbf1..d026e43177 100644 --- a/http-server/src/middlewares/proxy_request.rs +++ b/http-server/src/middlewares/proxy_request.rs @@ -9,6 +9,7 @@ use jsonrpsee_types::{Id, RequestSer}; use std::error::Error; use std::future::Future; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use tower::{Layer, Service}; @@ -34,7 +35,7 @@ impl Layer for ProxyRequestLayer { type Service = ProxyRequest; fn layer(&self, inner: S) -> Self::Service { - ProxyRequest::new(inner, self.path.clone(), self.method.clone()) + ProxyRequest::new(inner, &self.path, &self.method) } } @@ -53,16 +54,16 @@ impl Layer for ProxyRequestLayer { #[derive(Debug, Clone)] pub struct ProxyRequest { inner: S, - path: String, - method: String, + path: Arc, + method: Arc, } impl ProxyRequest { /// Creates a new [`ProxyRequest`]. /// /// The request `GET /path` is redirected to the provided method. - pub fn new(inner: S, path: impl Into, method: impl Into) -> Self { - Self { inner, path: path.into(), method: method.into() } + pub fn new(inner: S, path: &str, method: &str) -> Self { + Self { inner, path: Arc::from(path), method: Arc::from(method) } } } @@ -83,7 +84,7 @@ where } fn call(&mut self, mut req: Request) -> Self::Future { - let modify = self.path.as_str() == req.uri() && req.method() == Method::GET; + let modify = self.path.as_ref() == req.uri() && req.method() == Method::GET; // Proxy the request to the appropriate method call. if modify { @@ -98,7 +99,7 @@ where // Adjust the body to reflect the method call. let body = Body::from( - serde_json::to_string(&RequestSer::new(&Id::Number(0), self.method.as_str(), None)) + serde_json::to_string(&RequestSer::new(&Id::Number(0), &self.method, None)) .expect("Valid request; qed"), ); req = req.map(|_| body); From fbe2dece9c2484d535e642aa15ee408017513260 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 22 Aug 2022 17:24:32 +0300 Subject: [PATCH 08/12] Rename `ProxyRequest` to `ProxyGetRequest` and rename mod to `middleware` Signed-off-by: Alexandru Vasile --- examples/examples/http_proxy_middleware.rs | 4 ++-- http-server/src/lib.rs | 4 ++-- .../src/{middlewares => middleware}/mod.rs | 0 .../proxy_request.rs | 18 +++++++++--------- tests/tests/helpers.rs | 4 ++-- 5 files changed, 15 insertions(+), 15 deletions(-) rename http-server/src/{middlewares => middleware}/mod.rs (100%) rename http-server/src/{middlewares => middleware}/proxy_request.rs (90%) diff --git a/examples/examples/http_proxy_middleware.rs b/examples/examples/http_proxy_middleware.rs index 8e873ad32e..e82e9e68dc 100644 --- a/examples/examples/http_proxy_middleware.rs +++ b/examples/examples/http_proxy_middleware.rs @@ -43,7 +43,7 @@ use std::time::Duration; use jsonrpsee::core::client::ClientT; use jsonrpsee::http_client::HttpClientBuilder; -use jsonrpsee::http_server::middlewares::proxy_request::ProxyRequestLayer; +use jsonrpsee::http_server::middleware::proxy_request::ProxyGetRequestLayer; use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle, RpcModule}; #[tokio::main] @@ -83,7 +83,7 @@ async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> { // Custom tower service to handle the RPC requests let service_builder = tower::ServiceBuilder::new() // Proxy `GET /health` requests to internal `system_health` method. - .layer(ProxyRequestLayer::new("/health", "system_health")) + .layer(ProxyGetRequestLayer::new("/health", "system_health")) .timeout(Duration::from_secs(2)); let server = diff --git a/http-server/src/lib.rs b/http-server/src/lib.rs index 7490d4fa39..31fbeb50d3 100644 --- a/http-server/src/lib.rs +++ b/http-server/src/lib.rs @@ -35,8 +35,8 @@ mod server; /// Common builders for RPC responses. pub mod response; -/// Common tower middlewares exposed for RPC interaction. -pub mod middlewares; +/// Common tower middleware exposed for RPC interaction. +pub mod middleware; pub use jsonrpsee_core::server::access_control::{AccessControl, AccessControlBuilder}; pub use jsonrpsee_core::server::rpc_module::RpcModule; diff --git a/http-server/src/middlewares/mod.rs b/http-server/src/middleware/mod.rs similarity index 100% rename from http-server/src/middlewares/mod.rs rename to http-server/src/middleware/mod.rs diff --git a/http-server/src/middlewares/proxy_request.rs b/http-server/src/middleware/proxy_request.rs similarity index 90% rename from http-server/src/middlewares/proxy_request.rs rename to http-server/src/middleware/proxy_request.rs index d026e43177..ecd552cf13 100644 --- a/http-server/src/middlewares/proxy_request.rs +++ b/http-server/src/middleware/proxy_request.rs @@ -18,12 +18,12 @@ use tower::{Layer, Service}; /// /// See [`ProxyRequest`] for more details. #[derive(Debug, Clone)] -pub struct ProxyRequestLayer { +pub struct ProxyGetRequestLayer { path: String, method: String, } -impl ProxyRequestLayer { +impl ProxyGetRequestLayer { /// Creates a new [`ProxyRequestLayer`]. /// /// See [`ProxyRequest`] for more details. @@ -31,15 +31,15 @@ impl ProxyRequestLayer { Self { path: path.into(), method: method.into() } } } -impl Layer for ProxyRequestLayer { - type Service = ProxyRequest; +impl Layer for ProxyGetRequestLayer { + type Service = ProxyGetRequest; fn layer(&self, inner: S) -> Self::Service { - ProxyRequest::new(inner, &self.path, &self.method) + ProxyGetRequest::new(inner, &self.path, &self.method) } } -/// Proxy `GET/path` requests to the specified RPC method calls. +/// Proxy `GET /path` requests to the specified RPC method calls. /// /// # Request /// @@ -52,13 +52,13 @@ impl Layer for ProxyRequestLayer { /// The response of the RPC method is stripped down to contain only the method's /// response, removing any RPC 2.0 spec logic regarding the response' body. #[derive(Debug, Clone)] -pub struct ProxyRequest { +pub struct ProxyGetRequest { inner: S, path: Arc, method: Arc, } -impl ProxyRequest { +impl ProxyGetRequest { /// Creates a new [`ProxyRequest`]. /// /// The request `GET /path` is redirected to the provided method. @@ -67,7 +67,7 @@ impl ProxyRequest { } } -impl Service> for ProxyRequest +impl Service> for ProxyGetRequest where S: Service, Response = Response>, S::Response: 'static, diff --git a/tests/tests/helpers.rs b/tests/tests/helpers.rs index 9aae7387ed..beecf3b15e 100644 --- a/tests/tests/helpers.rs +++ b/tests/tests/helpers.rs @@ -30,7 +30,7 @@ use std::time::Duration; use futures::{SinkExt, StreamExt}; use jsonrpsee::core::error::SubscriptionClosed; use jsonrpsee::core::server::access_control::{AccessControl, AccessControlBuilder}; -use jsonrpsee::http_server::middlewares::proxy_request::ProxyRequestLayer; +use jsonrpsee::http_server::middleware::proxy_request::ProxyGetRequestLayer; use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle}; use jsonrpsee::types::error::{ErrorObject, SUBSCRIPTION_CLOSED_WITH_ERROR}; use jsonrpsee::ws_server::{WsServerBuilder, WsServerHandle}; @@ -226,7 +226,7 @@ pub async fn http_server() -> (SocketAddr, HttpServerHandle) { pub async fn http_server_with_access_control(acl: AccessControl, cors: CorsLayer) -> (SocketAddr, HttpServerHandle) { let middleware = tower::ServiceBuilder::new() // Proxy `GET /health` requests to internal `system_health` method. - .layer(ProxyRequestLayer::new("/health", "system_health")) + .layer(ProxyGetRequestLayer::new("/health", "system_health")) // Add `CORS` layer. .layer(cors); From 3e2ecffd3cccc686d80eb7690b97b698303f0edb Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 22 Aug 2022 17:51:11 +0300 Subject: [PATCH 09/12] middleware: Improve docs Signed-off-by: Alexandru Vasile --- http-server/src/middleware/proxy_request.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/http-server/src/middleware/proxy_request.rs b/http-server/src/middleware/proxy_request.rs index ecd552cf13..e14ab0f47f 100644 --- a/http-server/src/middleware/proxy_request.rs +++ b/http-server/src/middleware/proxy_request.rs @@ -13,10 +13,10 @@ use std::sync::Arc; use std::task::{Context, Poll}; use tower::{Layer, Service}; -/// Layer that applies [`ProxyRequest`] which proxies the `GET /path` requests to +/// Layer that applies [`ProxyGetRequest`] which proxies the `GET /path` requests to /// specific RPC method calls and that strips the response. /// -/// See [`ProxyRequest`] for more details. +/// See [`ProxyGetRequest`] for more details. #[derive(Debug, Clone)] pub struct ProxyGetRequestLayer { path: String, @@ -24,9 +24,9 @@ pub struct ProxyGetRequestLayer { } impl ProxyGetRequestLayer { - /// Creates a new [`ProxyRequestLayer`]. + /// Creates a new [`ProxyGetRequestLayer`]. /// - /// See [`ProxyRequest`] for more details. + /// See [`ProxyGetRequest`] for more details. pub fn new(path: impl Into, method: impl Into) -> Self { Self { path: path.into(), method: method.into() } } @@ -59,7 +59,7 @@ pub struct ProxyGetRequest { } impl ProxyGetRequest { - /// Creates a new [`ProxyRequest`]. + /// Creates a new [`ProxyGetRequest`]. /// /// The request `GET /path` is redirected to the provided method. pub fn new(inner: S, path: &str, method: &str) -> Self { From b01f13ea2da0021650b18c60fee68c02dcd5af41 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 22 Aug 2022 18:11:02 +0300 Subject: [PATCH 10/12] middleware: Fail if path does not start with `/` Signed-off-by: Alexandru Vasile --- examples/examples/http_proxy_middleware.rs | 2 +- http-server/src/middleware/proxy_request.rs | 20 ++++++++++++++++---- tests/tests/helpers.rs | 2 +- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/examples/examples/http_proxy_middleware.rs b/examples/examples/http_proxy_middleware.rs index e82e9e68dc..f4ba33397c 100644 --- a/examples/examples/http_proxy_middleware.rs +++ b/examples/examples/http_proxy_middleware.rs @@ -83,7 +83,7 @@ async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> { // Custom tower service to handle the RPC requests let service_builder = tower::ServiceBuilder::new() // Proxy `GET /health` requests to internal `system_health` method. - .layer(ProxyGetRequestLayer::new("/health", "system_health")) + .layer(ProxyGetRequestLayer::new("/health", "system_health")?) .timeout(Duration::from_secs(2)); let server = diff --git a/http-server/src/middleware/proxy_request.rs b/http-server/src/middleware/proxy_request.rs index e14ab0f47f..bbe76c788c 100644 --- a/http-server/src/middleware/proxy_request.rs +++ b/http-server/src/middleware/proxy_request.rs @@ -5,6 +5,7 @@ use crate::response; use hyper::header::{ACCEPT, CONTENT_TYPE}; use hyper::http::HeaderValue; use hyper::{Body, Method, Request, Response, Uri}; +use jsonrpsee_core::error::Error as RpcError; use jsonrpsee_types::{Id, RequestSer}; use std::error::Error; use std::future::Future; @@ -27,8 +28,13 @@ impl ProxyGetRequestLayer { /// Creates a new [`ProxyGetRequestLayer`]. /// /// See [`ProxyGetRequest`] for more details. - pub fn new(path: impl Into, method: impl Into) -> Self { - Self { path: path.into(), method: method.into() } + pub fn new(path: impl Into, method: impl Into) -> Result { + let path = path.into(); + if !path.starts_with('/') { + return Err(RpcError::Custom("ProxyGetRequestLayer path must start with `/`".to_string())); + } + + Ok(Self { path, method: method.into() }) } } impl Layer for ProxyGetRequestLayer { @@ -36,6 +42,7 @@ impl Layer for ProxyGetRequestLayer { fn layer(&self, inner: S) -> Self::Service { ProxyGetRequest::new(inner, &self.path, &self.method) + .expect("Path already validated in ProxyGetRequestLayer; qed") } } @@ -62,8 +69,13 @@ impl ProxyGetRequest { /// Creates a new [`ProxyGetRequest`]. /// /// The request `GET /path` is redirected to the provided method. - pub fn new(inner: S, path: &str, method: &str) -> Self { - Self { inner, path: Arc::from(path), method: Arc::from(method) } + /// Fails if the path does not start with `/`. + pub fn new(inner: S, path: &str, method: &str) -> Result { + if !path.starts_with('/') { + return Err(RpcError::Custom(format!("ProxyGetRequest path must start with `/`, got: {}", path))); + } + + Ok(Self { inner, path: Arc::from(path), method: Arc::from(method) }) } } diff --git a/tests/tests/helpers.rs b/tests/tests/helpers.rs index beecf3b15e..c3a47d9f79 100644 --- a/tests/tests/helpers.rs +++ b/tests/tests/helpers.rs @@ -226,7 +226,7 @@ pub async fn http_server() -> (SocketAddr, HttpServerHandle) { pub async fn http_server_with_access_control(acl: AccessControl, cors: CorsLayer) -> (SocketAddr, HttpServerHandle) { let middleware = tower::ServiceBuilder::new() // Proxy `GET /health` requests to internal `system_health` method. - .layer(ProxyGetRequestLayer::new("/health", "system_health")) + .layer(ProxyGetRequestLayer::new("/health", "system_health").unwrap()) // Add `CORS` layer. .layer(cors); From ee38c4b7be0dbfbe3487fa452a6827b499e9b050 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 22 Aug 2022 18:13:34 +0300 Subject: [PATCH 11/12] http-server: Remove pin project dependency Signed-off-by: Alexandru Vasile --- http-server/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/http-server/Cargo.toml b/http-server/Cargo.toml index 3b9496803c..a3225641d4 100644 --- a/http-server/Cargo.toml +++ b/http-server/Cargo.toml @@ -21,7 +21,6 @@ serde_json = { version = "1.0", features = ["raw_value"] } serde = "1" tokio = { version = "1.16", features = ["rt-multi-thread", "macros"] } tower = "0.4.13" -pin-project-lite = "0.2.9" [dev-dependencies] tracing-subscriber = { version = "0.3.3", features = ["env-filter"] } From e264ec047d794b81a1936bda69865c1523aa9458 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 24 Aug 2022 12:35:02 +0300 Subject: [PATCH 12/12] Rename `proxy_request.rs` to `proxy_get_request.rs` Signed-off-by: Alexandru Vasile --- examples/examples/http_proxy_middleware.rs | 2 +- http-server/src/middleware/mod.rs | 2 +- .../src/middleware/{proxy_request.rs => proxy_get_request.rs} | 0 tests/tests/helpers.rs | 2 +- 4 files changed, 3 insertions(+), 3 deletions(-) rename http-server/src/middleware/{proxy_request.rs => proxy_get_request.rs} (100%) diff --git a/examples/examples/http_proxy_middleware.rs b/examples/examples/http_proxy_middleware.rs index f4ba33397c..53dd2d15b8 100644 --- a/examples/examples/http_proxy_middleware.rs +++ b/examples/examples/http_proxy_middleware.rs @@ -43,7 +43,7 @@ use std::time::Duration; use jsonrpsee::core::client::ClientT; use jsonrpsee::http_client::HttpClientBuilder; -use jsonrpsee::http_server::middleware::proxy_request::ProxyGetRequestLayer; +use jsonrpsee::http_server::middleware::proxy_get_request::ProxyGetRequestLayer; use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle, RpcModule}; #[tokio::main] diff --git a/http-server/src/middleware/mod.rs b/http-server/src/middleware/mod.rs index c2edc70a96..d1a829e423 100644 --- a/http-server/src/middleware/mod.rs +++ b/http-server/src/middleware/mod.rs @@ -1,4 +1,4 @@ //! Various middleware implementations for RPC specific purposes. /// Proxy `GET /path` to internal RPC methods. -pub mod proxy_request; +pub mod proxy_get_request; diff --git a/http-server/src/middleware/proxy_request.rs b/http-server/src/middleware/proxy_get_request.rs similarity index 100% rename from http-server/src/middleware/proxy_request.rs rename to http-server/src/middleware/proxy_get_request.rs diff --git a/tests/tests/helpers.rs b/tests/tests/helpers.rs index c3a47d9f79..065c6d07cb 100644 --- a/tests/tests/helpers.rs +++ b/tests/tests/helpers.rs @@ -30,7 +30,7 @@ use std::time::Duration; use futures::{SinkExt, StreamExt}; use jsonrpsee::core::error::SubscriptionClosed; use jsonrpsee::core::server::access_control::{AccessControl, AccessControlBuilder}; -use jsonrpsee::http_server::middleware::proxy_request::ProxyGetRequestLayer; +use jsonrpsee::http_server::middleware::proxy_get_request::ProxyGetRequestLayer; use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle}; use jsonrpsee::types::error::{ErrorObject, SUBSCRIPTION_CLOSED_WITH_ERROR}; use jsonrpsee::ws_server::{WsServerBuilder, WsServerHandle};