-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix HTTP/2: retry requests rejected with
REFUSED_STREAM
(#2081)
- Loading branch information
1 parent
837a58e
commit 4ab5fb0
Showing
6 changed files
with
226 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
#![cfg(not(target_arch = "wasm32"))] | ||
use std::convert::Infallible; | ||
use std::future::Future; | ||
use std::net; | ||
use std::sync::Arc; | ||
use std::time::Duration; | ||
|
||
use futures_util::FutureExt; | ||
use http::{Request, Response}; | ||
use hyper::service::service_fn; | ||
use hyper::Body; | ||
use tokio::net::TcpListener; | ||
use tokio::select; | ||
use tokio::sync::oneshot; | ||
|
||
/// This server, unlike [`super::server::Server`], allows for delaying the | ||
/// specified amount of time after each TCP connection is established. This is | ||
/// useful for testing the behavior of the client when the server is slow. | ||
/// | ||
/// For example, in case of HTTP/2, once the TCP/TLS connection is established, | ||
/// both endpoints are supposed to send a preface and an initial `SETTINGS` | ||
/// frame (See [RFC9113 3.4] for details). What if these frames are delayed for | ||
/// whatever reason? This server allows for testing such scenarios. | ||
/// | ||
/// [RFC9113 3.4]: https://www.rfc-editor.org/rfc/rfc9113.html#name-http-2-connection-preface | ||
pub struct Server { | ||
addr: net::SocketAddr, | ||
shutdown_tx: Option<oneshot::Sender<()>>, | ||
server_terminated_rx: oneshot::Receiver<()>, | ||
} | ||
|
||
impl Server { | ||
pub async fn new<F1, Fut, F2>(func: F1, apply_config: F2, delay: Duration) -> Self | ||
where | ||
F1: Fn(Request<Body>) -> Fut + Clone + Send + 'static, | ||
Fut: Future<Output = Response<Body>> + Send + 'static, | ||
F2: FnOnce(hyper::server::conn::Http) -> hyper::server::conn::Http + Send + 'static, | ||
{ | ||
let (shutdown_tx, shutdown_rx) = oneshot::channel(); | ||
let (server_terminated_tx, server_terminated_rx) = oneshot::channel(); | ||
|
||
let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); | ||
let addr = tcp_listener.local_addr().unwrap(); | ||
|
||
tokio::spawn(async move { | ||
let http = Arc::new(apply_config(hyper::server::conn::Http::new())); | ||
|
||
tokio::spawn(async move { | ||
let (connection_shutdown_tx, connection_shutdown_rx) = oneshot::channel(); | ||
let connection_shutdown_rx = connection_shutdown_rx.shared(); | ||
let mut shutdown_rx = std::pin::pin!(shutdown_rx); | ||
|
||
let mut handles = Vec::new(); | ||
loop { | ||
select! { | ||
_ = shutdown_rx.as_mut() => { | ||
connection_shutdown_tx.send(()).unwrap(); | ||
break; | ||
} | ||
res = tcp_listener.accept() => { | ||
let (stream, _) = res.unwrap(); | ||
|
||
|
||
let handle = tokio::spawn({ | ||
let connection_shutdown_rx = connection_shutdown_rx.clone(); | ||
let http = http.clone(); | ||
let func = func.clone(); | ||
|
||
async move { | ||
tokio::time::sleep(delay).await; | ||
|
||
let mut conn = std::pin::pin!(http.serve_connection( | ||
stream, | ||
service_fn(move |req| { | ||
let fut = func(req); | ||
async move { | ||
Ok::<_, Infallible>(fut.await) | ||
}}) | ||
)); | ||
|
||
select! { | ||
_ = conn.as_mut() => {} | ||
_ = connection_shutdown_rx => { | ||
conn.as_mut().graceful_shutdown(); | ||
conn.await.unwrap(); | ||
} | ||
} | ||
} | ||
}); | ||
|
||
handles.push(handle); | ||
} | ||
} | ||
} | ||
|
||
futures_util::future::join_all(handles).await; | ||
server_terminated_tx.send(()).unwrap(); | ||
}); | ||
}); | ||
|
||
Self { | ||
addr, | ||
shutdown_tx: Some(shutdown_tx), | ||
server_terminated_rx, | ||
} | ||
} | ||
|
||
pub async fn shutdown(mut self) { | ||
if let Some(tx) = self.shutdown_tx.take() { | ||
let _ = tx.send(()); | ||
} | ||
|
||
self.server_terminated_rx.await.unwrap(); | ||
} | ||
|
||
pub fn addr(&self) -> net::SocketAddr { | ||
self.addr | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
pub mod delay_server; | ||
pub mod server; | ||
|
||
// TODO: remove once done converting to new support server? | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters