Skip to content

Commit

Permalink
INT-3475: Integer.MAX_VALUE / 2 Phase for MDEs
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-3475

Change the phase for `SourcePollingChannelAdapterFactoryBean`, `AbstractPollingEndpoint`, `MessageProducerSupport`, `JmsMessageDrivenEndpoint`
to the `Integer.MAX_VALUE / 2`
  • Loading branch information
Artem Bilan authored and garyrussell committed Jul 22, 2014
1 parent 5bb2141 commit 7abefb0
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void verifyIdAsChannel() {
assertEquals(DirectChannel.class, channel.getClass());
assertEquals(AmqpInboundChannelAdapter.class, adapter.getClass());
assertEquals(Boolean.TRUE, TestUtils.getPropertyValue(adapter, "autoStartup"));
assertEquals(0, TestUtils.getPropertyValue(adapter, "phase"));
assertEquals(Integer.MAX_VALUE / 2, TestUtils.getPropertyValue(adapter, "phase"));
assertTrue(TestUtils.getPropertyValue(adapter, "messageListenerContainer.missingQueuesFatal", Boolean.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@
* @author Oleg Zhurakousky
* @author Josh Long
* @author Gary Russell
* @author Artem Bilan
*/
public class ConsumerEndpointFactoryBean
implements FactoryBean<AbstractEndpoint>, BeanFactoryAware, BeanNameAware, BeanClassLoaderAware, InitializingBean, SmartLifecycle {
implements FactoryBean<AbstractEndpoint>, BeanFactoryAware, BeanNameAware, BeanClassLoaderAware,
InitializingBean, SmartLifecycle {

private volatile MessageHandler handler;

Expand All @@ -70,6 +72,8 @@ public class ConsumerEndpointFactoryBean

private volatile int phase = 0;

private volatile boolean isPhaseSet;

private volatile MessageChannel inputChannel;

private volatile ConfigurableBeanFactory beanFactory;
Expand Down Expand Up @@ -119,6 +123,7 @@ public void setAutoStartup(boolean autoStartup) {

public void setPhase(int phase) {
this.phase = phase;
this.isPhaseSet = true;
}

@Override
Expand Down Expand Up @@ -246,7 +251,8 @@ else if (channel instanceof PollableChannel) {
pollingConsumer.setErrorHandler(this.pollerMetadata.getErrorHandler());

pollingConsumer.setReceiveTimeout(this.pollerMetadata.getReceiveTimeout());
pollingConsumer.setTransactionSynchronizationFactory(this.pollerMetadata.getTransactionSynchronizationFactory());
pollingConsumer.setTransactionSynchronizationFactory(
this.pollerMetadata.getTransactionSynchronizationFactory());
pollingConsumer.setBeanClassLoader(beanClassLoader);
pollingConsumer.setBeanFactory(beanFactory);
this.endpoint = pollingConsumer;
Expand All @@ -257,7 +263,11 @@ else if (channel instanceof PollableChannel) {
this.endpoint.setBeanName(this.beanName);
this.endpoint.setBeanFactory(this.beanFactory);
this.endpoint.setAutoStartup(this.autoStartup);
this.endpoint.setPhase(this.phase);
int phase = this.phase;
if (!this.isPhaseSet && this.endpoint instanceof PollingConsumer) {
phase = Integer.MAX_VALUE / 2;
}
this.endpoint.setPhase(phase);
this.endpoint.afterPropertiesSet();
this.initialized = true;
}
Expand All @@ -270,7 +280,7 @@ else if (channel instanceof PollableChannel) {

@Override
public boolean isAutoStartup() {
return (this.endpoint != null) ? this.endpoint.isAutoStartup() : true;
return (this.endpoint == null) || this.endpoint.isAutoStartup();
}

@Override
Expand All @@ -280,7 +290,7 @@ public int getPhase() {

@Override
public boolean isRunning() {
return (this.endpoint != null) ? this.endpoint.isRunning() : false;
return (this.endpoint != null) && this.endpoint.isRunning();
}

@Override
Expand All @@ -303,4 +313,5 @@ public void stop(Runnable callback) {
this.endpoint.stop(callback);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class SourcePollingChannelAdapterFactoryBean implements FactoryBean<Sourc

private volatile boolean autoStartup = true;

private volatile int phase = 0;
private volatile int phase = Integer.MAX_VALUE / 2;

private volatile Long sendTimeout;

Expand Down Expand Up @@ -187,15 +187,15 @@ private void initializeAdapter() {
*/

public boolean isAutoStartup() {
return (this.adapter != null) ? this.adapter.isAutoStartup() : true;
return (this.adapter == null) || this.adapter.isAutoStartup();
}

public int getPhase() {
return (this.adapter != null) ? this.adapter.getPhase() : 0;
}

public boolean isRunning() {
return (this.adapter != null) ? this.adapter.isRunning() : false;
return (this.adapter != null) && this.adapter.isRunning();
}

public void start() {
Expand All @@ -215,4 +215,5 @@ public void stop(Runnable callback) {
this.adapter.stop(callback);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint implement
private volatile TransactionSynchronizationFactory transactionSynchronizationFactory;

public AbstractPollingEndpoint() {
this.setPhase(Integer.MAX_VALUE);
this.setPhase(Integer.MAX_VALUE / 2);
}

public void setTaskExecutor(Executor taskExecutor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public abstract class MessageProducerSupport extends AbstractEndpoint implements

private final MessagingTemplate messagingTemplate = new MessagingTemplate();

protected MessageProducerSupport() {
this.setPhase(Integer.MAX_VALUE / 2);
}

@Override
public void setOutputChannel(MessageChannel outputChannel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
* @see ApplicationEventMulticaster
* @see ExpressionMessageProducerSupport
*/
public class ApplicationEventListeningMessageProducer extends ExpressionMessageProducerSupport implements SmartApplicationListener {
public class ApplicationEventListeningMessageProducer extends ExpressionMessageProducerSupport
implements SmartApplicationListener {

private volatile Set<Class<? extends ApplicationEvent>> eventTypes;

Expand All @@ -55,8 +56,6 @@ public class ApplicationEventListeningMessageProducer extends ExpressionMessageP

private volatile long stoppedAt;

private volatile boolean phaseSet;

/**
* Set the list of event types (classes that extend ApplicationEvent) that
* this adapter should send to the message channel. By default, all event
Expand All @@ -81,12 +80,6 @@ public void setEventTypes(Class<? extends ApplicationEvent>... eventTypes) {
}
}

@Override
public void setPhase(int phase) {
super.setPhase(phase);
this.phaseSet = true;
}

@Override
public String getComponentType() {
return "event:inbound-channel-adapter";
Expand All @@ -96,12 +89,11 @@ public String getComponentType() {
protected void onInit() {
super.onInit();
this.applicationEventMulticaster = this.getBeanFactory()
.getBean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
.getBean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME,
ApplicationEventMulticaster.class);
Assert.notNull(this.applicationEventMulticaster,
"To use ApplicationListeners the 'applicationEventMulticaster' bean must be supplied within ApplicationContext.");
if (!this.phaseSet) {
super.setPhase(Integer.MIN_VALUE + 1000);
}
"To use ApplicationListeners the 'applicationEventMulticaster' " +
"bean must be supplied within ApplicationContext.");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,15 +30,15 @@
* @author Oleg Zhurakousky
* @author Gary Russell
*/
public class JmsMessageDrivenEndpoint extends AbstractEndpoint implements
DisposableBean, OrderlyShutdownCapable {
public class JmsMessageDrivenEndpoint extends AbstractEndpoint implements DisposableBean, OrderlyShutdownCapable {

private final AbstractMessageListenerContainer listenerContainer;

private final ChannelPublishingJmsMessageListener listener;


public JmsMessageDrivenEndpoint(AbstractMessageListenerContainer listenerContainer, ChannelPublishingJmsMessageListener listener) {
public JmsMessageDrivenEndpoint(AbstractMessageListenerContainer listenerContainer,
ChannelPublishingJmsMessageListener listener) {
Assert.notNull(listenerContainer, "listener container must not be null");
Assert.notNull(listener, "listener must not be null");
if (logger.isWarnEnabled() && listenerContainer.getMessageListener() != null) {
Expand All @@ -48,6 +48,7 @@ public JmsMessageDrivenEndpoint(AbstractMessageListenerContainer listenerContain
listenerContainer.setMessageListener(listener);
this.listener = listener;
this.listenerContainer = listenerContainer;
setPhase(Integer.MAX_VALUE / 2);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,30 +86,35 @@ public class RedisQueueInboundChannelAdapterParserTests {

@Test
public void testInt3017DefaultConfig() {
assertSame(this.connectionFactory, TestUtils.getPropertyValue(this.defaultAdapter, "boundListOperations.ops.template.connectionFactory"));
assertSame(this.connectionFactory,
TestUtils.getPropertyValue(this.defaultAdapter, "boundListOperations.ops.template.connectionFactory"));
assertEquals("si.test.Int3017.Inbound1", TestUtils.getPropertyValue(this.defaultAdapter, "boundListOperations.key"));
assertFalse(TestUtils.getPropertyValue(this.defaultAdapter, "expectMessage", Boolean.class));
assertEquals(new Long(1000), TestUtils.getPropertyValue(this.defaultAdapter, "receiveTimeout", Long.class));
assertEquals(new Long(5000), TestUtils.getPropertyValue(this.defaultAdapter, "recoveryInterval", Long.class));
assertEquals(1000L, TestUtils.getPropertyValue(this.defaultAdapter, "receiveTimeout"));
assertEquals(5000L, TestUtils.getPropertyValue(this.defaultAdapter, "recoveryInterval"));
assertNull(TestUtils.getPropertyValue(this.defaultAdapter, "errorChannel"));
assertThat(TestUtils.getPropertyValue(this.defaultAdapter, "taskExecutor"), Matchers.instanceOf(ErrorHandlingTaskExecutor.class));
assertThat(TestUtils.getPropertyValue(this.defaultAdapter, "serializer"), Matchers.instanceOf(JdkSerializationRedisSerializer.class));
assertThat(TestUtils.getPropertyValue(this.defaultAdapter, "taskExecutor"),
Matchers.instanceOf(ErrorHandlingTaskExecutor.class));
assertThat(TestUtils.getPropertyValue(this.defaultAdapter, "serializer"),
Matchers.instanceOf(JdkSerializationRedisSerializer.class));
assertTrue(TestUtils.getPropertyValue(this.defaultAdapter, "autoStartup", Boolean.class));
assertEquals(Integer.MAX_VALUE / 2, TestUtils.getPropertyValue(this.defaultAdapter, "phase"));
assertSame(this.defaultAdapterChannel, TestUtils.getPropertyValue(this.defaultAdapter, "outputChannel"));
}

@Test
public void testInt3017CustomConfig() {
assertSame(this.customRedisConnectionFactory, TestUtils.getPropertyValue(this.customAdapter, "boundListOperations.ops.template.connectionFactory"));
assertSame(this.customRedisConnectionFactory,
TestUtils.getPropertyValue(this.customAdapter, "boundListOperations.ops.template.connectionFactory"));
assertEquals("si.test.Int3017.Inbound2", TestUtils.getPropertyValue(this.customAdapter, "boundListOperations.key"));
assertTrue(TestUtils.getPropertyValue(this.customAdapter, "expectMessage", Boolean.class));
assertEquals(new Long(2000), TestUtils.getPropertyValue(this.customAdapter, "receiveTimeout", Long.class));
assertEquals(new Long(3000), TestUtils.getPropertyValue(this.customAdapter, "recoveryInterval", Long.class));
assertEquals(2000L, TestUtils.getPropertyValue(this.customAdapter, "receiveTimeout"));
assertEquals(3000L, TestUtils.getPropertyValue(this.customAdapter, "recoveryInterval"));
assertSame(this.errorChannel, TestUtils.getPropertyValue(this.customAdapter, "errorChannel"));
assertSame(this.taskExecutor, TestUtils.getPropertyValue(this.customAdapter, "taskExecutor"));
assertSame(this.serializer, TestUtils.getPropertyValue(this.customAdapter, "serializer"));
assertFalse(TestUtils.getPropertyValue(this.customAdapter, "autoStartup", Boolean.class));
assertEquals(new Integer(100), TestUtils.getPropertyValue(this.customAdapter, "phase", Integer.class));
assertEquals(100, TestUtils.getPropertyValue(this.customAdapter, "phase"));
assertSame(this.sendChannel, TestUtils.getPropertyValue(this.customAdapter, "outputChannel"));
}

Expand Down

0 comments on commit 7abefb0

Please sign in to comment.