Skip to content

Commit

Permalink
Honor the FluxSink State in the PollChPublisherAd
Browse files Browse the repository at this point in the history
The `PollableChannelPublisherAdapter` is based on the poll model of the
`FluxSink` and iterate and poll downstream `PollableChannel` until
there is an item or `n > 0`.
Having `take(6)` we end up with the cancel from the downstream
`Subscriber` after the batch is filled, but at the same time we continue
to poll the upstream source because `n` is like `Long.MAX_VALUE`.

* The proper way to interact is check for the `!sink.isCancelled()`
as well.
This way cancelled `sink` won't "steal" data from other subscribers

Fix compatibility with the latest dependencies

* Upgrade to Gradle 4.1
* Upgrade as much dependencies as possible
* Fix MongoDB module to resolve deprecation in the latest driver
* Increase receive timeout in the `ResequencerTests`
* Restore generic argument for the method reference in the `ReactiveStreamsTests`
* Fix `WebFluxInboundEndpoint` for the compatibility with the latest Reactor API
  • Loading branch information
artembilan authored and garyrussell committed Sep 6, 2017
1 parent cfab1fb commit 46de69d
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 49 deletions.
40 changes: 20 additions & 20 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ buildscript {
maven { url 'https://repo.spring.io/plugins-release' }
}
dependencies {
classpath 'io.spring.gradle:dependency-management-plugin:1.0.2.RELEASE'
classpath 'io.spring.gradle:dependency-management-plugin:1.0.3.RELEASE'
classpath 'io.spring.gradle:spring-io-plugin:0.0.8.RELEASE'
classpath 'io.spring.gradle:docbook-reference-plugin:0.3.1'
classpath 'org.asciidoctor:asciidoctor-gradle-plugin:1.5.0'
Expand Down Expand Up @@ -87,61 +87,61 @@ subprojects { subproject ->
}

ext {
activeMqVersion = '5.14.5'
activeMqVersion = '5.15.0'
aspectjVersion = '1.8.10'
apacheSshdVersion = '1.4.0'
apacheSshdVersion = '1.6.0'
boonVersion = '0.34'
commonsDbcp2Version = '2.1.1'
commonsIoVersion = '2.4'
commonsNetVersion = '3.5'
curatorVersion = '2.11.1'
derbyVersion = '10.13.1.1'
eclipseLinkVersion = '2.6.4'
ftpServerVersion = '1.0.6'
groovyVersion = '2.4.10'
ftpServerVersion = '1.1.1'
groovyVersion = '2.4.12'
guavaVersion = '20.0'
hamcrestVersion = '1.3'
hazelcastVersion = '3.8'
hibernateVersion = '5.2.10.Final'
hsqldbVersion = '2.4.0'
h2Version = '1.4.194'
jackson2Version = '2.9.0.pr4'
h2Version = '1.4.196'
jackson2Version = '2.9.0'
javaxActivationVersion = '1.1.1'
javaxMailVersion = '1.6.0-rc1'
javaxMailVersion = '1.6.0'
jedisVersion = '2.9.0'
jmsApiVersion = '2.0.1'
jpa21ApiVersion = '1.0.0.Final'
jpaApiVersion = '2.1.1'
jrubyVersion = '9.1.5.0'
jschVersion = '0.1.54'
jsonpathVersion = '2.3.0'
jsonpathVersion = '2.4.0'
junitVersion = '4.12'
jythonVersion = '2.5.3'
kryoShadedVersion = '3.0.3'
log4jVersion = '1.2.17'
mockitoVersion = '2.7.22'
mysqlVersion = '5.1.41'
mockitoVersion = '2.9.0'
mysqlVersion = '6.0.6'
pahoMqttClientVersion = '1.1.1'
postgresVersion = '42.0.0'
reactorNettyVersion = '0.7.0.BUILD-SNAPSHOT'
reactorVersion = '3.1.0.BUILD-SNAPSHOT'
romeToolsVersion = '1.7.2'
romeToolsVersion = '1.7.4'
servletApiVersion = '3.1.0'
slf4jVersion = "1.7.25"
smackVersion = '4.1.9'
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '2.0.0.M5'
springDataJpaVersion = '2.0.0.RC2'
springDataMongoVersion = '2.0.0.RC2'
springDataRedisVersion = '2.0.0.RC2'
springGemfireVersion = '2.0.0.RC2'
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '2.0.0.BUILD-SNAPSHOT'
springDataJpaVersion = '2.0.0.BUILD-SNAPSHOT'
springDataMongoVersion = '2.0.0.BUILD-SNAPSHOT'
springDataRedisVersion = '2.0.0.BUILD-SNAPSHOT'
springGemfireVersion = '2.0.0.BUILD-SNAPSHOT'
springSecurityVersion = '5.0.0.BUILD-SNAPSHOT'
springSocialTwitterVersion = '2.0.0.M4'
springSocialTwitterVersion = '2.0.0.BUILD-SNAPSHOT'
springRetryVersion = '1.2.0.RELEASE'
springVersion = project.hasProperty('springVersion') ? project.springVersion : '5.0.0.BUILD-SNAPSHOT'
springWsVersion = '2.4.0.RELEASE'
tomcatVersion = "8.5.16"
tomcatVersion = "8.5.20"
xmlUnitVersion = '1.6'
xstreamVersion = '1.4.7'
xstreamVersion = '1.4.10'
}

eclipse {
Expand Down
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#Mon Jul 24 12:58:44 EDT 2017
#Wed Sep 06 12:46:29 EDT 2017
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.0.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-4.1-bin.zip
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void subscribe(Subscriber<? super Message<T>> subscriber) {
.<Message<T>>create(sink ->
sink.onRequest(n -> {
Message<?> m;
while (n-- > 0 && (m = this.channel.receive()) != null) {
while (!sink.isCancelled() && n-- > 0 && (m = this.channel.receive()) != null) {
sink.next((Message<T>) m);
}
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,9 @@ public void testResequencingWithDiscard() throws InterruptedException {
assertNotNull(reply1);
assertNotNull(reply2);
assertNull(reply3);
ArrayList<Integer> sequence = new ArrayList<Integer>(Arrays.asList(new IntegrationMessageHeaderAccessor(reply1).getSequenceNumber(),
new IntegrationMessageHeaderAccessor(reply2).getSequenceNumber()));
ArrayList<Integer> sequence = new ArrayList<>(
Arrays.asList(new IntegrationMessageHeaderAccessor(reply1).getSequenceNumber(),
new IntegrationMessageHeaderAccessor(reply2).getSequenceNumber()));
Collections.sort(sequence);
assertEquals("[1, 2]", sequence.toString());
// Once a group is expired, late messages are discarded immediately by default
Expand Down Expand Up @@ -363,7 +364,7 @@ public void testTimeoutDefaultExpiry() throws InterruptedException {
this.resequencer.handleMessage(message2);
Message<?> out1 = replyChannel.receive(10);
assertNull(out1);
out1 = discardChannel.receive(1000);
out1 = discardChannel.receive(10000);
assertNotNull(out1);
Message<?> out2 = discardChannel.receive(10);
assertNotNull(out2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;

import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -105,7 +104,6 @@ public void testReactiveFlow() throws Exception {
}

@Test
@Ignore
public void testPollableReactiveFlow() throws Exception {
this.inputChannel.send(new GenericMessage<>("1,2,3,4,5"));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,7 +32,6 @@
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.BasicQuery;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.integration.aop.AbstractMessageSourceAdvice;
Expand All @@ -46,12 +45,11 @@
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.mongodb.util.JSON;

/**
* @author Oleg Zhurakousky
* @author Artem Bilan
* @author Yaron Yamin
*
* @since 2.2
*/
@ContextConfiguration
Expand Down Expand Up @@ -168,6 +166,7 @@ public void testWithNamedCollection() throws Exception {
this.mongoInboundAdapterWithNamedCollection.stop();
this.replyChannel.purge(null);
}

@Test
@MongoDbAvailable
public void testWithQueryExpression() throws Exception {
Expand All @@ -181,6 +180,7 @@ public void testWithQueryExpression() throws Exception {
assertEquals("Bob", message.getPayload().get(0).getName());
this.mongoInboundAdapterWithQueryExpression.stop();
}

@Test
@MongoDbAvailable
public void testWithStringQueryExpression() throws Exception {
Expand Down Expand Up @@ -273,7 +273,7 @@ public void remove(MongoOperations mongoOperations, Object target, String collec
if (target instanceof List<?>) {
List<?> documents = (List<?>) target;
for (Object document : documents) {
mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
mongoOperations.remove(document, collectionName);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,16 +32,18 @@
import org.springframework.messaging.support.GenericMessage;

import com.mongodb.BasicDBObject;
import com.mongodb.util.JSON;

/**
* @author Oleg Zhurakousky
* @author Artem Bilan
*/
public class MongoDbOutboundChannelAdapterIntegrationTests extends MongoDbAvailableTests {

@Test
@MongoDbAvailable
public void testWithDefaultMongoFactory() throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("outbound-adapter-config.xml", this.getClass());
ClassPathXmlApplicationContext context =
new ClassPathXmlApplicationContext("outbound-adapter-config.xml", this.getClass());

MessageChannel channel = context.getBean("simpleAdapter", MessageChannel.class);
Message<Person> message = new GenericMessage<MongoDbAvailableTests.Person>(this.createPerson("Bob"));
Expand All @@ -57,10 +59,14 @@ public void testWithDefaultMongoFactory() throws Exception {
@MongoDbAvailable
public void testWithNamedCollection() throws Exception {
MongoDbFactory mongoDbFactory = this.prepareMongoFactory("foo");
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("outbound-adapter-config.xml", this.getClass());
ClassPathXmlApplicationContext context =
new ClassPathXmlApplicationContext("outbound-adapter-config.xml", this.getClass());

MessageChannel channel = context.getBean("simpleAdapterWithNamedCollection", MessageChannel.class);
Message<Person> message = MessageBuilder.withPayload(this.createPerson("Bob")).setHeader("collectionName", "foo").build();
Message<Person> message =
MessageBuilder.withPayload(this.createPerson("Bob"))
.setHeader("collectionName", "foo")
.build();
channel.send(message);

MongoTemplate template = new MongoTemplate(mongoDbFactory);
Expand All @@ -72,10 +78,15 @@ public void testWithNamedCollection() throws Exception {
@MongoDbAvailable
public void testWithTemplate() throws Exception {
MongoDbFactory mongoDbFactory = this.prepareMongoFactory("foo");
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("outbound-adapter-config.xml", this.getClass());
ClassPathXmlApplicationContext context =
new ClassPathXmlApplicationContext("outbound-adapter-config.xml", this.getClass());

MessageChannel channel = context.getBean("simpleAdapterWithTemplate", MessageChannel.class);
Message<Person> message = MessageBuilder.withPayload(this.createPerson("Bob")).setHeader("collectionName", "foo").build();
Message<Person> message =
MessageBuilder.withPayload(this.createPerson("Bob"))
.setHeader("collectionName", "foo")
.build();

channel.send(message);

MongoTemplate template = new MongoTemplate(mongoDbFactory);
Expand All @@ -87,13 +98,19 @@ public void testWithTemplate() throws Exception {
@MongoDbAvailable
public void testSavingDbObject() throws Exception {

BasicDBObject dbObject = (BasicDBObject) JSON.parse("{'foo' : 'bar'}");
BasicDBObject dbObject = BasicDBObject.parse("{'foo' : 'bar'}");

MongoDbFactory mongoDbFactory = this.prepareMongoFactory("foo");
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("outbound-adapter-config.xml", this.getClass());
ClassPathXmlApplicationContext context =
new ClassPathXmlApplicationContext("outbound-adapter-config.xml", this.getClass());

MessageChannel channel = context.getBean("simpleAdapterWithTemplate", MessageChannel.class);
Message<BasicDBObject> message = MessageBuilder.withPayload(dbObject).setHeader("collectionName", "foo").build();

Message<BasicDBObject> message =
MessageBuilder.withPayload(dbObject)
.setHeader("collectionName", "foo")
.build();

channel.send(message);

MongoTemplate template = new MongoTemplate(mongoDbFactory);
Expand All @@ -108,10 +125,16 @@ public void testSavingJSONString() throws Exception {
String object = "{'foo' : 'bar'}";

MongoDbFactory mongoDbFactory = this.prepareMongoFactory("foo");
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("outbound-adapter-config.xml", this.getClass());
ClassPathXmlApplicationContext context =
new ClassPathXmlApplicationContext("outbound-adapter-config.xml", this.getClass());

MessageChannel channel = context.getBean("simpleAdapterWithTemplate", MessageChannel.class);
Message<String> message = MessageBuilder.withPayload(object).setHeader("collectionName", "foo").build();

Message<String> message =
MessageBuilder.withPayload(object)
.setHeader("collectionName", "foo")
.build();

channel.send(message);

MongoTemplate template = new MongoTemplate(mongoDbFactory);
Expand All @@ -122,7 +145,8 @@ public void testSavingJSONString() throws Exception {
@Test
@MongoDbAvailable
public void testWithMongoConverter() throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("outbound-adapter-config.xml", this.getClass());
ClassPathXmlApplicationContext context =
new ClassPathXmlApplicationContext("outbound-adapter-config.xml", this.getClass());

MessageChannel channel = context.getBean("simpleAdapterWithConverter", MessageChannel.class);
Message<Person> message = new GenericMessage<MongoDbAvailableTests.Person>(this.createPerson("Bob"));
Expand All @@ -133,4 +157,5 @@ public void testWithMongoConverter() throws Exception {
assertNotNull(template.find(new BasicQuery("{'name' : 'Bob'}"), Person.class, "data"));
context.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@
import org.springframework.integration.mongodb.rules.MongoDbAvailable;
import org.springframework.integration.mongodb.rules.MongoDbAvailableTests;

import com.mongodb.util.JSON;
import com.mongodb.BasicDBObject;

/**
* @author Amol Nayak
* @author Oleg Zhurakousky
* @author Gary Russell
* @author Yaron Yamin
* @author Artem Bilan
*
* @since 2.2
*
Expand Down Expand Up @@ -280,7 +281,7 @@ public void validatePipelineInModifyOut() throws Exception {

MongoTemplate template = new MongoTemplate(mongoDbFactory);

template.save(JSON.parse("{'name' : 'Manny', 'id' : 1}"), "data");
template.save(BasicDBObject.parse("{'name' : 'Manny', 'id' : 1}"), "data");

Expression queryExpression = new LiteralExpression("{'name' : 'Manny'}");
MongoDbMessageSource messageSource = new MongoDbMessageSource(mongoDbFactory, queryExpression);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ private Mono<Void> doHandle(ServerWebExchange exchange) {
return setStatusCode(exchange);
}
})
.doOnTerminate((e, t) -> this.activeCount.decrementAndGet());
.doOnTerminate(this.activeCount::decrementAndGet);

}

Expand Down

0 comments on commit 46de69d

Please sign in to comment.