Skip to content

Commit

Permalink
INT-3309 Pluggable MessageBuilder
Browse files Browse the repository at this point in the history
This is still a work in process.

There are a bunch of TODOs in classes
that are not managed by Spring and so
need to have the MessageBuilderFactory
injected.

But I am looking for feedback on the
approach.

INT-3309 Resolve TODOs

Provide access to the MessageBuilderFactory in all classes.

INT-3309 Polishing + Tests

* Fallback to 'fromMessage()' if mutating and inbound message is not MutableMessage
* Add 'alwaysMutate' boolean to MutableMessageBuilderFactory - coerces 'fromMessage' calls to 'mutateMessage'
* Add tests

INT-3309 Polishing; PR Comments

Also add tests to parent/child contexts where the parent has the
default message builder and the child has a mutable message builder.

INT-3309 More Polish; PR Comments

Also fix removeHeader in MMB.
  • Loading branch information
garyrussell authored and Artem Bilan committed Mar 6, 2014
1 parent 4c775bc commit ff845b5
Show file tree
Hide file tree
Showing 115 changed files with 1,760 additions and 423 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.springframework.integration.context.IntegrationProperties;
import org.springframework.integration.dispatcher.AbstractDispatcher;
import org.springframework.integration.dispatcher.MessageDispatcher;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
Expand Down Expand Up @@ -129,7 +129,8 @@ public void onInit() throws Exception {
? ((RabbitTemplate) this.getAmqpTemplate()).getMessageConverter()
: new SimpleMessageConverter();
MessageListener listener = new DispatchingMessageListener(converter,
this.dispatcher, this, this.isPubSub);
this.dispatcher, this, this.isPubSub,
this.getMessageBuilderFactory());
this.container.setMessageListener(listener);
if (!this.container.isActive()) {
this.container.afterPropertiesSet();
Expand All @@ -153,14 +154,18 @@ private static class DispatchingMessageListener implements MessageListener {

private final boolean isPubSub;

private final MessageBuilderFactory messageBuilderFactory;

private DispatchingMessageListener(MessageConverter converter,
MessageDispatcher dispatcher, AbstractSubscribableAmqpChannel channel, boolean isPubSub) {
MessageDispatcher dispatcher, AbstractSubscribableAmqpChannel channel, boolean isPubSub,
MessageBuilderFactory messageBuilderFactory) {
Assert.notNull(converter, "MessageConverter must not be null");
Assert.notNull(dispatcher, "MessageDispatcher must not be null");
this.converter = converter;
this.dispatcher = dispatcher;
this.channel = channel;
this.isPubSub = isPubSub;
this.messageBuilderFactory = messageBuilderFactory;
}


Expand All @@ -171,7 +176,7 @@ public void onMessage(org.springframework.amqp.core.Message message) {
Object converted = this.converter.fromMessage(message);
if (converted != null) {
messageToSend = (converted instanceof Message<?>) ? (Message<?>) converted
: MessageBuilder.withPayload(converted).build();
: this.messageBuilderFactory.withPayload(converted).build();
this.dispatcher.dispatch(messageToSend);
}
else if (this.logger.isWarnEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -106,7 +105,7 @@ public Message<?> receive() {
replyMessage = (Message<?>) object;
}
else {
replyMessage = MessageBuilder.withPayload(object).build();
replyMessage = this.getMessageBuilderFactory().withPayload(object).build();
}
return this.getInterceptors().postReceive(replyMessage, this) ;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -73,7 +72,7 @@ protected void onInit() {
public void onMessage(Message message) {
Object payload = messageConverter.fromMessage(message);
Map<String, ?> headers = headerMapper.toHeadersFromRequest(message.getMessageProperties());
sendMessage(MessageBuilder.withPayload(payload).copyHeaders(headers).build());
sendMessage(AmqpInboundChannelAdapter.this.getMessageBuilderFactory().withPayload(payload).copyHeaders(headers).build());
}
});
this.messageListenerContainer.afterPropertiesSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.gateway.MessagingGatewaySupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

Expand All @@ -40,7 +39,7 @@
* Spring Integration Messages, and sends the results to a Message Channel.
* If a reply Message is received, it will be converted and sent back to
* the AMQP 'replyTo'.
*
*
* @author Mark Fisher
* @since 2.1
*/
Expand Down Expand Up @@ -83,7 +82,7 @@ public void onMessage(Message message) {
Object payload = amqpMessageConverter.fromMessage(message);
Map<String, ?> headers = headerMapper.toHeadersFromRequest(message.getMessageProperties());
org.springframework.messaging.Message<?> request =
MessageBuilder.withPayload(payload).copyHeaders(headers).build();
AmqpInboundGateway.this.getMessageBuilderFactory().withPayload(payload).copyHeaders(headers).build();
final org.springframework.messaging.Message<?> reply = sendAndReceiveMessage(request);
if (reply != null) {
// TODO: fallback to a reply address property of this gateway
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.SpelParserConfiguration;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.integration.amqp.AmqpHeaders;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -240,9 +240,9 @@ private Message<?> sendAndReceive(String exchangeName, String routingKey, Messag
return null;
}
Object replyObject = converter.fromMessage(amqpReplyMessage);
MessageBuilder<?> builder = (replyObject instanceof Message)
? MessageBuilder.fromMessage((Message<?>) replyObject)
: MessageBuilder.withPayload(replyObject);
AbstractIntegrationMessageBuilder<?> builder = (replyObject instanceof Message)
? this.getMessageBuilderFactory().fromMessage((Message<?>) replyObject)
: this.getMessageBuilderFactory().withPayload(replyObject);
Map<String, ?> headers = this.headerMapper.toHeadersFromReply(amqpReplyMessage.getMessageProperties());
builder.copyHeadersIfAbsent(headers);
return builder.build();
Expand All @@ -253,7 +253,7 @@ public void confirm(CorrelationData correlationData, boolean ack) {
if (correlationData instanceof CorrelationDataWrapper) {
userCorrelationData = ((CorrelationDataWrapper) correlationData).getUserData();
}
Message<Object> confirmMessage = MessageBuilder.withPayload(userCorrelationData)
Message<Object> confirmMessage = this.getMessageBuilderFactory().withPayload(userCorrelationData)
.setHeader(AmqpHeaders.PUBLISH_CONFIRM, ack)
.build();
if (ack && this.confirmAckChannel != null) {
Expand Down Expand Up @@ -291,9 +291,9 @@ public void returnedMessage(org.springframework.amqp.core.Message message, int r
// safe to cast; we asserted we have a RabbitTemplate in doInit()
MessageConverter converter = ((RabbitTemplate) this.amqpTemplate).getMessageConverter();
Object returnedObject = converter.fromMessage(message);
MessageBuilder<?> builder = (returnedObject instanceof Message)
? MessageBuilder.fromMessage((Message<?>) returnedObject)
: MessageBuilder.withPayload(returnedObject);
AbstractIntegrationMessageBuilder<?> builder = (returnedObject instanceof Message)
? this.getMessageBuilderFactory().fromMessage((Message<?>) returnedObject)
: this.getMessageBuilderFactory().withPayload(returnedObject);
Map<String, ?> headers = this.headerMapper.toHeadersFromReply(message.getMessageProperties());
builder.copyHeadersIfAbsent(headers)
.setHeader(AmqpHeaders.RETURN_REPLY_CODE, replyCode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@

import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
Expand All @@ -41,18 +43,25 @@ public abstract class AbstractAggregatingMessageGroupProcessor implements Messag

private final Log logger = LogFactory.getLog(this.getClass());

private volatile MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();

public void setMessageBuilderFactory(MessageBuilderFactory messageBuilderFactory) {
Assert.notNull(messageBuilderFactory, "'messageBuilderFactory' cannot be null");
this.messageBuilderFactory = messageBuilderFactory;
}

@Override
public final Object processMessageGroup(MessageGroup group) {
Assert.notNull(group, "MessageGroup must not be null");

Map<String, Object> headers = this.aggregateHeaders(group);
Object payload = this.aggregatePayloads(group, headers);
MessageBuilder<?> builder;
AbstractIntegrationMessageBuilder<?> builder;
if (payload instanceof Message<?>) {
builder = MessageBuilder.fromMessage((Message<?>) payload).copyHeadersIfAbsent(headers);
builder = this.messageBuilderFactory.fromMessage((Message<?>) payload).copyHeadersIfAbsent(headers);
}
else {
builder = MessageBuilder.withPayload(payload).copyHeadersIfAbsent(headers);
builder = this.messageBuilderFactory.withPayload(payload).copyHeadersIfAbsent(headers);
}

return builder.popSequenceDetails().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;

import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
Expand All @@ -35,12 +36,15 @@
import org.springframework.expression.spel.SpelParserConfiguration;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.core.DestinationResolver;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

Expand Down Expand Up @@ -68,6 +72,7 @@ public class MessagePublishingInterceptor implements MethodInterceptor, BeanFact

private final ParameterNameDiscoverer parameterNameDiscoverer = new LocalVariableTableParameterNameDiscoverer();

private volatile MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();

public MessagePublishingInterceptor(PublisherMetadataSource metadataSource) {
Assert.notNull(metadataSource, "metadataSource must not be null");
Expand All @@ -92,6 +97,7 @@ public void setChannelResolver(DestinationResolver<MessageChannel> channelResolv
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
this.messagingTemplate.setBeanFactory(beanFactory);
this.messageBuilderFactory = IntegrationContextUtils.getMessageBuilderFactory(beanFactory);
}

public final Object invoke(final MethodInvocation invocation) throws Throwable {
Expand Down Expand Up @@ -139,9 +145,9 @@ private void publishMessage(Method method, StandardEvaluationContext context) th
Expression expression = this.parser.parseExpression(payloadExpressionString);
Object result = expression.getValue(context);
if (result != null) {
MessageBuilder<?> builder = (result instanceof Message<?>)
? MessageBuilder.fromMessage((Message<?>) result)
: MessageBuilder.withPayload(result);
AbstractIntegrationMessageBuilder<?> builder = (result instanceof Message<?>)
? this.messageBuilderFactory.fromMessage((Message<?>) result)
: this.messageBuilderFactory.withPayload(result);
Map<String, Object> headers = this.evaluateHeaders(method, context);
if (headers != null) {
builder.copyHeaders(headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.history.MessageHistory;
import org.springframework.integration.history.TrackableComponent;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.converter.DefaultDatatypeChannelMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
Expand Down Expand Up @@ -243,7 +242,7 @@ public final boolean send(Message<?> message, long timeout) {
Assert.notNull(message, "message must not be null");
Assert.notNull(message.getPayload(), "message payload must not be null");
if (this.shouldTrack) {
message = MessageHistory.write(message, this);
message = MessageHistory.write(message, this, this.getMessageBuilderFactory());
}
try {
if (this.datatypes.length > 0) {
Expand Down Expand Up @@ -282,7 +281,7 @@ private Message<?> convertPayloadIfNecessary(Message<?> message) {
return (Message<?>) converted;
}
else {
return MessageBuilder.withPayload(converted).copyHeaders(message.getHeaders()).build();
return this.getMessageBuilderFactory().withPayload(converted).copyHeaders(message.getHeaders()).build();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.context.IntegrationProperties;
import org.springframework.integration.expression.IntegrationEvaluationContextAwareBeanPostProcessor;
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.support.converter.DefaultDatatypeChannelMessageConverter;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;
Expand Down Expand Up @@ -91,6 +92,7 @@ public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, B
this.registerMessagingAnnotationPostProcessors(importingClassMetadata, registry);
}
this.registerIntegrationConfigurationBeanFactoryPostProcessor(registry);
this.registerMessageBuilderFactory(registry);
}

/**
Expand Down Expand Up @@ -353,4 +355,24 @@ private void registerDefaultDatatypeChannelMessageConverter(BeanDefinitionRegist

}

private void registerMessageBuilderFactory(BeanDefinitionRegistry registry) {
boolean alreadyRegistered = false;
if (registry instanceof ListableBeanFactory) {
alreadyRegistered = ((ListableBeanFactory) registry)
.containsBean(IntegrationContextUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME);
}
else {
alreadyRegistered = registry
.isBeanNameInUse(IntegrationContextUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME);
}
if (!alreadyRegistered) {
BeanDefinitionBuilder mbfBuilder = BeanDefinitionBuilder
.genericBeanDefinition(DefaultMessageBuilderFactory.class);
registry.registerBeanDefinition(
IntegrationContextUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME,
mbfBuilder.getBeanDefinition());
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@

import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.core.convert.ConversionService;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.integration.config.IntegrationConfigUtils;
import org.springframework.integration.metadata.MetadataStore;
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
Expand All @@ -37,6 +42,8 @@
*/
public abstract class IntegrationContextUtils {

private static final Log logger = LogFactory.getLog(IntegrationContextUtils.class);

public static final String TASK_SCHEDULER_BEAN_NAME = "taskScheduler";

public static final String ERROR_CHANNEL_BEAN_NAME = "errorChannel";
Expand Down Expand Up @@ -72,7 +79,7 @@ public abstract class IntegrationContextUtils {
public static final String INTEGRATION_DATATYPE_CHANNEL_MESSAGE_CONVERTER_BEAN_NAME = "datatypeChannelMessageConverter";

public static final String INTEGRATION_FIXED_SUBSCRIBER_CHANNEL_BPP_BEAN_NAME = "fixedSubscriberChannelBeanFactoryPostProcessor";

public static final String INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME = "messageBuilderFactory";
/**
* @param beanFactory BeanFactory for lookup, must not be null.
* @return The {@link MetadataStore} bean whose name is "metadataStore".
Expand Down Expand Up @@ -155,4 +162,38 @@ public static Properties getIntegrationProperties(BeanFactory beanFactory) {
return properties;
}

/**
* Returns the context-wide `messageBuilderFactory` bean from the beanFactory,
* or a {@link DefaultMessageBuilderFactory} if not found or the beanFactory is null.
* @param beanFactory The bean factory.
* @return The message builder factory.
*/
public static MessageBuilderFactory getMessageBuilderFactory(BeanFactory beanFactory) {
MessageBuilderFactory messageBuilderFactory = null;
if (beanFactory != null) {
try {
messageBuilderFactory = beanFactory.getBean(
IntegrationContextUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME, MessageBuilderFactory.class);
}
catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn("No MessageBuilderFactory with name '"
+ IntegrationContextUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME
+ "' found: " + e.getMessage()
+ ", using default.");
}
}
}
else {
if (logger.isWarnEnabled()) {
logger.warn("No 'beanFactory' supplied; cannot find MessageBuilderFactory"
+ ", using default.");
}
}
if (messageBuilderFactory == null) {
messageBuilderFactory = new DefaultMessageBuilderFactory();
}
return messageBuilderFactory;
}

}
Loading

0 comments on commit ff845b5

Please sign in to comment.