Skip to content

Commit

Permalink
INT-1897: Refactoring for ChannelInterceptors
Browse files Browse the repository at this point in the history
JIRA: https://jira.springsource.org/browse/INT-1897

INT-1897: Introduce `ChannelInterceptorAware`

Upgrade to SF 4.0.1

JIRA: https://jira.springsource.org/browse/INT-3255
  • Loading branch information
Artem Bilan authored and garyrussell committed Jan 29, 2014
1 parent 062e7bc commit 2b888b7
Show file tree
Hide file tree
Showing 11 changed files with 172 additions and 118 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ subprojects { subproject ->
h2Version = '1.3.172'
activeMqVersion = '5.8.0'

springVersion = project.hasProperty('springVersion') ? project.springVersion : '4.0.1.BUILD-SNAPSHOT'
springVersion = project.hasProperty('springVersion') ? project.springVersion : '4.0.1.RELEASE'

springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '1.3.0.BUILD-SNAPSHOT'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@
* @author Mark Fisher
* @author Oleg Zhurakousky
* @author Gary Russell
* @author Artem Bilan
*/
public abstract class AbstractMessageChannel extends IntegrationObjectSupport implements MessageChannel, TrackableComponent {
public abstract class AbstractMessageChannel extends IntegrationObjectSupport
implements MessageChannel, TrackableComponent, ChannelInterceptorAware {

protected final Log logger = LogFactory.getLog(this.getClass());

Expand Down Expand Up @@ -94,6 +96,7 @@ public void setDatatypes(Class<?>... datatypes) {
*
* @param interceptors The list of interceptors.
*/
@Override
public void setInterceptors(List<ChannelInterceptor> interceptors) {
Collections.sort(interceptors, new OrderComparator());
this.interceptors.set(interceptors);
Expand All @@ -104,10 +107,22 @@ public void setInterceptors(List<ChannelInterceptor> interceptors) {
*
* @param interceptor The interceptor.
*/
@Override
public void addInterceptor(ChannelInterceptor interceptor) {
this.interceptors.add(interceptor);
}

/**
* Add a channel interceptor to the specified index of the list.
*
* @param index The index to add interceptor.
* @param interceptor The interceptor.
*/
@Override
public void addInterceptor(int index, ChannelInterceptor interceptor) {
this.interceptors.add(index, interceptor);
}

/**
* Specify the {@link ConversionService} to use when trying to convert to
* one of this channel's supported datatypes for a Message whose payload
Expand All @@ -124,6 +139,14 @@ public void setConversionService(ConversionService conversionService) {
super.setConversionService(conversionService);
}

/**
* Return a read-only list of the configured interceptors.
*/
@Override
public List<ChannelInterceptor> getChannelInterceptors() {
return this.interceptors.getInterceptors();
}

/**
* Exposes the interceptor list for subclasses.
*
Expand Down Expand Up @@ -261,6 +284,10 @@ public boolean add(ChannelInterceptor interceptor) {
return this.interceptors.add(interceptor);
}

public void add(int index, ChannelInterceptor interceptor) {
this.interceptors.add(index, interceptor);
}

public Message<?> preSend(Message<?> message, MessageChannel channel) {
if (logger.isDebugEnabled()) {
logger.debug("preSend on channel '" + channel + "', message: " + message);
Expand Down Expand Up @@ -310,5 +337,9 @@ else if (logger.isTraceEnabled()) {
}
return message;
}

public List<ChannelInterceptor> getInterceptors() {
return Collections.unmodifiableList(this.interceptors);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.channel;

import java.util.List;

import org.springframework.messaging.support.ChannelInterceptor;

/**
* A marker interface providing the ability to configure {@link ChannelInterceptor}s
* on {@link org.springframework.messaging.MessageChannel} implementations.
* <p>
* Typically useful when the target {@link org.springframework.messaging.MessageChannel}
* is an AOP Proxy.
* *
* @author Artem Bilan
* @since 4.0
*/
public interface ChannelInterceptorAware {

void setInterceptors(List<ChannelInterceptor> interceptors);

void addInterceptor(ChannelInterceptor interceptor);

void addInterceptor(int index, ChannelInterceptor interceptor);

List<ChannelInterceptor> getChannelInterceptors();

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,16 +25,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.NotReadablePropertyException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.OrderComparator;
import org.springframework.integration.channel.ChannelInterceptorAware;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.util.PatternMatchUtils;
import org.springframework.util.StringUtils;

Expand All @@ -43,6 +39,7 @@
*
* @author Oleg Zhurakousky
* @author Mark Fisher
* @author Artem Bilan
* @since 2.0
*/
final class GlobalChannelInterceptorBeanPostProcessor implements BeanPostProcessor, InitializingBean {
Expand Down Expand Up @@ -83,79 +80,46 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof MessageChannel) {
if (bean instanceof ChannelInterceptorAware && bean instanceof MessageChannel) {
if (logger.isDebugEnabled()) {
logger.debug("Applying global interceptors on channel '" + beanName + "'");
}
this.addMatchingInterceptors((MessageChannel) bean, beanName);
this.addMatchingInterceptors((ChannelInterceptorAware) bean, beanName);
}
return bean;
}

/**
* Adds any interceptor whose pattern matches against the channel's name.
*/
private void addMatchingInterceptors(MessageChannel channel, String beanName) {
List<ChannelInterceptor> interceptors = this.getExistingInterceptors(channel);
if (interceptors != null) {
List<GlobalChannelInterceptorWrapper> tempInterceptors = new ArrayList<GlobalChannelInterceptorWrapper>();
for (GlobalChannelInterceptorWrapper globalChannelInterceptorWrapper : this.positiveOrderInterceptors) {
String[] patterns = globalChannelInterceptorWrapper.getPatterns();
patterns = StringUtils.trimArrayElements(patterns);
if (PatternMatchUtils.simpleMatch(patterns, beanName)) {
tempInterceptors.add(globalChannelInterceptorWrapper);
}
}
Collections.sort(tempInterceptors, this.comparator);
for (GlobalChannelInterceptorWrapper next : tempInterceptors) {
interceptors.add(next.getChannelInterceptor());
}
tempInterceptors = new ArrayList<GlobalChannelInterceptorWrapper>();
for (GlobalChannelInterceptorWrapper globalChannelInterceptorWrapper : this.negativeOrderInterceptors) {
String[] patterns = globalChannelInterceptorWrapper.getPatterns();
patterns = StringUtils.trimArrayElements(patterns);
if (PatternMatchUtils.simpleMatch(patterns, beanName)) {
tempInterceptors.add(globalChannelInterceptorWrapper);
}
}
Collections.sort(tempInterceptors, comparator);
if (!tempInterceptors.isEmpty()) {
for (int i = tempInterceptors.size() - 1; i >= 0; i--) {
interceptors.add(0, tempInterceptors.get(i).getChannelInterceptor());
}
private void addMatchingInterceptors(ChannelInterceptorAware channel, String beanName) {
List<GlobalChannelInterceptorWrapper> tempInterceptors = new ArrayList<GlobalChannelInterceptorWrapper>();
for (GlobalChannelInterceptorWrapper globalChannelInterceptorWrapper : this.positiveOrderInterceptors) {
String[] patterns = globalChannelInterceptorWrapper.getPatterns();
patterns = StringUtils.trimArrayElements(patterns);
if (PatternMatchUtils.simpleMatch(patterns, beanName)) {
tempInterceptors.add(globalChannelInterceptorWrapper);
}
}
else if (logger.isDebugEnabled()) {
logger.debug("Global Channel interceptors will not be applied to Channel: " + beanName);
Collections.sort(tempInterceptors, this.comparator);
for (GlobalChannelInterceptorWrapper next : tempInterceptors) {
channel.addInterceptor(next.getChannelInterceptor());
}
}

@SuppressWarnings("unchecked")
private List<ChannelInterceptor> getExistingInterceptors(MessageChannel channel) {
try {
MessageChannel targetChannel = channel;
if (AopUtils.isAopProxy(channel)) {
Object target = ((Advised) channel).getTargetSource().getTarget();
if (target instanceof MessageChannel) {
targetChannel = (MessageChannel) target;
}
}
DirectFieldAccessor channelAccessor = new DirectFieldAccessor(targetChannel);
Object interceptorListWrapper = channelAccessor.getPropertyValue("interceptors");
if (interceptorListWrapper != null) {
return (List<ChannelInterceptor>) new DirectFieldAccessor(interceptorListWrapper).getPropertyValue("interceptors");
tempInterceptors.clear();
for (GlobalChannelInterceptorWrapper globalChannelInterceptorWrapper : this.negativeOrderInterceptors) {
String[] patterns = globalChannelInterceptorWrapper.getPatterns();
patterns = StringUtils.trimArrayElements(patterns);
if (PatternMatchUtils.simpleMatch(patterns, beanName)) {
tempInterceptors.add(globalChannelInterceptorWrapper);
}
}
catch (NotReadablePropertyException e) {
// Channel doesn't support interceptors - null return logged by caller
}
catch (Exception e) {
// interceptors not supported, will return null
if (logger.isDebugEnabled() && channel != null) {
logger.debug("interceptors not supported by channel '" + channel + "'", e);
Collections.sort(tempInterceptors, comparator);
if (!tempInterceptors.isEmpty()) {
for (int i = tempInterceptors.size() - 1; i >= 0; i--) {
channel.addInterceptor(0, tempInterceptors.get(i).getChannelInterceptor());
}
}
return null;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,29 +20,33 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.hamcrest.Matchers;
import org.junit.Test;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.ChannelInterceptorAware;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.StringUtils;

/**
* @author Mark Fisher
* @author Oleg Zhurakousky
* @author Artem Bilan
*/
public class ChannelInterceptorTests {

Expand Down Expand Up @@ -162,14 +166,11 @@ public Message<?> postReceive(Message<?> message, MessageChannel channel) {
@Test
public void testInterceptorBeanWithPnamespace(){
ApplicationContext ac = new ClassPathXmlApplicationContext("ChannelInterceptorTests-context.xml", ChannelInterceptorTests.class);
AbstractMessageChannel channel = ac.getBean("input", AbstractMessageChannel.class);
DirectFieldAccessor cAccessor = new DirectFieldAccessor(channel);
Object iList = cAccessor.getPropertyValue("interceptors");
DirectFieldAccessor iAccessor = new DirectFieldAccessor(iList);
@SuppressWarnings("unchecked")
List<PreSendReturnsMessageInterceptor> interceptorList =
(List<PreSendReturnsMessageInterceptor>) iAccessor.getPropertyValue("interceptors");
String foo = interceptorList.get(0).getFoo();
ChannelInterceptorAware channel = ac.getBean("input", AbstractMessageChannel.class);
List<ChannelInterceptor> interceptors = channel.getChannelInterceptors();
ChannelInterceptor channelInterceptor = interceptors.get(0);
assertThat(channelInterceptor, Matchers.instanceOf(PreSendReturnsMessageInterceptor.class));
String foo = ((PreSendReturnsMessageInterceptor) channelInterceptor).getFoo();
assertTrue(StringUtils.hasText(foo));
assertEquals("foo", foo);
}
Expand Down
Loading

0 comments on commit 2b888b7

Please sign in to comment.