Skip to content

Commit

Permalink
spring-projectsGH-2340: Fix WebFluxMessageHandlerSpec
Browse files Browse the repository at this point in the history
Fixes spring-projects#2340

* Make `WebFluxMessageHandlerSpec` to support method chain
* Add `WebFluxDslTests.testWebFluxFlowWithReplyPayloadToFlux()`
  • Loading branch information
Abhijit Sarkar authored and artembilan committed Jan 23, 2018
1 parent 01b0e1c commit 56ffed0
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
*
* @author Shiliang Li
* @author Artem Bilan
* @author Abhijit Sarkar
*
* @since 5.0
*
Expand Down Expand Up @@ -64,22 +65,26 @@ public class WebFluxMessageHandlerSpec
* Defaults to {@code false} - simple value is pushed downstream.
* Makes sense when {@code expectedResponseType} is configured.
* @param replyPayloadToFlux represent reply payload as a {@link Flux} or as a value from the {@link Mono}.
* @return the spec
* @since 5.0.1
* @see WebFluxRequestExecutingMessageHandler#setReplyPayloadToFlux(boolean)
*/
public void replyPayloadToFlux(boolean replyPayloadToFlux) {
public WebFluxMessageHandlerSpec replyPayloadToFlux(boolean replyPayloadToFlux) {
this.target.setReplyPayloadToFlux(replyPayloadToFlux);
return this;
}

/**
* Specify a {@link BodyExtractor} as an alternative to the {@code expectedResponseType}
* to allow to get low-level access to the received {@link ClientHttpResponse}.
* @param bodyExtractor the {@link BodyExtractor} to use.
* @return the spec
* @since 5.0.1
* @see WebFluxRequestExecutingMessageHandler#setBodyExtractor(BodyExtractor)
*/
public void bodyExtractor(BodyExtractor<?, ClientHttpResponse> bodyExtractor) {
public WebFluxMessageHandlerSpec bodyExtractor(BodyExtractor<?, ClientHttpResponse> bodyExtractor) {
this.target.setBodyExtractor(bodyExtractor);
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,33 @@

import java.util.Collections;

import javax.annotation.Resource;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.reactivestreams.Publisher;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.http.dsl.Http;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.security.access.AccessDecisionManager;
import org.springframework.security.access.vote.AffirmativeBased;
Expand Down Expand Up @@ -76,6 +83,7 @@
/**
* @author Artem Bilan
* @author Shiliang Li
* @author Abhijit Sarkar
*
* @since 5.0
*/
Expand All @@ -88,7 +96,15 @@ public class WebFluxDslTests {
private WebApplicationContext wac;

@Autowired
private WebFluxRequestExecutingMessageHandler serviceInternalReactiveGatewayHandler;
@Qualifier("webFluxWithReplyPayloadToFlux.handler")
private WebFluxRequestExecutingMessageHandler webFluxWithReplyPayloadToFlux;

@Resource(name = "org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler#1")
private WebFluxRequestExecutingMessageHandler httpReactiveProxyFlow;

@Autowired
@Qualifier("webFluxFlowWithReplyPayloadToFlux.input")
private MessageChannel webFluxFlowWithReplyPayloadToFluxInput;

private MockMvc mockMvc;

Expand All @@ -106,6 +122,48 @@ public void setup() {
.build();
}

@Test
public void testWebFluxFlowWithReplyPayloadToFlux() {
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
response.setStatusCode(HttpStatus.OK);
response.getHeaders().setContentType(MediaType.TEXT_PLAIN);

DataBufferFactory bufferFactory = response.bufferFactory();
return response.writeWith(
Flux.just(bufferFactory.wrap("FOO".getBytes()),
bufferFactory.wrap("BAR".getBytes())))
.then(Mono.defer(response::setComplete));
});

WebClient webClient = WebClient.builder()
.clientConnector(httpConnector)
.build();

new DirectFieldAccessor(this.webFluxWithReplyPayloadToFlux)
.setPropertyValue("webClient", webClient);

QueueChannel replyChannel = new QueueChannel();

Message<String> testMessage =
MessageBuilder.withPayload("test")
.setReplyChannel(replyChannel)
.build();

this.webFluxFlowWithReplyPayloadToFluxInput.send(testMessage);

Message<?> receive = replyChannel.receive(10_000);

assertNotNull(receive);
assertThat(receive.getPayload(), instanceOf(Flux.class));

@SuppressWarnings("unchecked")
Flux<String> response = (Flux<String>) receive.getPayload();

StepVerifier.create(response)
.expectNext("FOO", "BAR")
.verifyComplete();
}

@Test
public void testHttpReactiveProxyFlow() throws Exception {
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
Expand All @@ -120,7 +178,7 @@ public void testHttpReactiveProxyFlow() throws Exception {
.clientConnector(httpConnector)
.build();

new DirectFieldAccessor(this.serviceInternalReactiveGatewayHandler)
new DirectFieldAccessor(this.httpReactiveProxyFlow)
.setPropertyValue("webClient", webClient);

this.mockMvc.perform(
Expand Down Expand Up @@ -200,6 +258,16 @@ protected void configure(HttpSecurity http) throws Exception {
.anonymous().disable();
}

@Bean
public IntegrationFlow webFluxFlowWithReplyPayloadToFlux() {
return f -> f
.handle(WebFlux.outboundGateway("http://www.springsource.org/spring-integration")
.httpMethod(HttpMethod.GET)
.replyPayloadToFlux(true)
.expectedResponseType(String.class),
e -> e.id("webFluxWithReplyPayloadToFlux"));
}

@Bean
public IntegrationFlow httpReactiveProxyFlow() {
return IntegrationFlows
Expand Down

0 comments on commit 56ffed0

Please sign in to comment.