Skip to content

Commit

Permalink
retry: allow retrying requests without content-length headers (#1341)
Browse files Browse the repository at this point in the history
Currently, the proxy will retry requests with bodies if and only if they
have a `content-length` header with a value <= 64 KB. If the request has
a body but no `content-length` header, we currently assume that its body
will exceed the maximum buffered size, and skip trying to retry it.
However, this means gRPC requests will never be retried, because it
turns out gRPC requests don't include a `content-length` header (see
linkerd/linkerd2#7141). Whoops!

This PR fixes this by changing the retry logic to use `Body::size_hint` to
determine if buffering should be attempted. This value will be set from
`content-length` when it is set and may be set in additional situations
where the body length is known before the request is processed.

We are still protected against unbounded buffering because the buffering
body will stop buffering and discard any previously buffered data if the
buffered length ever exceeds the maximum.

Co-authored-by: Oliver Gould <ver@buoyant.io>
(cherry picked from commit bfc1e2c)
Signed-off-by: Oliver Gould <ver@buoyant.io>
  • Loading branch information
hawkw and olix0r committed Mar 30, 2022
1 parent f7ef27f commit fb470b3
Show file tree
Hide file tree
Showing 11 changed files with 234 additions and 106 deletions.
11 changes: 11 additions & 0 deletions hyper-balance/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ where

this.body.poll_trailers(cx)
}

#[inline]
fn size_hint(&self) -> hyper::body::SizeHint {
self.body.size_hint()
}
}

// ==== PendingUntilEosBody ====
Expand All @@ -147,6 +152,7 @@ impl<T: Send + 'static, B: HttpBody> HttpBody for PendingUntilEosBody<T, B> {
type Data = B::Data;
type Error = B::Error;

#[inline]
fn is_end_stream(&self) -> bool {
self.body.is_end_stream()
}
Expand Down Expand Up @@ -181,6 +187,11 @@ impl<T: Send + 'static, B: HttpBody> HttpBody for PendingUntilEosBody<T, B> {

Poll::Ready(ret)
}

#[inline]
fn size_hint(&self) -> hyper::body::SizeHint {
self.body.size_hint()
}
}

#[cfg(test)]
Expand Down
91 changes: 38 additions & 53 deletions linkerd/app/core/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,55 +58,33 @@ impl retry::NewPolicy<Route> for NewRetryPolicy {

// === impl Retry ===

impl RetryPolicy {
fn can_retry<A: http_body::Body>(&self, req: &http::Request<A>) -> bool {
let content_length = |req: &http::Request<_>| {
req.headers()
.get(http::header::CONTENT_LENGTH)
.and_then(|value| value.to_str().ok()?.parse::<usize>().ok())
};

// Requests without bodies can always be retried, as we will not need to
// buffer the body. If the request *does* have a body, retry it if and
// only if the request contains a `content-length` header and the
// content length is >= 64 kb.
let has_body = !req.body().is_end_stream();
if has_body && content_length(req).unwrap_or(usize::MAX) > MAX_BUFFERED_BYTES {
tracing::trace!(
req.has_body = has_body,
req.content_length = ?content_length(req),
"not retryable",
);
return false;
}

tracing::trace!(
req.has_body = has_body,
req.content_length = ?content_length(req),
"retryable",
);
true
}
}

impl<A, B, E> retry::Policy<http::Request<A>, http::Response<B>, E> for RetryPolicy
impl<A, B, E> retry::Policy<http::Request<ReplayBody<A>>, http::Response<B>, E> for RetryPolicy
where
A: http_body::Body + Clone,
A: http_body::Body + Unpin,
A::Error: Into<Error>,
{
type Future = future::Ready<Self>;

fn retry(
&self,
req: &http::Request<A>,
req: &http::Request<ReplayBody<A>>,
result: Result<&http::Response<B>, &E>,
) -> Option<Self::Future> {
let retryable = match result {
Err(_) => false,
Ok(rsp) => classify::Request::from(self.response_classes.clone())
.classify(req)
.start(rsp)
.eos(None)
.is_failure(),
Ok(rsp) => {
// is the request a failure?
let is_failure = classify::Request::from(self.response_classes.clone())
.classify(req)
.start(rsp)
.eos(None)
.is_failure();
// did the body exceed the maximum length limit?
let exceeded_max_len = req.body().is_capped();
let retryable = is_failure && !exceeded_max_len;
tracing::trace!(is_failure, exceeded_max_len, retryable);
retryable
}
};

if !retryable {
Expand All @@ -123,16 +101,12 @@ where
Some(future::ready(self.clone()))
}

fn clone_request(&self, req: &http::Request<A>) -> Option<http::Request<A>> {
let can_retry = self.can_retry(req);
debug_assert!(
can_retry,
"The retry policy attempted to clone an un-retryable request. This is unexpected."
);
if !can_retry {
return None;
}

fn clone_request(
&self,
req: &http::Request<ReplayBody<A>>,
) -> Option<http::Request<ReplayBody<A>>> {
// Since the body is already wrapped in a ReplayBody, it must not be obviously too large to
// buffer/clone.
let mut clone = http::Request::new(req.body().clone());
*clone.method_mut() = req.method().clone();
*clone.uri_mut() = req.uri().clone();
Expand Down Expand Up @@ -160,9 +134,20 @@ where
&self,
req: http::Request<A>,
) -> Either<Self::RetryRequest, http::Request<A>> {
if self.can_retry(&req) {
return Either::A(req.map(|body| ReplayBody::new(body, MAX_BUFFERED_BYTES)));
}
Either::B(req)
let (head, body) = req.into_parts();
let replay_body = match ReplayBody::try_new(body, MAX_BUFFERED_BYTES) {
Ok(body) => body,
Err(body) => {
tracing::debug!(
size = body.size_hint().lower(),
"Body is too large to buffer"
);
return Either::B(http::Request::from_parts(head, body));
}
};

// The body may still be too large to be buffered if the body's length was not known.
// `ReplayBody` handles this gracefully.
Either::A(http::Request::from_parts(head, replay_body))
}
}
52 changes: 48 additions & 4 deletions linkerd/app/integration/src/tests/profiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ macro_rules! profile_test {
async move {
// Read the entire body before responding, so that the
// client doesn't fail when writing it out.
let _body = hyper::body::aggregate(req.into_body()).await;
let _body = hyper::body::to_bytes(req.into_body()).await;
tracing::debug!(body = ?_body.as_ref().map(|body| body.len()), "recieved body");
Ok::<_, Error>(if fail {
Response::builder()
.status(533)
Expand Down Expand Up @@ -222,6 +223,36 @@ async fn retry_with_small_put_body() {
}
}

#[tokio::test]
async fn retry_without_content_length() {
profile_test! {
routes: [
controller::route()
.request_any()
.response_failure(500..600)
.retryable(true)
],
budget: Some(controller::retry_budget(Duration::from_secs(10), 0.1, 1)),
with_client: |client: client::Client| async move {
let (mut tx, body) = hyper::body::Body::channel();
let req = client.request_builder("/0.5")
.method("POST")
.body(body)
.unwrap();
let res = tokio::spawn(async move { client.request_body(req).await });
tx.send_data(Bytes::from_static(b"hello"))
.await
.expect("the whole body should be read");
tx.send_data(Bytes::from_static(b"world"))
.await
.expect("the whole body should be read");
drop(tx);
let res = res.await.unwrap();
assert_eq!(res.status(), 200);
}
}
}

#[tokio::test]
async fn does_not_retry_if_request_does_not_match() {
profile_test! {
Expand Down Expand Up @@ -280,7 +311,9 @@ async fn does_not_retry_if_body_is_too_long() {
}

#[tokio::test]
async fn does_not_retry_if_body_lacks_known_length() {
async fn does_not_retry_if_streaming_body_exceeds_max_length() {
// TODO(eliza): if we make the max length limit configurable, update this
// test to test the configurable max length limit...
profile_test! {
routes: [
controller::route()
Expand All @@ -296,10 +329,21 @@ async fn does_not_retry_if_body_lacks_known_length() {
.body(body)
.unwrap();
let res = tokio::spawn(async move { client.request_body(req).await });
let _ = tx.send_data(Bytes::from_static(b"hello"));
let _ = tx.send_data(Bytes::from_static(b"world"));
// send a 32k chunk
tx.send_data(Bytes::from(&[1u8; 32 * 1024][..]))
.await
.expect("the whole body should be read");
// ...and another one...
tx.send_data(Bytes::from(&[1u8; 32 * 1024][..]))
.await
.expect("the whole body should be read");
// ...and a third one (exceeding the max length limit)
tx.send_data(Bytes::from(&[1u8; 32 * 1024][..]))
.await
.expect("the whole body should be read");
drop(tx);
let res = res.await.unwrap();

assert_eq!(res.status(), 533);
}
}
Expand Down
4 changes: 1 addition & 3 deletions linkerd/app/integration/src/tests/transparency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,9 +843,7 @@ macro_rules! http1_tests {

let srv = server::http1()
.route_fn("/", |req| {
let has_body_header = req.headers().contains_key("transfer-encoding")
|| req.headers().contains_key("content-length");
let status = if has_body_header {
let status = if req.headers().contains_key(http::header::TRANSFER_ENCODING) {
StatusCode::BAD_REQUEST
} else {
StatusCode::OK
Expand Down
7 changes: 7 additions & 0 deletions linkerd/http-box/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,28 @@ impl Body for BoxBody {
type Data = Data;
type Error = Error;

#[inline]
fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}

#[inline]
fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
self.as_mut().inner.as_mut().poll_data(cx)
}

#[inline]
fn poll_trailers(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
self.as_mut().inner.as_mut().poll_trailers(cx)
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
self.inner.size_hint()
}
Expand Down Expand Up @@ -95,6 +99,7 @@ where
type Data = Data;
type Error = Error;

#[inline]
fn is_end_stream(&self) -> bool {
self.0.is_end_stream()
}
Expand All @@ -111,13 +116,15 @@ where
}))
}

#[inline]
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
Poll::Ready(futures::ready!(self.project().0.poll_trailers(cx)).map_err(Into::into))
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
self.0.size_hint()
}
Expand Down
6 changes: 6 additions & 0 deletions linkerd/http-metrics/src/requests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ where
self.project().inner.poll_trailers(cx)
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
self.inner.size_hint()
}
Expand Down Expand Up @@ -438,6 +439,11 @@ where

Poll::Ready(Ok(trls))
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
self.inner.size_hint()
}
}

#[pinned_drop]
Expand Down
Loading

0 comments on commit fb470b3

Please sign in to comment.