diff --git a/src/reference/asciidoc/reactive-streams.adoc b/src/reference/asciidoc/reactive-streams.adoc index b09514b419e..d8de1accc5a 100644 --- a/src/reference/asciidoc/reactive-streams.adoc +++ b/src/reference/asciidoc/reactive-streams.adoc @@ -160,12 +160,147 @@ When the target protocol for integration provides a Reactive Streams solution, i An inbound, event-driven channel adapter implementation is about wrapping a request (if necessary) into a deferred `Mono` or `Flux` and perform a send (and produce reply, if any) only when a protocol component initiates a subscription into a `Mono` returned from the listener method. This way we have a reactive stream solution encapsulated exactly in this component. Of course, downstream integration flow subscribed on the output channel should honor Reactive Streams specification and be performed in the on demand, back-pressure ready manner. -This is not always available by the nature (or the current implementation) of `MessageHandler` processor used in the integration flow. + +This is not always available by the nature (or with the current implementation) of `MessageHandler` processor used in the integration flow. This limitation can be handled using thread pools and queues or `FluxMessageChannel` (see above) before and after integration endpoints when there is no reactive implementation. -A reactive outbound channel adapter implementation is about initiation (or continuation) of a reactive stream to interaction with an external system according provided reactive API for the target protocol. -An inbound payload could be a reactive type per se or as an event of the whole integration flow which is a part of reactive stream on top. -A returned reactive type can be subscribed immediately if we are in one-way, fire-and-forget scenario, or it is propagated downstream (request-reply scenarios) for further integration flow or an explicit subscription in the target business logic, but still downstream preserving reactive streams semantics. +An example for a reactive **event-driven** inbound channel adapter: +```java +public class CustomReactiveMessageProducer extends MessageProducerSupport { + + private final CustomReactiveSource customReactiveSource; + + public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) { + Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null"); + this.customReactiveSource = customReactiveSource; + } + + @Override + protected void doStart() { + Flux> messageFlux = + this.customReactiveSource + .map(event - > + MessageBuilder + .withPayload(event.getBody()) + .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName()) + .build()); + + subscribeToPublisher(messageFlux); + } +} +``` + +Usage would look like: + +```java +public class MainFlow { + @Autowired + private CustomReactiveMessageProducer customReactiveMessageProducer; + + @Bean + public IntegrationFlow buildFlow() { + return IntegrationFlows.from(customReactiveMessageProducer) + .channel(outputChannel) + .get(); + } +} +``` +Or in a declarative way: + +```java +public class MainFlow { + @Bean + public IntegrationFlow buildFlow() { + return IntegrationFlows.from(new CustomReactiveMessageProducer(new CustomReactiveSource())) + .handle(outputChannel) + .get(); + } +} +``` +Or even without a channel adapter, we can always use the Java DSL in the following way: +```java +public class MainFlow { + @Bean + public IntegrationFlow buildFlow() { + Flux> myFlux = this.customReactiveSource + .map(event - > + MessageBuilder + .withPayload(event.getBody()) + .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName()) + .build()); + return IntegrationFlows.from(myFlux) + .handle(outputChannel) + .get(); + } +} +``` + +A reactive outbound channel adapter implementation is about the initiation (or continuation) of a reactive stream to interaction with an external system according to the provided reactive API for the target protocol. +An inbound payload could be a reactive type per se or as an event of the whole integration flow which is a part of the reactive stream on top. +A returned reactive type can be subscribed immediately if we are in a one-way, fire-and-forget scenario, or it is propagated downstream (request-reply scenarios) for further integration flow or an explicit subscription in the target business logic, but still downstream preserving reactive streams semantics. + +An example for a reactive outbound channel adapter: +```java +public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler { + + private final CustomEntityOperations customEntityOperations; + + public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) { + Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null"); + this.customEntityOperations = customEntityOperations; + } + + @Override + protected Mono handleMessageInternal(Message message) { + return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class)) + .flatMap(mode -> { + switch (mode) { + case INSERT: + return handleInsert(message); + case UPDATE: + return handleUpdate(message); + default: + return Mono.error(new IllegalArgumentException()); + } + }).then(); + } + + private Mono handleInsert(Message message) { + return this.customEntityOperations.insert(message.getPayload()) + .then(); + } + + private Mono handleUpdate(Message message) { + return this.r2dbcEntityOperations.update(message.getPayload()) + .then(); + } + + public enum Type { + INSERT, + UPDATE, + } +} +``` + +We will be able to use both of the channel adatpers: +```java +public class MainFlow { + @Autowired + private CustomReactiveMessageProducer customReactiveMessageProducer; + + @Autowired + private CustomReactiveMessageHandler customReactiveMessageHandler; + + @Bean + public IntegrationFlow buildFlow() { + return IntegrationFlows.from(customReactiveMessageProducer) + .transform(someOperation) + .handle(customReactiveMessageHandler) + .get(); + } +} +``` + Currently Spring Integration provides channel adapter (or gateway) implementations for <<./webflux.adoc#webflux,WebFlux>>, <<./rsocket.adoc#rsocket,RSocket>>, <<./mongodb.adoc#mongodb,MongoDb>> and <<./r2dbc.adoc#r2dbc,R2DBC>>. The <<./redis.adoc#redis-stream-outbound,Redis Stream Channel Adapters>> are also reactive and uses `ReactiveStreamOperations` from Spring Data.