Skip to content

Commit

Permalink
INT-3661: Fix the eager BF access from BPPs (P I)
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-3661

Previously there were a lot of noise from the `PostProcessorRegistrationDelegate$BeanPostProcessorChecker` for early access for beans.
That may produce some side-effects when some of `BeanFactoryPostProcessor`s won't adjust those beans.

The issue is based on two facts:
1. Loading beans from `BPP`, e.g. `IntegrationEvaluationContextAwareBeanPostProcessor` (or `ChannelSecurityInterceptorBeanPostProcessor` - https://jira.spring.io/browse/INT-3663)
2. Loading beans from `setBeanFactory()/setApplicationContext()` container methods

* Move all code from `setBeanFactory()` with access to the `BeanFactory` (e.g. `this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory);`)
to some other lazy-load methods like `getMessageBuilderFactory()`
* Fix parser tests to remove `messageBuilderFactory` tests since there is no activity for target components to lazy-load them
* Polish some test according the new lazy-load logic
* Rework `IntegrationEvaluationContextAwareBeanPostProcessor` to the `SmartInitializingSingleton` and make it `Ordered`
* Populate `beanFactory` for the internal instance of `connectionFactory` in the `TcpSyslogReceivingChannelAdapter`
* Populate `beanFactory` for the internal `UnicastReceivingChannelAdapter` in the `UdpSyslogReceivingChannelAdapter`
* Add `log.info` that `UdpSyslogReceivingChannelAdapter` overrides `outputChannel` for the provided `UnicastReceivingChannelAdapter`
* Change the internal `MessageChannel` in the `UdpSyslogReceivingChannelAdapter` to the `FixedSubscriberChannel` for better performance

* Fix `AbstractExpressionEvaluator`
* Add JavaDocs for the `IntegrationEvaluationContextAware`

Fix `MongoDbMessageStoreClaimCheckIntegrationTests`

Addressing PR comments
  • Loading branch information
artembilan authored and garyrussell committed Mar 2, 2015
1 parent 134c7c8 commit 2bde14b
Show file tree
Hide file tree
Showing 45 changed files with 546 additions and 201 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,7 +57,6 @@ public void interceptor() {
TestUtils.getPropertyValue(channel, "dispatcher"), "maxSubscribers", Integer.class).intValue());
channel = context.getBean("pubSub", MessageChannel.class);
Object mbf = context.getBean(IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME);
assertSame(mbf, TestUtils.getPropertyValue(channel, "dispatcher.messageBuilderFactory"));
assertSame(mbf, TestUtils.getPropertyValue(channel, "container.messageListener.messageBuilderFactory"));
assertTrue(TestUtils.getPropertyValue(channel, "container.missingQueuesFatal", Boolean.class));
assertFalse(TestUtils.getPropertyValue(channel, "container.transactional", Boolean.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
* @author Mark Fisher
* @author Dave Syer
* @author Gary Russell
* @author Artem Bilan
* @since 2.0
*/
public abstract class AbstractAggregatingMessageGroupProcessor implements MessageGroupProcessor,
Expand All @@ -52,9 +53,23 @@ public abstract class AbstractAggregatingMessageGroupProcessor implements Messag

private volatile MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();

private volatile boolean messageBuilderFactorySet;

private BeanFactory beanFactory;

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(beanFactory);
this.beanFactory = beanFactory;
}

protected MessageBuilderFactory getMessageBuilderFactory() {
if (!this.messageBuilderFactorySet) {
if (this.beanFactory != null) {
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory);
}
this.messageBuilderFactorySet = true;
}
return this.messageBuilderFactory;
}

@Override
Expand All @@ -65,10 +80,10 @@ public final Object processMessageGroup(MessageGroup group) {
Object payload = this.aggregatePayloads(group, headers);
AbstractIntegrationMessageBuilder<?> builder;
if (payload instanceof Message<?>) {
builder = this.messageBuilderFactory.fromMessage((Message<?>) payload).copyHeadersIfAbsent(headers);
builder = getMessageBuilderFactory().fromMessage((Message<?>) payload).copyHeadersIfAbsent(headers);
}
else {
builder = this.messageBuilderFactory.withPayload(payload).copyHeadersIfAbsent(headers);
builder = getMessageBuilderFactory().withPayload(payload).copyHeadersIfAbsent(headers);
}

return builder.popSequenceDetails().build();
Expand All @@ -89,7 +104,8 @@ protected Map<String, Object> aggregateHeaders(MessageGroup group) {
for (Entry<String, Object> entry : message.getHeaders().entrySet()) {
String key = entry.getKey();
if (MessageHeaders.ID.equals(key) || MessageHeaders.TIMESTAMP.equals(key)
|| IntegrationMessageHeaderAccessor.SEQUENCE_SIZE.equals(key) || IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER.equals(key)) {
|| IntegrationMessageHeaderAccessor.SEQUENCE_SIZE.equals(key)
|| IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER.equals(key)) {
continue;
}
Object value = entry.getValue();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,14 +22,16 @@
import java.util.Collection;
import java.util.Map;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.messaging.Message;
import org.springframework.integration.util.AbstractExpressionEvaluator;
import org.springframework.integration.util.MessagingMethodInvokerHelper;

/**
* A MessageListProcessor implementation that invokes a method on a target POJO.
*
*
* @author Dave Syer
* @author Artem Bilan
* @since 2.0
*/
public class MethodInvokingMessageListProcessor<T> extends AbstractExpressionEvaluator {
Expand Down Expand Up @@ -57,6 +59,12 @@ public MethodInvokingMessageListProcessor(Object targetObject, Class<? extends A
delegate = new MessagingMethodInvokerHelper<T>(targetObject, annotationType, Object.class, true);
}

@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
this.delegate.setBeanFactory(beanFactory);
}

public String toString() {
return delegate.toString();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,7 +33,6 @@
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.ParseException;
import org.springframework.expression.spel.SpelParserConfiguration;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.integration.core.MessagingTemplate;
Expand Down Expand Up @@ -63,9 +62,9 @@ public class MessagePublishingInterceptor implements MethodInterceptor, BeanFact

private final MessagingTemplate messagingTemplate = new MessagingTemplate();

private volatile PublisherMetadataSource metadataSource;
private final ExpressionParser parser = new SpelExpressionParser();

private final ExpressionParser parser = new SpelExpressionParser(new SpelParserConfiguration(true, true));
private volatile PublisherMetadataSource metadataSource;

private volatile DestinationResolver<MessageChannel> channelResolver;

Expand All @@ -75,6 +74,8 @@ public class MessagePublishingInterceptor implements MethodInterceptor, BeanFact

private volatile MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();

private volatile boolean messageBuilderFactorySet;

private volatile String defaultChannelName;

public MessagePublishingInterceptor(PublisherMetadataSource metadataSource) {
Expand Down Expand Up @@ -114,7 +115,16 @@ public void setChannelResolver(DestinationResolver<MessageChannel> channelResolv
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
this.messagingTemplate.setBeanFactory(beanFactory);
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(beanFactory);
}

protected MessageBuilderFactory getMessageBuilderFactory() {
if (!this.messageBuilderFactorySet) {
if (this.beanFactory != null) {
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory);
}
this.messageBuilderFactorySet = true;
}
return this.messageBuilderFactory;
}

@Override
Expand Down Expand Up @@ -164,8 +174,8 @@ private void publishMessage(Method method, StandardEvaluationContext context) th
Object result = expression.getValue(context);
if (result != null) {
AbstractIntegrationMessageBuilder<?> builder = (result instanceof Message<?>)
? this.messageBuilderFactory.fromMessage((Message<?>) result)
: this.messageBuilderFactory.withPayload(result);
? getMessageBuilderFactory().fromMessage((Message<?>) result)
: getMessageBuilderFactory().withPayload(result);
Map<String, Object> headers = this.evaluateHeaders(method, context);
if (headers != null) {
builder.copyHeaders(headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,9 @@ public String getComponentType() {
}

@Override
public final void setBeanFactory(BeanFactory beanFactory) {
public void setBeanFactory(BeanFactory beanFactory) {
Assert.notNull(beanFactory, "'beanFactory' must not be null");
this.beanFactory = beanFactory;
this.integrationProperties = IntegrationContextUtils.getIntegrationProperties(this.beanFactory);
}

@Override
Expand All @@ -143,9 +142,15 @@ public void setChannelResolver(DestinationResolver<MessageChannel> channelResolv

@Override
public final void afterPropertiesSet() {
this.integrationProperties = IntegrationContextUtils.getIntegrationProperties(this.beanFactory);
try {
if (this.messageBuilderFactory == null) {
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory);
if (this.beanFactory != null) {
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory);
}
else {
this.messageBuilderFactory = new DefaultMessageBuilderFactory();
}
}
this.onInit();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.core;

import java.util.Properties;
Expand All @@ -22,6 +23,7 @@
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.context.IntegrationProperties;
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.core.GenericMessagingTemplate;

Expand All @@ -35,6 +37,10 @@
*/
public class MessagingTemplate extends GenericMessagingTemplate {

private BeanFactory beanFactory;

private volatile boolean throwExceptionOnLateReplySet;

/**
* Create a MessagingTemplate with no default channel. Note, that one
* may be provided by invoking {@link #setDefaultChannel(MessageChannel)}.
Expand All @@ -55,10 +61,14 @@ public MessagingTemplate(MessageChannel defaultChannel) {
*/
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
super.setDestinationResolver(new BeanFactoryChannelResolver(beanFactory));
Properties integrationProperties = IntegrationContextUtils.getIntegrationProperties(beanFactory);
Boolean throwExceptionOnLateReply = Boolean.valueOf(integrationProperties.getProperty(IntegrationProperties.THROW_EXCEPTION_ON_LATE_REPLY));
this.setThrowExceptionOnLateReply(throwExceptionOnLateReply);
}

@Override
public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply) {
super.setThrowExceptionOnLateReply(throwExceptionOnLateReply);
this.throwExceptionOnLateReplySet = true;
}

/**
Expand All @@ -70,4 +80,21 @@ public void setDefaultChannel(MessageChannel channel) {
super.setDefaultDestination(channel);
}

@Override
public Message<?> sendAndReceive(MessageChannel destination, Message<?> requestMessage) {
if (!this.throwExceptionOnLateReplySet) {
synchronized (this) {
if (!this.throwExceptionOnLateReplySet) {
Properties integrationProperties =
IntegrationContextUtils.getIntegrationProperties(this.beanFactory);
Boolean throwExceptionOnLateReply = Boolean.valueOf(integrationProperties
.getProperty(IntegrationProperties.THROW_EXCEPTION_ON_LATE_REPLY));
super.setThrowExceptionOnLateReply(throwExceptionOnLateReply);
this.throwExceptionOnLateReplySet = true;
}
}
}
return super.sendAndReceive(destination, requestMessage);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,6 +44,7 @@
* @author Iwein Fuld
* @author Gary Russell
* @author Oleg Zhurakousky
* @author Artem Bilan
*/
public class BroadcastingDispatcher extends AbstractDispatcher implements BeanFactoryAware {

Expand All @@ -59,6 +60,10 @@ public class BroadcastingDispatcher extends AbstractDispatcher implements BeanFa

private volatile MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();

private volatile boolean messageBuilderFactorySet;

private BeanFactory beanFactory;


public BroadcastingDispatcher() {
this(null, false);
Expand Down Expand Up @@ -114,7 +119,17 @@ public void setMinSubscribers(int minSubscribers) {

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(beanFactory);
this.beanFactory = beanFactory;
}

protected MessageBuilderFactory getMessageBuilderFactory() {
if (!this.messageBuilderFactorySet) {
if (this.beanFactory != null) {
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory);
}
this.messageBuilderFactorySet = true;
}
return this.messageBuilderFactory;
}

@Override
Expand All @@ -127,7 +142,7 @@ public boolean dispatch(Message<?> message) {
}
int sequenceSize = handlers.size();
for (final MessageHandler handler : handlers) {
final Message<?> messageToSend = (!this.applySequence) ? message : this.messageBuilderFactory.fromMessage(message)
final Message<?> messageToSend = (!this.applySequence) ? message : getMessageBuilderFactory().fromMessage(message)
.pushSequenceDetails(message.getHeaders().getId(), sequenceNumber++, sequenceSize).build();
if (this.executor != null) {
this.executor.execute(new Runnable() {
Expand Down Expand Up @@ -174,5 +189,4 @@ else if (this.logger.isWarnEnabled()) {
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,21 @@
import org.springframework.expression.EvaluationContext;

/**
* Interface to be implemented by beans that wish to be aware of their
* owning integration {@link EvaluationContext}, which is the result of
* {@link org.springframework.integration.config.IntegrationEvaluationContextFactoryBean}
* <p>
* The {@link #setIntegrationEvaluationContext} is invoked from
* the {@link IntegrationEvaluationContextAwareBeanPostProcessor#afterSingletonsInstantiated()},
* not during standard {@code postProcessBefore(After)Initialization} to avoid any
* {@code BeanFactory} early access during integration {@link EvaluationContext} retrieval.
* Therefore, if it is necessary to use {@link EvaluationContext} in the {@code afterPropertiesSet()},
* the {@code IntegrationContextUtils.getEvaluationContext(this.beanFactory)} should be used instead
* of this interface implementation.
*
* @author Artem Bilan
* @since 3.0
* @see IntegrationEvaluationContextAwareBeanPostProcessor
*/
public interface IntegrationEvaluationContextAware {

Expand Down
Loading

0 comments on commit 2bde14b

Please sign in to comment.