Skip to content

Commit

Permalink
INT-4350: Handle ResponseEntity in HTTP Inbounds
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-4350

Spring MVC and Spring WebFlux handles `ResponseEntity` via appropriate
`ReturnValue` handlers.
This way all the headers and status code are fully up to end-user.
The body is handled by the appropriate converter/writer as before

* Add `ResponseEntity` handling to the `HttpRequestHandlingMessagingGateway`
and `WebFluxInboundEndpoint`.
The logic mostly compy/pasted from the `ResponseEntityResultHandler`
  • Loading branch information
artembilan authored and garyrussell committed Oct 30, 2017
1 parent 008a740 commit 6d96914
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.http.server.ServletServerHttpResponse;
Expand Down Expand Up @@ -114,7 +116,27 @@ public final void handleRequest(HttpServletRequest servletRequest, HttpServletRe
response.setStatusCode((HttpStatus) responseContent);
}
else {
this.writeResponse(responseContent, response, request.getHeaders().getAccept());
if (responseContent instanceof ResponseEntity) {
ResponseEntity<?> responseEntity = (ResponseEntity<?>) responseContent;
responseContent = responseEntity.getBody();
response.setStatusCode(responseEntity.getStatusCode());

HttpHeaders outputHeaders = response.getHeaders();
HttpHeaders entityHeaders = responseEntity.getHeaders();

if (!entityHeaders.isEmpty()) {
entityHeaders.entrySet().stream()
.filter(entry -> !outputHeaders.containsKey(entry.getKey()))
.forEach(entry -> outputHeaders.put(entry.getKey(), entry.getValue()));
}
}

if (responseContent != null) {
writeResponse(responseContent, response, request.getHeaders().getAccept());
}
else {
response.flush();
}
}
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.isNull;

Expand Down Expand Up @@ -144,8 +143,8 @@ public void testHttpProxyScenario() throws Exception {

this.handlerAdapter.handle(request, response, handler);

assertNull(response.getHeaderValue("If-Modified-Since"));
assertNull(response.getHeaderValue("If-Unmodified-Since"));
assertEquals(ifModifiedSinceValue, response.getHeaderValue("If-Modified-Since"));
assertEquals(ifUnmodifiedSinceValue, response.getHeaderValue("If-Unmodified-Since"));
assertEquals("close", response.getHeaderValue("Connection"));
assertEquals(contentDispositionValue, response.getHeader("Content-Disposition"));
assertEquals("text/plain", response.getContentType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,15 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.http.HttpHeaders;
import org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler;
import org.springframework.integration.security.channel.ChannelSecurityInterceptor;
import org.springframework.integration.security.channel.SecuredChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.security.access.AccessDecisionManager;
import org.springframework.security.access.vote.AffirmativeBased;
import org.springframework.security.access.vote.RoleVoter;
Expand Down Expand Up @@ -186,9 +185,7 @@ public IntegrationFlow httpProxyErrorFlow() {
return f -> f
.transform(Throwable::getCause)
.<HttpClientErrorException>handle((p, h) ->
MessageBuilder.withPayload(p.getResponseBodyAsString())
.setHeader(HttpHeaders.STATUS_CODE, p.getStatusCode())
.build());
new ResponseEntity<>(p.getResponseBodyAsString(), p.getStatusCode()));
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package org.springframework.integration.webflux.inbound;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashSet;
Expand All @@ -35,8 +37,10 @@
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.HttpMessageWriter;
import org.springframework.http.codec.ServerCodecConfigurer;
Expand Down Expand Up @@ -75,6 +79,8 @@ public class WebFluxInboundEndpoint extends BaseHttpInboundEndpoint implements W

private static final MediaType MEDIA_TYPE_APPLICATION_ALL = new MediaType("application");

private static final List<HttpMethod> SAFE_METHODS = Arrays.asList(HttpMethod.GET, HttpMethod.HEAD);

private ServerCodecConfigurer codecConfigurer = ServerCodecConfigurer.create();

private RequestedContentTypeResolver requestedContentTypeResolver = new HeaderContentTypeResolver();
Expand Down Expand Up @@ -318,12 +324,44 @@ private Mono<Void> populateResponse(ServerWebExchange exchange, Message<?> reply
return response.setComplete();
}
else {
HttpStatus httpStatus = resolveHttpStatusFromHeaders(replyMessage.getHeaders());
final HttpStatus httpStatus = resolveHttpStatusFromHeaders(replyMessage.getHeaders());
if (httpStatus != null) {
response.setStatusCode(httpStatus);
}

return writeResponseBody(exchange, responseContent);
if (responseContent instanceof ResponseEntity) {
return Mono.just((ResponseEntity<?>) responseContent)
.flatMap(e -> {
if (httpStatus == null) {
exchange.getResponse().setStatusCode(e.getStatusCode());
}

HttpHeaders entityHeaders = e.getHeaders();
HttpHeaders responseHeaders = exchange.getResponse().getHeaders();

if (!entityHeaders.isEmpty()) {
entityHeaders.entrySet().stream()
.filter(entry -> !responseHeaders.containsKey(entry.getKey()))
.forEach(entry -> responseHeaders.put(entry.getKey(), entry.getValue()));
}

if (e.getBody() == null) {
return exchange.getResponse().setComplete();
}

String etag = entityHeaders.getETag();
Instant lastModified = Instant.ofEpochMilli(entityHeaders.getLastModified());
HttpMethod httpMethod = exchange.getRequest().getMethod();
if (SAFE_METHODS.contains(httpMethod) && exchange.checkNotModified(etag, lastModified)) {
return exchange.getResponse().setComplete();
}

return writeResponseBody(exchange, e.getBody());
});
}
else {
return writeResponseBody(exchange, responseContent);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.config.EnableIntegration;
Expand Down Expand Up @@ -83,6 +84,17 @@ public void testJsonResult() {
.jsonPath("$[2].name").isEqualTo("John");
}

@Test
public void testServerInternalErrorRequest() {
this.webTestClient
.get()
.uri("/error")
.accept(MediaType.TEXT_PLAIN)
.exchange()
.expectStatus()
.is5xxServerError();
}

@Configuration
@EnableWebFlux
@EnableIntegration
Expand Down Expand Up @@ -128,6 +140,22 @@ Flux<Person> getPersons() {
return Flux.just(new Person("Jane"), new Person("Jason"), new Person("John"));
}


@Bean
public WebFluxInboundEndpoint errorInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/error");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("errorServiceChannel");
return endpoint;
}

@ServiceActivator(inputChannel = "errorServiceChannel")
public ResponseEntity<String> processHttpRequest() {
return new ResponseEntity<>("<500 Internal Server Error,{}>", HttpStatus.INTERNAL_SERVER_ERROR);
}

}


Expand Down

0 comments on commit 6d96914

Please sign in to comment.