Skip to content

Commit

Permalink
INT-4358: Avoid blocking WebFlux response parsing
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-4358

* Parse response in the `WebFluxRequestExecutingMessageHandler`
in reactive, on demand manner.
* Add protected for empty response body in case of error
  • Loading branch information
artembilan authored and garyrussell committed Oct 25, 2017
1 parent efb8f04 commit b69bbdc
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.http.outbound.AbstractHttpRequestExecutingMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.web.reactive.function.BodyExtractor;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse;
Expand Down Expand Up @@ -130,54 +128,61 @@ protected Object exchange(Supplier<URI> uriSupplier, HttpMethod httpMethod, Http
requestSpec.body(BodyInserters.fromObject(httpRequest.getBody()));
}

Mono<ClientResponse> responseMono = requestSpec.exchange()
.doOnNext(response -> {
HttpStatus httpStatus = response.statusCode();
if (httpStatus.is4xxClientError() || httpStatus.is5xxServerError()) {
throw new WebClientResponseException(
String.format("ClientResponse has erroneous status code: %d %s",
response.statusCode().value(),
response.statusCode().getReasonPhrase()),
httpStatus.value(),
httpStatus.getReasonPhrase(),
response.headers()
.asHttpHeaders(),
response.body(BodyExtractors.toDataBuffers())
Mono<ClientResponse> responseMono =
requestSpec.exchange()
.flatMap(response -> {
HttpStatus httpStatus = response.statusCode();
if (httpStatus.isError()) {
return response.body(BodyExtractors.toDataBuffers())
.reduce(DataBuffer::write)
.map(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
return bytes;
})
.block(),
response.headers()
.contentType()
.map(MimeType::getCharset)
.orElse(StandardCharsets.ISO_8859_1));
}
});
.defaultIfEmpty(new byte[0])
.map(bodyBytes -> {
throw new WebClientResponseException(
"ClientResponse has erroneous status code: "
+ httpStatus.value() + " "
+ httpStatus.getReasonPhrase(),
httpStatus.value(),
httpStatus.getReasonPhrase(),
response.headers().asHttpHeaders(),
bodyBytes,
response.headers().contentType()
.map(MimeType::getCharset)
.orElse(StandardCharsets.ISO_8859_1));
}
);
}
else {
return Mono.just(response);
}
});

if (isExpectReply()) {
BodyExtractor<? extends Mono<?>, ReactiveHttpInputMessage> bodyExtractor;

if (expectedResponseType instanceof ParameterizedTypeReference<?>) {
bodyExtractor = BodyExtractors.toMono((ParameterizedTypeReference<?>) expectedResponseType);
}
else if (expectedResponseType != null) {
bodyExtractor = BodyExtractors.toMono((Class<?>) expectedResponseType);
}
else {
bodyExtractor = null;
}

return responseMono
.map(response ->
new ResponseEntity<>(bodyExtractor != null
? response.body(bodyExtractor).block()
: null,
response.headers().asHttpHeaders(),
response.statusCode()))
.flatMap(response -> {
ResponseEntity.BodyBuilder httpEntityBuilder =
ResponseEntity.status(response.statusCode())
.headers(response.headers().asHttpHeaders());

Mono<?> bodyMono = Mono.empty();

if (expectedResponseType instanceof ParameterizedTypeReference<?>) {
bodyMono = response.body(BodyExtractors.toMono((ParameterizedTypeReference<?>) expectedResponseType));
}
else if (expectedResponseType != null) {
bodyMono = response.body(BodyExtractors.toMono((Class<?>) expectedResponseType));
}

return bodyMono
.map(httpEntityBuilder::body)
.defaultIfEmpty(httpEntityBuilder.build());
}
)
.map(this::getReply);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.springframework.integration.test.matcher.HeaderMatcher.hasHeader;

Expand All @@ -35,9 +36,11 @@
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.test.web.reactive.server.HttpHandlerConnector;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;

import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
Expand All @@ -52,10 +55,11 @@ public class WebFluxRequestExecutingMessageHandlerTests {

@Test
public void testReactiveReturn() throws Throwable {
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
response.setStatusCode(HttpStatus.OK);
return Mono.defer(response::setComplete);
});
ClientHttpConnector httpConnector =
new HttpHandlerConnector((request, response) -> {
response.setStatusCode(HttpStatus.OK);
return Mono.defer(response::setComplete);
});

WebClient webClient = WebClient.builder()
.clientConnector(httpConnector)
Expand All @@ -81,10 +85,11 @@ public void testReactiveReturn() throws Throwable {

@Test
public void testReactiveErrorOneWay() throws Throwable {
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
response.setStatusCode(HttpStatus.UNAUTHORIZED);
return Mono.defer(response::setComplete);
});
ClientHttpConnector httpConnector =
new HttpHandlerConnector((request, response) -> {
response.setStatusCode(HttpStatus.UNAUTHORIZED);
return Mono.defer(response::setComplete);
});

WebClient webClient = WebClient.builder()
.clientConnector(httpConnector)
Expand All @@ -110,9 +115,10 @@ public void testReactiveErrorOneWay() throws Throwable {

@Test
public void testReactiveConnectErrorOneWay() throws Throwable {
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
throw new RuntimeException("Intentional connection error");
});
ClientHttpConnector httpConnector =
new HttpHandlerConnector((request, response) -> {
throw new RuntimeException("Intentional connection error");
});

WebClient webClient = WebClient.builder()
.clientConnector(httpConnector)
Expand All @@ -136,4 +142,44 @@ public void testReactiveConnectErrorOneWay() throws Throwable {
assertThat(throwable.getMessage(), containsString("Intentional connection error"));
}

@Test
public void testServiceUnavailableWithoutBody() {
ClientHttpConnector httpConnector =
new HttpHandlerConnector((request, response) -> {
response.setStatusCode(HttpStatus.SERVICE_UNAVAILABLE);
return Mono.defer(response::setComplete);
});

WebClient webClient = WebClient.builder()
.clientConnector(httpConnector)
.build();

String destinationUri = "http://www.springsource.org/spring-integration";
QueueChannel replyChannel = new QueueChannel();
QueueChannel errorChannel = new QueueChannel();
WebFluxRequestExecutingMessageHandler messageHandler =
new WebFluxRequestExecutingMessageHandler(destinationUri, webClient);
messageHandler.setOutputChannel(replyChannel);

Message<String> requestMessage =
MessageBuilder.withPayload("test")
.setErrorChannel(errorChannel)
.build();

messageHandler.handleMessage(requestMessage);

Message<?> errorMessage = errorChannel.receive(10000);
assertNotNull(errorMessage);

Object payload = errorMessage.getPayload();
assertThat(payload, instanceOf(MessageHandlingException.class));

Exception exception = (Exception) payload;
assertThat(exception.getCause(), instanceOf(WebClientResponseException.class));
assertThat(exception.getMessage(), containsString("503 Service Unavailable"));

Message<?> replyMessage = errorChannel.receive(10);
assertNull(replyMessage);
}

}

0 comments on commit b69bbdc

Please sign in to comment.