Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: add serve and serve_with_graceful_shutdown helpers #1382

Merged
merged 5 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 3 additions & 33 deletions examples/examples/jsonrpsee_as_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,14 @@ use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use futures::future::{self, Either};
use futures::FutureExt;
use hyper::header::AUTHORIZATION;
use hyper::HeaderMap;
use hyper_util::rt::{TokioExecutor, TokioIo};
use jsonrpsee::core::async_trait;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::server::middleware::rpc::{ResponseFuture, RpcServiceBuilder, RpcServiceT};
use jsonrpsee::server::{stop_channel, ServerHandle, StopHandle, TowerServiceBuilder};
use jsonrpsee::server::{serve_with_graceful_shutdown, stop_channel, ServerHandle, StopHandle, TowerServiceBuilder};
use jsonrpsee::types::{ErrorObject, ErrorObjectOwned, Request};
use jsonrpsee::ws_client::{HeaderValue, WsClientBuilder};
use jsonrpsee::{MethodResponse, Methods};
Expand Down Expand Up @@ -147,8 +145,6 @@ async fn main() -> anyhow::Result<()> {
}

async fn run_server(metrics: Metrics) -> anyhow::Result<ServerHandle> {
use hyper::service::service_fn;

let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 9944))).await?;

// This state is cloned for every connection
Expand Down Expand Up @@ -201,7 +197,7 @@ async fn run_server(metrics: Metrics) -> anyhow::Result<ServerHandle> {
};
let per_conn2 = per_conn.clone();

let svc = service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
let svc = tower::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
let is_websocket = jsonrpsee::server::ws::is_upgrade_request(&req);
let transport_label = if is_websocket { "ws" } else { "http" };
let PerConnection { methods, stop_handle, metrics, svc_builder } = per_conn2.clone();
Expand Down Expand Up @@ -262,33 +258,7 @@ async fn run_server(metrics: Metrics) -> anyhow::Result<ServerHandle> {
}
});

let per_conn = per_conn.clone();
tokio::spawn(async move {
let stop_handle2 = per_conn.stop_handle.clone();

let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
let conn = builder.serve_connection_with_upgrades(TokioIo::new(sock), svc);
let stopped = stop_handle2.shutdown();

// Pin the future so that it can be polled.
tokio::pin!(stopped, conn);

let res = match future::select(conn, stopped).await {
// Return the connection if not stopped.
Either::Left((conn, _)) => conn,
// If the server is stopped, we should gracefully shutdown
// the connection and poll it until it finishes.
Either::Right((_, mut conn)) => {
conn.as_mut().graceful_shutdown();
conn.await
}
};

// Log any errors that might have occurred.
if let Err(err) = res {
tracing::error!(err=?err, "HTTP connection failed");
}
});
tokio::spawn(serve_with_graceful_shutdown(sock, svc, stop_handle.clone().shutdown()));
}
});

Expand Down
35 changes: 5 additions & 30 deletions examples/examples/jsonrpsee_server_low_level_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};

use futures::future::{self, BoxFuture, Either};
use futures::future::BoxFuture;
use futures::FutureExt;
use hyper_util::rt::{TokioExecutor, TokioIo};
use jsonrpsee::core::async_trait;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::server::middleware::rpc::RpcServiceT;
use jsonrpsee::server::{
http, stop_channel, ws, ConnectionGuard, ConnectionState, RpcServiceBuilder, ServerConfig, ServerHandle, StopHandle,
http, serve_with_graceful_shutdown, stop_channel, ws, ConnectionGuard, ConnectionState, RpcServiceBuilder,
ServerConfig, ServerHandle, StopHandle,
};
use jsonrpsee::types::{ErrorObject, ErrorObjectOwned, Request};
use jsonrpsee::ws_client::WsClientBuilder;
Expand Down Expand Up @@ -158,8 +158,6 @@ async fn main() -> anyhow::Result<()> {
}

async fn run_server() -> anyhow::Result<ServerHandle> {
use hyper::service::service_fn;

// Construct our SocketAddr to listen on...
let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 9944))).await?;

Expand Down Expand Up @@ -224,7 +222,7 @@ async fn run_server() -> anyhow::Result<ServerHandle> {
// Create a service handler.
let stop_handle2 = per_conn.stop_handle.clone();
let per_conn = per_conn.clone();
let svc = service_fn(move |req| {
let svc = tower::service_fn(move |req| {
let PerConnection {
methods,
stop_handle,
Expand Down Expand Up @@ -307,30 +305,7 @@ async fn run_server() -> anyhow::Result<ServerHandle> {
});

// Upgrade the connection to a HTTP service.
tokio::spawn(async move {
let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
let conn = builder.serve_connection_with_upgrades(TokioIo::new(sock), svc);
let stopped = stop_handle2.shutdown();

// Pin the future so that it can be polled.
tokio::pin!(stopped, conn);

let res = match future::select(conn, stopped).await {
// Return the connection if not stopped.
Either::Left((conn, _)) => conn,
// If the server is stopped, we should gracefully shutdown
// the connection and poll it until it finishes.
Either::Right((_, mut conn)) => {
conn.as_mut().graceful_shutdown();
conn.await
}
};

// Log any errors that might have occurred.
if let Err(err) = res {
tracing::error!(err=?err, "HTTP connection failed");
}
});
tokio::spawn(serve_with_graceful_shutdown(sock, svc, stop_handle2.shutdown()));
}
});

Expand Down
35 changes: 2 additions & 33 deletions examples/examples/ws_dual_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,8 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use futures::future::{self, Either};
use hyper_util::rt::{TokioExecutor, TokioIo};
use hyper_util::service::TowerToHyperService;
use jsonrpsee::core::client::ClientT;
use jsonrpsee::server::{stop_channel, ServerHandle};
use jsonrpsee::server::{serve_with_graceful_shutdown, stop_channel, ServerHandle};
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::{rpc_params, RpcModule};
use std::net::SocketAddr;
Expand Down Expand Up @@ -111,36 +108,8 @@ async fn run_server() -> anyhow::Result<(ServerHandle, Addrs)> {
_ = stop_hdl.clone().shutdown() => break,
};

// Clone the service and stop handle to be moved into the spawn
// below.
let svc = svc.clone();
let stop_hdl2 = stop_hdl.clone();

// Spawn a new task to serve each respective (Hyper) connection.
tokio::spawn(async move {
let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
let conn = builder.serve_connection_with_upgrades(TokioIo::new(stream), TowerToHyperService::new(svc));
let stopped = stop_hdl2.shutdown();

// Pin the future so that it can be polled.
tokio::pin!(stopped, conn);

let res = match future::select(conn, stopped).await {
// Return the connection if not stopped.
Either::Left((conn, _)) => conn,
// If the server is stopped, we should gracefully shutdown
// the connection and poll it until it finishes.
Either::Right((_, mut conn)) => {
conn.as_mut().graceful_shutdown();
conn.await
}
};

// Log any errors that might have occurred.
if let Err(err) = res {
tracing::error!(err=?err, "HTTP connection failed");
}
});
tokio::spawn(serve_with_graceful_shutdown(stream, svc.clone(), stop_hdl.clone().shutdown()));
}
});

Expand Down
1 change: 1 addition & 0 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,6 @@ pub use tracing;
pub use jsonrpsee_core::http_helpers::{Body as HttpBody, Request as HttpRequest, Response as HttpResponse};
pub use transport::http;
pub use transport::ws;
pub use utils::{serve, serve_with_graceful_shutdown};

pub(crate) const LOG_TARGET: &str = "jsonrpsee-server";
30 changes: 3 additions & 27 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
/// # Examples
///
/// ```no_run
/// use jsonrpsee_server::{Methods, ServerHandle, ws, stop_channel};
/// use jsonrpsee_server::{Methods, ServerHandle, ws, stop_channel, serve_with_graceful_shutdown};
/// use tower::Service;
/// use std::{error::Error as StdError, net::SocketAddr};
/// use futures_util::future::{self, Either};
Expand Down Expand Up @@ -820,7 +820,7 @@ impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
/// let svc_builder2 = svc_builder.clone();
/// let methods2 = methods.clone();
///
/// let svc = hyper::service::service_fn(move |req| {
/// let svc = tower::service_fn(move |req| {
/// let stop_handle = stop_handle2.clone();
/// let svc_builder = svc_builder2.clone();
/// let methods = methods2.clone();
Expand All @@ -846,32 +846,8 @@ impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
/// async move { svc.call(req).await.map_err(|e| anyhow::anyhow!("{:?}", e)) }
/// });
///
/// let stop_handle = stop_handle.clone();
/// // Upgrade the connection to a HTTP service with graceful shutdown.
/// tokio::spawn(async move {
/// let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
/// let conn = builder.serve_connection_with_upgrades(TokioIo::new(sock), svc);
/// let stopped = stop_handle.shutdown();
///
/// // Pin the future so that it can be polled.
/// tokio::pin!(stopped, conn);
///
/// let res = match future::select(conn, stopped).await {
/// // Return the connection if not stopped.
/// Either::Left((conn, _)) => conn,
/// // If the server is stopped, we should gracefully shutdown
/// // the connection and poll it until it finishes.
/// Either::Right((_, mut conn)) => {
/// conn.as_mut().graceful_shutdown();
/// conn.await
/// }
/// };
///
/// // Log any errors that might have occurred.
/// if let Err(err) = res {
/// tracing::error!(err=?err, "HTTP connection failed");
/// }
/// });
/// tokio::spawn(serve_with_graceful_shutdown(sock, svc, stop_handle.clone().shutdown()));
/// }
/// });
///
Expand Down
33 changes: 4 additions & 29 deletions server/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::{fmt, sync::atomic::AtomicUsize};

use crate::{stop_channel, RpcModule, Server, ServerBuilder, ServerHandle};
use crate::{serve_with_graceful_shutdown, stop_channel, RpcModule, Server, ServerBuilder, ServerHandle};

use futures_util::future::Either;
use futures_util::{future, FutureExt};
use hyper_util::rt::{TokioExecutor, TokioIo};
use futures_util::FutureExt;
use jsonrpsee_core::server::Methods;
use jsonrpsee_core::{DeserializeOwned, RpcResult, StringError};
use jsonrpsee_test_utils::TimeoutFutureExt;
Expand Down Expand Up @@ -210,8 +208,6 @@ pub(crate) struct Metrics {
}

pub(crate) async fn ws_server_with_stats(metrics: Metrics) -> SocketAddr {
use hyper::service::service_fn;

let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))).await.unwrap();
let addr = listener.local_addr().unwrap();
let (stop_handle, server_handle) = stop_channel();
Expand Down Expand Up @@ -241,7 +237,7 @@ pub(crate) async fn ws_server_with_stats(metrics: Metrics) -> SocketAddr {
tokio::spawn(async move {
let rpc_svc = rpc_svc.clone();

let svc = service_fn(move |req| {
let svc = tower::service_fn(move |req| {
let is_websocket = crate::ws::is_upgrade_request(&req);
let metrics = metrics.clone();
let mut rpc_svc = rpc_svc.clone();
Expand All @@ -264,28 +260,7 @@ pub(crate) async fn ws_server_with_stats(metrics: Metrics) -> SocketAddr {
}
});

let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
let conn = builder.serve_connection_with_upgrades(TokioIo::new(sock), svc);
let stopped = stop_handle.shutdown();

// Pin the future so that it can be polled.
tokio::pin!(stopped, conn);

let res = match future::select(conn, stopped).await {
// Return the connection if not stopped.
Either::Left((conn, _)) => conn,
// If the server is stopped, we should gracefully shutdown
// the connection and poll it until it finishes.
Either::Right((_, mut conn)) => {
conn.as_mut().graceful_shutdown();
conn.await
}
};

// Log any errors that might have occurred.
if let Err(err) = res {
tracing::error!(err=?err, "HTTP connection failed");
}
tokio::spawn(serve_with_graceful_shutdown(sock, svc, stop_handle.clone().shutdown()));
});
}
});
Expand Down
62 changes: 62 additions & 0 deletions server/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::{HttpBody, HttpRequest};

use futures_util::future::{self, Either};
use hyper_util::rt::{TokioExecutor, TokioIo};
use jsonrpsee_core::BoxError;
use pin_project::pin_project;
use tower::util::Oneshot;
use tower::ServiceExt;
Expand Down Expand Up @@ -79,6 +83,64 @@ where
}
}

/// Serve a service over a TCP connection without graceful shutdown.
/// This means that pending requests will be dropped when the server is stopped.
///
/// If you want to gracefully shutdown the server, use [`serve_with_graceful_shutdown`] instead.
pub async fn serve<S, B, I>(io: I, service: S) -> Result<(), BoxError>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would mention here that graceful shutdowns are not handled, and users should look at serve_with_graceful_shutdown in those cases or similar

where
S: tower::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>> + Clone + Send + 'static,
S::Future: Send,
S::Response: Send,
S::Error: Into<BoxError>,
B: http_body::Body<Data = hyper::body::Bytes> + Send + 'static,
B::Error: Into<BoxError>,
I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
{
let service = hyper_util::service::TowerToHyperService::new(service);
let io = TokioIo::new(io);

let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
let conn = builder.serve_connection_with_upgrades(io, service);
conn.await
}

/// Serve a service over a TCP connection with graceful shutdown.
/// This means that pending requests will be completed before the server is stopped.
pub async fn serve_with_graceful_shutdown<S, B, I>(
io: I,
service: S,
stopped: impl Future<Output = ()>,
) -> Result<(), BoxError>
where
S: tower::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>> + Clone + Send + 'static,
S::Future: Send,
S::Response: Send,
S::Error: Into<BoxError>,
B: http_body::Body<Data = hyper::body::Bytes> + Send + 'static,
B::Error: Into<BoxError>,
I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
{
let service = hyper_util::service::TowerToHyperService::new(service);
let io = TokioIo::new(io);

let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
let conn = builder.serve_connection_with_upgrades(io, service);

tokio::pin!(stopped, conn);

match future::select(conn, stopped).await {
// Return if the connection was completed.
Either::Left((conn, _)) => conn,
// If the server is stopped, we should gracefully shutdown
// the connection and poll it until it finishes.
Either::Right((_, mut conn)) => {
conn.as_mut().graceful_shutdown();
conn.await
}
}
}

/// Helpers to deserialize a request with extensions.
pub(crate) mod deserialize {
/// Helper to deserialize a request with extensions.
Expand Down
Loading
Loading