Skip to content

Commit

Permalink
INT-2426: Add Idempotent Receiver EIP
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-2426

INT-2426: pushed `final` modifier fix for Java 6 compatibility

INT-2426: Rework logic to the `MetadataStore`

INT-2426: `IdempotentReceiver` -> `IdempotentReceiverInterceptor`

* Move `Idempotent Filtering` logic to the `IdempotentReceiverInterceptor`, which should be applied as a regular
AOP `Advice` to the `MessageHandler#handleMessage`
* Provide an xml component `<idempotent-receiver>`
* Introduce `IdempotentReceiverAutoProxyCreator` to get deal with `IdempotentReceiverInterceptor` and `MessageHandler`s.
The `Proxying` logic is based on the mapping between interceptor and `consumer endpoint` `ids`
* Introduce `MetadataStoreSelector` along side with `MetadataKeyStrategy` and `ExpressionMetadataKeyStrategy` implementation

INT-2426: Introduce `IdempotentReceiver` annotation

Add `IdempotentReceiverIntegrationTests` in the JMX module to be sure that all proxying works well.

INT-2426: Polishing according PR comments

* Rename `IdempotentReceiverAutoProxyCreatorInitializer`
* Add support for several `IRI` for the one `MH`
* Polishing JavaDocs

INT-2426: Add `What's New` note

Doc Polishing.

More Doc Polishing

Use Timestamp (hex) instead of Id for Value

Facilitate cleanup.
  • Loading branch information
artembilan authored and garyrussell committed Oct 24, 2014
1 parent 6e81950 commit ff2b15e
Show file tree
Hide file tree
Showing 23 changed files with 1,573 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class IntegrationMessageHeaderAccessor extends MessageHeaderAccessor {

public static final String ROUTING_SLIP = "routingSlip";

public static final String DUPLICATE_MESSAGE = "duplicateMessage";

public IntegrationMessageHeaderAccessor(Message<?> message) {
super(message);
}
Expand Down Expand Up @@ -107,6 +109,10 @@ else if (IntegrationMessageHeaderAccessor.ROUTING_SLIP.equals(headerName)) {
Assert.isTrue(Map.class.isAssignableFrom(headerValue.getClass()), "The '" + headerName
+ "' header value must be an List.");
}
else if (IntegrationMessageHeaderAccessor.DUPLICATE_MESSAGE.equals(headerName)) {
Assert.isTrue(Boolean.class.isAssignableFrom(headerValue.getClass()), "The '" + headerName
+ "' header value must be an Boolean.");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* A {@code @Bean} that has a MessagingAnnotation (@code @ServiceActivator, @Router etc.)
* that also has this annotation, has an
* {@link org.springframework.integration.handler.advice.IdempotentReceiverInterceptor} applied
* to the associated {@link org.springframework.messaging.MessageHandler#handleMessage} method.
* The interceptor bean names are provided in the {@link #value()}.
*
* @author Artem Bilan
* @since 4.1
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface IdempotentReceiver {

/**
* @return the {@link org.springframework.integration.handler.advice.IdempotentReceiverInterceptor}
* bean references.
*/
String[] value();

}
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,14 @@ public void afterPropertiesSet() throws Exception {
}
if (!CollectionUtils.isEmpty(this.adviceChain)) {
/*
* ARPMHs advise the handleRequesMessage method internally and already have the advice chain injected.
* ARPMHs advise the handleRequestMessage method internally and already have the advice chain injected.
* So we only advise handlers that are not reply-producing. If the handler is already advised,
* add the configured advices to its chain, otherwise create a proxy.
*/
if (!(this.handler instanceof AbstractReplyProducingMessageHandler)) {
if (AopUtils.isAopProxy(this.handler) && this.handler instanceof Advised) {
Class<?> targetClass = AopUtils.getTargetClass(this.handler);
Class<?> targetClass = AopUtils.getTargetClass(this.handler);

if (!(AbstractReplyProducingMessageHandler.class.isAssignableFrom(targetClass))) {
if (AopUtils.isAopProxy(this.handler)) {
for (Advice advice : this.adviceChain) {
NameMatchMethodPointcutAdvisor handlerAdvice = new NameMatchMethodPointcutAdvisor(advice);
handlerAdvice.addMethodName("handleMessage");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.config;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.springframework.aop.Advisor;
import org.springframework.aop.TargetSource;
import org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator;
import org.springframework.aop.support.DefaultBeanFactoryPointcutAdvisor;
import org.springframework.aop.support.NameMatchMethodPointcut;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;
import org.springframework.util.PatternMatchUtils;

/**
* The {@link AbstractAutoProxyCreator} implementation that applies {@code IdempotentReceiverInterceptor}s
* to {@link MessageHandler}s mapped by their {@code endpoint beanName}.
*
* @author Artem Bilan
* @since 4.1
*/
@SuppressWarnings("serial")
class IdempotentReceiverAutoProxyCreator extends AbstractAutoProxyCreator {

private List<Map<String, String>> idempotentEndpointsMapping;

private Map<String, List<String>> idempotentEndpoints;

public void setIdempotentEndpointsMapping(List<Map<String, String>> idempotentEndpointsMapping) {
Assert.notEmpty(idempotentEndpointsMapping);
this.idempotentEndpointsMapping = idempotentEndpointsMapping;
}

@Override
protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName,
TargetSource customTargetSource) throws BeansException {
initIdempotentEndpointsIfNecessary();

if (MessageHandler.class.isAssignableFrom(beanClass)) {
List<Advisor> interceptors = new ArrayList<Advisor>();
for (Map.Entry<String, List<String>> entry : this.idempotentEndpoints.entrySet()) {
List<String> mappedNames = entry.getValue();
for (String mappedName : mappedNames) {
String pattern = mappedName + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX;
if (isMatch(pattern, beanName)) {
DefaultBeanFactoryPointcutAdvisor idempotentReceiverInterceptor
= new DefaultBeanFactoryPointcutAdvisor();
idempotentReceiverInterceptor.setAdviceBeanName(entry.getKey());
NameMatchMethodPointcut pointcut = new NameMatchMethodPointcut();
pointcut.setMappedName("handleMessage");
idempotentReceiverInterceptor.setPointcut(pointcut);
idempotentReceiverInterceptor.setBeanFactory(getBeanFactory());
interceptors.add(idempotentReceiverInterceptor);
}
}
}
if (!interceptors.isEmpty()) {
return interceptors.toArray();
}
}
return DO_NOT_PROXY;
}

private void initIdempotentEndpointsIfNecessary() {
if (this.idempotentEndpoints == null) {
synchronized (this) {
if (this.idempotentEndpoints == null) {
this.idempotentEndpoints = new LinkedHashMap<String, List<String>>();
for (Map<String, String> mapping : this.idempotentEndpointsMapping) {
Assert.isTrue(mapping.size() == 1, "The 'idempotentEndpointMapping' must be a SingletonMap");
String interceptor = mapping.keySet().iterator().next();
String endpoint = mapping.values().iterator().next();
Assert.hasText(interceptor, "The 'idempotentReceiverInterceptor' can't be empty String");
Assert.hasText(endpoint, "The 'idempotentReceiverEndpoint' can't be empty String");
List<String> endpoints = this.idempotentEndpoints.get(interceptor);
if (endpoints == null) {
endpoints = new ArrayList<String>();
this.idempotentEndpoints.put(interceptor, endpoints);
}
endpoints.add(endpoint);
}
}
}
}
}

private boolean isMatch(String mappedName, String beanName) {
boolean matched = PatternMatchUtils.simpleMatch(mappedName, beanName);
if (!matched) {
BeanFactory beanFactory = getBeanFactory();
if (beanFactory != null) {
String[] aliases = beanFactory.getAliases(beanName);
for (String alias : aliases) {
matched = PatternMatchUtils.simpleMatch(mappedName, alias);
if (matched) {
break;
}
}
}
}
return matched;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.config;

import java.util.List;
import java.util.Map;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.ManagedList;
import org.springframework.beans.factory.support.ManagedMap;
import org.springframework.core.type.MethodMetadata;
import org.springframework.integration.annotation.IdempotentReceiver;
import org.springframework.integration.handler.advice.IdempotentReceiverInterceptor;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
* The {@link IntegrationConfigurationInitializer} that populates
* the {@link ConfigurableListableBeanFactory}
* with an {@link IdempotentReceiverAutoProxyCreator}
* when {@code IdempotentReceiverInterceptor} {@link BeanDefinition}s and their {@code mapping}
* to Consumer Endpoints are present.
*
* @author Artem Bilan
* @since 4.1
*/
public class IdempotentReceiverAutoProxyCreatorInitializer implements IntegrationConfigurationInitializer {

public static final String IDEMPOTENT_ENDPOINTS_MAPPING = "IDEMPOTENT_ENDPOINTS_MAPPING";

private static final String IDEMPOTENT_RECEIVER_AUTO_PROXY_CREATOR_BEAN_NAME =
IdempotentReceiverAutoProxyCreator.class.getName();

@Override
public void initialize(ConfigurableListableBeanFactory beanFactory) throws BeansException {
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;

List<Map<String, String>> idempotentEndpointsMapping = new ManagedList<Map<String, String>>();

for (String beanName : registry.getBeanDefinitionNames()) {
BeanDefinition beanDefinition = registry.getBeanDefinition(beanName);
if (IdempotentReceiverInterceptor.class.getName().equals(beanDefinition.getBeanClassName())) {
Object value = beanDefinition.removeAttribute(IDEMPOTENT_ENDPOINTS_MAPPING);
Assert.isInstanceOf(String.class, value,
"The 'mapping' of BeanDefinition 'IDEMPOTENT_ENDPOINTS_MAPPING' must be String.");
String mapping = (String) value;
String[] endpoints = StringUtils.tokenizeToStringArray(mapping, ",");
for (String endpoint : endpoints) {
Map<String, String> idempotentEndpoint = new ManagedMap<String, String>();
idempotentEndpoint.put(beanName, beanFactory.resolveEmbeddedValue(endpoint));
idempotentEndpointsMapping.add(idempotentEndpoint);
}
}
else if (beanDefinition instanceof AnnotatedBeanDefinition) {
if (beanDefinition.getSource() instanceof MethodMetadata) {
MethodMetadata beanMethod = (MethodMetadata) beanDefinition.getSource();
String annotationType = IdempotentReceiver.class.getName();
if (beanMethod.isAnnotated(annotationType)) {
Object value = beanMethod.getAnnotationAttributes(annotationType).get("value");
if (value != null) {
String[] interceptors = (String[]) value;
/*
MessageHandler beans, populated from @Bean methods, have a complex id,
including @Configuration bean name, method name and the Messaging annotation name.
The following pattern matches the bean name, regardless of the annotation name.
*/
String endpoint = beanDefinition.getFactoryBeanName() + "." + beanName + ".*";
for (String interceptor : interceptors) {
Map<String, String> idempotentEndpoint = new ManagedMap<String, String>();
idempotentEndpoint.put(interceptor, endpoint);
idempotentEndpointsMapping.add(idempotentEndpoint);
}
}
}
}
}
}

if (!idempotentEndpointsMapping.isEmpty()) {
BeanDefinition bd = BeanDefinitionBuilder.rootBeanDefinition(IdempotentReceiverAutoProxyCreator.class)
.addPropertyValue("idempotentEndpointsMapping", idempotentEndpointsMapping)
.getBeanDefinition();
registry.registerBeanDefinition(IDEMPOTENT_RECEIVER_AUTO_PROXY_CREATOR_BEAN_NAME, bd);
}
}

}
Loading

0 comments on commit ff2b15e

Please sign in to comment.