Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

middleware: Implement proxy URI paths to RPC methods #859

Merged
merged 12 commits into from
Aug 24, 2022
Merged
Prev Previous commit
Next Next commit
middleware: Modify the response for proxies
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
lexnv committed Aug 22, 2022
commit e0c1e5d175dd9ff5cedc73119e4224c89515c53d
1 change: 1 addition & 0 deletions http-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ serde_json = { version = "1.0", features = ["raw_value"] }
serde = "1"
tokio = { version = "1.16", features = ["rt-multi-thread", "macros"] }
tower = "0.4.13"
pin-project-lite = "0.2.9"
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved

[dev-dependencies]
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
Expand Down
81 changes: 78 additions & 3 deletions http-server/src/middlewares/proxy_request.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
//! Middleware that proxies requests at a specified URI to internal
lexnv marked this conversation as resolved.
Show resolved Hide resolved
//! RPC method calls.

use crate::response;
use futures_util::ready;
use hyper::body::HttpBody;
use hyper::header::{ACCEPT, CONTENT_TYPE};
use hyper::http::HeaderValue;
use hyper::{Body, Method, Request, Response};
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::{Layer, Service};

Expand Down Expand Up @@ -63,11 +69,12 @@ impl<S> ProxyRequest<S> {

impl<S> Service<Request<Body>> for ProxyRequest<S>
Copy link
Member

@niklasad1 niklasad1 Aug 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose it's also possible for third parties to construct their own middleware service with specific logic such as #811?

i.e, to parse params from the URI and pass the parsed params call the method with the parsed params instead of params in the JSON-RPC call?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I don't see why not!

A ProxyPostRequest middleware could exist for instance which takes args in the body, or this one could decode named params from querystring or whatever else :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, users can create a new layer extract the params from the request's URI for sure, and pass it down similar to how we do here! 😄

where
S: Service<Request<Body>, Response = Response<Body>, Error = hyper::Error>,
S: Service<Request<Body>, Response = Response<Body>>,
<S as Service<Request<Body>>>::Error: From<hyper::Error>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
type Future = ResponseFuture<S::Future>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Expand All @@ -94,6 +101,74 @@ where
req = req.map(|_| body);
}

self.inner.call(req)
// Depending on `modify` adjust the response.
ResponseFuture::PollFuture { future: self.inner.call(req), modify }
lexnv marked this conversation as resolved.
Show resolved Hide resolved
}
}

pin_project! {
/// Response future for [`ProxyRequest`].
#[project = ResponseFutureState]
#[allow(missing_docs)]
pub enum ResponseFuture<F> {
/// Poll the response out of the future.
PollFuture {
#[pin]
future: F,
modify: bool,
},
/// Poll the [`hyper::Body`] response and modify it.
PollBodyData {
body: Body,
body_bytes: Vec<u8>,
},
}
}

impl<F, E> Future for ResponseFuture<F>
where
F: Future<Output = Result<Response<Body>, E>>,
E: From<hyper::Error>,
{
type Output = F::Output;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// The purpose of this loop is to optimise the transition from
// `PollFuture` -> `PollBodyData` state, that would otherwise
// require a `cx.wake().wake_by_ref and return Poll::Pending`.
loop {
match self.as_mut().project() {
ResponseFutureState::PollFuture { future, modify } => {
let res: Response<Body> = ready!(future.poll(cx)?);

// Nothing to modify: return the response as is.
if !*modify {
return Poll::Ready(Ok(res));
}

let inner = ResponseFuture::PollBodyData { body: res.into_body(), body_bytes: Vec::new() };
self.set(inner);
}
ResponseFutureState::PollBodyData { body, body_bytes } => {
while let Some(chunk) = ready!(Pin::new(&mut *body).poll_data(cx)?) {
body_bytes.extend_from_slice(chunk.as_ref());
}

#[derive(serde::Deserialize, Debug)]
struct RpcPayload<'a> {
#[serde(borrow)]
result: &'a serde_json::value::RawValue,
}

let response = if let Ok(payload) = serde_json::from_slice::<RpcPayload>(body_bytes) {
response::ok_response(payload.result.to_string())
} else {
response::internal_error()
};

return Poll::Ready(Ok(response));
}
}
}
}
}