diff --git a/build.gradle b/build.gradle index df93a935f58..ee0170de824 100644 --- a/build.gradle +++ b/build.gradle @@ -126,13 +126,14 @@ subprojects { subproject -> smack3Version = '3.2.1' smackVersion = '4.0.6' springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '1.5.0.M1' +// springCloudClusterVersion = '1.0.0.BUILD-SNAPSHOT' springDataMongoVersion = '1.7.0.RELEASE' springDataRedisVersion = '1.5.0.RELEASE' springGemfireVersion = '1.6.0.RELEASE' springSecurityVersion = project.hasProperty('springSecurityVersion') ? project.springSecurityVersion : '4.0.1.RELEASE' springSocialTwitterVersion = '1.1.0.RELEASE' springRetryVersion = '1.1.2.RELEASE' - springVersion = project.hasProperty('springVersion') ? project.springVersion : '4.2.0.BUILD-SNAPSHOT' + springVersion = project.hasProperty('springVersion') ? project.springVersion : '4.2.0.RC2' springWsVersion = '2.2.1.RELEASE' xmlUnitVersion = '1.5' xstreamVersion = '1.4.7' @@ -268,6 +269,7 @@ project('spring-integration-core') { compile "org.springframework:spring-messaging:$springVersion" compile "org.springframework:spring-tx:$springVersion" compile "org.springframework.retry:spring-retry:$springRetryVersion" +// compile ("org.springframework.cloud:spring-cloud-cluster-core:$springCloudClusterVersion", optional) compile ("io.projectreactor:reactor-stream:$reactorVersion", optional) compile("com.fasterxml.jackson.core:jackson-databind:$jackson2Version", optional) compile("com.jayway.jsonpath:json-path:$jsonpathVersion", optional) @@ -687,14 +689,13 @@ project('spring-integration-zookeeper') { description = 'Spring Integration Zookeeper Support' dependencies { compile project(":spring-integration-core") - compile ("org.apache.zookeeper:zookeeper:$zookeeperVersion") { exclude group: 'io.netty', module: 'netty' exclude group: 'junit', module: 'junit' exclude group: 'org.slf4j', module: 'slf4j-log4j12' exclude group: 'log4j', module: 'log4j' } - +// compile "org.springframework.cloud:spring-cloud-cluster-zookeeper:$springCloudClusterVersion" compile("org.apache.curator:curator-recipes:$curatorVersion") { exclude group: 'org.apache.zookeeper', module: 'zookeeper' } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/annotation/Role.java b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Role.java new file mode 100644 index 00000000000..b5ddfbcf938 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/annotation/Role.java @@ -0,0 +1,44 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotate endpoints to assign them to a role. Such endpoints can be started/stopped as + * a group. See {@code SmartLifecycleRoleController}. + * + * @author Gary Russell + * @since 4.2 + */ +@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +@Documented +public @interface Role { + + /** + * @return the role for this endpoint. See {@code SmartLifecycleRoleController}. + */ + String value() default ""; + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/IntegrationConfigUtils.java b/spring-integration-core/src/main/java/org/springframework/integration/config/IntegrationConfigUtils.java index 9a1161044e4..fe4ba971f03 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/IntegrationConfigUtils.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/IntegrationConfigUtils.java @@ -20,7 +20,11 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.support.BeanDefinitionRegistry; +import org.springframework.beans.factory.support.ManagedList; +import org.springframework.context.Lifecycle; import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.context.IntegrationContextUtils; +import org.springframework.integration.support.SmartLifecycleRoleController; /** * Shared utility methods for Integration configuration. @@ -51,6 +55,17 @@ public static void autoCreateDirectChannel(String channelName, BeanDefinitionReg BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry); } + public static void registerRoleControllerDefinitionIfNecessary(BeanDefinitionRegistry registry) { + if (!registry.containsBeanDefinition( + IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER)) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SmartLifecycleRoleController.class); + builder.addConstructorArgValue(new ManagedList()); + builder.addConstructorArgValue(new ManagedList()); + registry.registerBeanDefinition( + IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER, builder.getBeanDefinition()); + } + } + private IntegrationConfigUtils() { } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/IntegrationRegistrar.java b/spring-integration-core/src/main/java/org/springframework/integration/config/IntegrationRegistrar.java index 03804109edf..fc4f30439ac 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/IntegrationRegistrar.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/IntegrationRegistrar.java @@ -23,6 +23,9 @@ import java.util.List; import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.springframework.beans.factory.BeanClassLoaderAware; import org.springframework.beans.factory.ListableBeanFactory; import org.springframework.beans.factory.config.BeanDefinition; @@ -49,9 +52,6 @@ import org.springframework.integration.support.utils.IntegrationUtils; import org.springframework.util.ClassUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - /** * {@link ImportBeanDefinitionRegistrar} implementation that configures integration infrastructure. * @@ -102,6 +102,7 @@ public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, B this.registerMessagingAnnotationPostProcessors(importingClassMetadata, registry); } this.registerMessageBuilderFactory(registry); + IntegrationConfigUtils.registerRoleControllerDefinitionIfNecessary(registry); } /** diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessor.java index 4733d447087..74ca9502be5 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessor.java @@ -24,6 +24,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.commons.logging.Log; @@ -35,6 +36,8 @@ import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.BeanInitializationException; import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; +import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.context.ApplicationEvent; @@ -49,15 +52,20 @@ import org.springframework.integration.annotation.BridgeTo; import org.springframework.integration.annotation.Filter; import org.springframework.integration.annotation.InboundChannelAdapter; +import org.springframework.integration.annotation.Role; import org.springframework.integration.annotation.Router; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.annotation.Splitter; import org.springframework.integration.annotation.Transformer; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.endpoint.AbstractEndpoint; +import org.springframework.integration.support.SmartLifecycleRoleController; import org.springframework.integration.util.MessagingAnnotationUtils; import org.springframework.stereotype.Component; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; @@ -71,7 +79,8 @@ * @author Gary Russell */ public class MessagingAnnotationPostProcessor implements BeanPostProcessor, BeanFactoryAware, - InitializingBean, Lifecycle, ApplicationListener, EnvironmentAware { + InitializingBean, Lifecycle, ApplicationListener, EnvironmentAware, + SmartInitializingSingleton { private final Log logger = LogFactory.getLog(this.getClass()); @@ -88,6 +97,7 @@ public class MessagingAnnotationPostProcessor implements BeanPostProcessor, Bean private volatile boolean running = true; + private final MultiValueMap lazyLifecyleRoles = new LinkedMultiValueMap(); @Override public void setBeanFactory(BeanFactory beanFactory) { @@ -120,6 +130,21 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro return bean; } + @Override + public void afterSingletonsInstantiated() { + SmartLifecycleRoleController roleController; + try { + roleController = beanFactory.getBean(IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER, + SmartLifecycleRoleController.class); + for (Entry> entry : this.lazyLifecyleRoles.entrySet()) { + roleController.addLifecyclesToRole(entry.getKey(), entry.getValue()); + } + } + catch (NoSuchBeanDefinitionException e) { + logger.error("No lifecyle role controller in context"); + } + } + @Override public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { Assert.notNull(this.beanFactory, "BeanFactory must not be null"); @@ -200,6 +225,10 @@ public void doWith(Method method) throws IllegalArgumentException, IllegalAccess if (result instanceof ApplicationListener) { listeners.add((ApplicationListener) result); } + Role role = AnnotationUtils.findAnnotation(method, Role.class); + if (role != null) { + lazyLifecyleRoles.add(role.value(), endpointBeanName); + } } } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractChannelAdapterParser.java b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractChannelAdapterParser.java index a13c3ea9fea..2fdf22560ae 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractChannelAdapterParser.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractChannelAdapterParser.java @@ -73,6 +73,13 @@ protected final AbstractBeanDefinition parseInternal(Element element, ParserCont if (StringUtils.hasText(phase)) { propertyValues.add("phase", new TypedStringValue(phase)); } + String role = element.getAttribute(IntegrationNamespaceUtils.ROLE); + if (StringUtils.hasText(role)) { + if (!StringUtils.hasText(element.getAttribute(ID_ATTRIBUTE))) { + parserContext.getReaderContext().error("When using 'role', 'id' is required", element); + } + IntegrationNamespaceUtils.putLifecycleInRole(role, element.getAttribute(ID_ATTRIBUTE), parserContext); + } return beanDefinition; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java index 76bd955970f..16343ac9855 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java @@ -155,6 +155,13 @@ protected final AbstractBeanDefinition parseInternal(Element element, ParserCont } IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, IntegrationNamespaceUtils.AUTO_STARTUP); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, IntegrationNamespaceUtils.PHASE); + String role = element.getAttribute(IntegrationNamespaceUtils.ROLE); + if (StringUtils.hasText(role)) { + if (!StringUtils.hasText(element.getAttribute(ID_ATTRIBUTE))) { + parserContext.getReaderContext().error("When using 'role', 'id' is required", element); + } + IntegrationNamespaceUtils.putLifecycleInRole(role, element.getAttribute(ID_ATTRIBUTE), parserContext); + } AbstractBeanDefinition beanDefinition = builder.getBeanDefinition(); String beanName = this.resolveId(element, beanDefinition, parserContext); parserContext.registerBeanComponent(new BeanComponentDefinition(beanDefinition, beanName)); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceUtils.java b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceUtils.java index 5b6c13cd85a..8578147998e 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceUtils.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceUtils.java @@ -24,6 +24,7 @@ import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.BeanDefinitionHolder; +import org.springframework.beans.factory.config.BeanReference; import org.springframework.beans.factory.config.ConstructorArgumentValues; import org.springframework.beans.factory.config.ConstructorArgumentValues.ValueHolder; import org.springframework.beans.factory.config.RuntimeBeanReference; @@ -72,6 +73,7 @@ public abstract class IntegrationNamespaceUtils { public static final String REQUEST_HANDLER_ADVICE_CHAIN = "request-handler-advice-chain"; public static final String AUTO_STARTUP = "auto-startup"; public static final String PHASE = "phase"; + public static final String ROLE = "role"; /** * Configures the provided bean definition builder with a property value corresponding to the attribute whose name @@ -547,4 +549,19 @@ public static void checkAndConfigureFixedSubscriberChannel(Element element, Pars } } + public static void putLifecycleInRole(String role, String beanName, ParserContext parserContext) { + BeanDefinitionRegistry registry = parserContext.getRegistry(); + IntegrationConfigUtils.registerRoleControllerDefinitionIfNecessary(registry); + BeanDefinition controllerDef = registry.getBeanDefinition( + IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER); + @SuppressWarnings("unchecked") + ManagedList roles = (ManagedList) controllerDef.getConstructorArgumentValues() + .getArgumentValue(0, ManagedList.class).getValue(); + @SuppressWarnings("unchecked") + ManagedList lifecycles = (ManagedList) controllerDef.getConstructorArgumentValues() + .getArgumentValue(1, ManagedList.class).getValue(); + roles.add(role); + lifecycles.add(new RuntimeBeanReference(beanName)); + } + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java index 1c6a6790865..ddb830e12e1 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java @@ -77,6 +77,8 @@ public abstract class IntegrationContextUtils { public static final String TO_STRING_FRIENDLY_JSON_NODE_TO_STRING_CONVERTER_BEAN_NAME = "toStringFriendlyJsonNodeToStringConverter"; + public static final String INTEGRATION_LIFECYCLE_ROLE_CONTROLLER = "integrationLifecycleRoleController"; + /** * @param beanFactory BeanFactory for lookup, must not be null. * @return The {@link MetadataStore} bean whose name is "metadataStore". diff --git a/spring-integration-core/src/main/java/org/springframework/integration/leader/AbstractCandidate.java b/spring-integration-core/src/main/java/org/springframework/integration/leader/AbstractCandidate.java new file mode 100644 index 00000000000..719ed5dc35e --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/leader/AbstractCandidate.java @@ -0,0 +1,70 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.leader; + +import java.util.UUID; + +import org.springframework.util.StringUtils; + +/** + * Base implementation of a {@link Candidate}. + * + * @author Janne Valkealahti + * + */ +public abstract class AbstractCandidate implements Candidate { + + private static final String DEFAULT_ROLE = "leader"; + + private final String id; + + private final String role; + + /** + * Instantiate a abstract candidate. + */ + public AbstractCandidate() { + this(null, null); + } + + /** + * Instantiate a abstract candidate. + * + * @param id the identifier + * @param role the role + */ + public AbstractCandidate(String id, String role) { + this.id = StringUtils.hasText(id) ? id : UUID.randomUUID().toString(); + this.role = StringUtils.hasText(role) ? role : DEFAULT_ROLE; + } + + @Override + public String getRole() { + return this.role; + } + + @Override + public String getId() { + return this.id; + } + + @Override + public abstract void onGranted(Context ctx) throws InterruptedException; + + @Override + public abstract void onRevoked(Context ctx); + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/leader/Candidate.java b/spring-integration-core/src/main/java/org/springframework/integration/leader/Candidate.java new file mode 100644 index 00000000000..c84d8f0158c --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/leader/Candidate.java @@ -0,0 +1,72 @@ +/* + * Copyright 2014-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.leader; + +/** + * Interface that defines the contract for candidates to participate + * in a leader election. The callback methods {@link #onGranted(Context)} + * and {@link #onRevoked(Context)} are invoked when leadership is + * granted and revoked. + * + * @author Patrick Peralta + * @author Janne Valkealahti + * + */ +public interface Candidate { + + /** + * Gets the role. + * + * @return a string indicating the name of the leadership role + * this candidate is participating in; other candidates + * present in the system with the same name will contend + * for leadership + */ + String getRole(); + + /** + * Gets the identifier. + * + * @return a unique ID for this candidate; no other candidate for + * leader election should return the same id + */ + String getId(); + + /** + * Callback method invoked when this candidate is elected leader. + * Implementations may chose to launch a background thread to + * perform leadership roles and return immediately. Another option + * is for implementations to perform all leadership work in the + * thread invoking this method. In the latter case, the + * method must respond to thread interrupts by throwing + * {@link java.lang.InterruptedException}. When the thread + * is interrupted, this indicates that this candidate is no + * longer leader. + * + * @param ctx leadership context + * @throws InterruptedException when this candidate is no longer leader + */ + void onGranted(Context ctx) throws InterruptedException; + + /** + * Callback method invoked when this candidate is no longer leader. + * Implementations should use this to shut down any resources + * (threads, network connections, etc) used to perform leadership work. + * + * @param ctx leadership context + */ + void onRevoked(Context ctx); +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/leader/Context.java b/spring-integration-core/src/main/java/org/springframework/integration/leader/Context.java new file mode 100644 index 00000000000..8171650ba0e --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/leader/Context.java @@ -0,0 +1,44 @@ +/* + * Copyright 2014-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.leader; + +/** + * Interface that defines the context for candidate leadership. + * Instances of this object are passed to {@link Candidate candidates} + * upon granting and revoking of leadership. + * + * @author Patrick Peralta + * @author Janne Valkealahti + * + */ +public interface Context { + + /** + * Checks if the {@link Candidate} this context was + * passed to is the leader. + * + * @return true if the {@link Candidate} this context was + * passed to is the leader + */ + boolean isLeader(); + + /** + * Causes the {@link Candidate} this context was passed to + * to relinquish leadership. This method has no effect + * if the candidate is not currently the leader. + */ + void yield(); +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/leader/DefaultCandidate.java b/spring-integration-core/src/main/java/org/springframework/integration/leader/DefaultCandidate.java new file mode 100644 index 00000000000..354f6c64934 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/leader/DefaultCandidate.java @@ -0,0 +1,76 @@ +/* + * Copyright 2014-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.leader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple {@link Candidate} for leadership. + * This implementation simply logs when it is elected and when its leadership is revoked. + */ +public class DefaultCandidate extends AbstractCandidate { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private volatile Context leaderContext; + + /** + * Instantiate a default candidate. + */ + public DefaultCandidate() { + super(); + } + + /** + * Instantiate a default candidate. + * + * @param id the identifier + * @param role the role + */ + public DefaultCandidate(String id, String role) { + super(id, role); + } + + @Override + public void onGranted(Context ctx) { + logger.info("{} has been granted leadership; context: {}", this, ctx); + leaderContext = ctx; + } + + @Override + public void onRevoked(Context ctx) { + logger.info("{} leadership has been revoked", this, ctx); + } + + /** + * Voluntarily yield leadership if held. If leader context is not + * yet known this method does nothing. Leader context becomes available + * only after {@link #onGranted(Context)} method is called by the + * leader initiator. + */ + public void yieldLeadership() { + if (leaderContext != null) { + leaderContext.yield(); + } + } + + @Override + public String toString() { + return String.format("DefaultCandidate{role=%s, id=%s}", getRole(), getId()); + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/leader/event/AbstractLeaderEvent.java b/spring-integration-core/src/main/java/org/springframework/integration/leader/event/AbstractLeaderEvent.java new file mode 100644 index 00000000000..2cf16c9f0c7 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/leader/event/AbstractLeaderEvent.java @@ -0,0 +1,82 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.leader.event; + +import org.springframework.context.ApplicationEvent; +import org.springframework.integration.leader.Context; + +/** + * Base {@link ApplicationEvent} class for leader based events. All custom event + * classes should be derived from this class. + * + * @author Janne Valkealahti + * @author Gary Russell + * + */ +@SuppressWarnings("serial") +public abstract class AbstractLeaderEvent extends ApplicationEvent { + + private final Context context; + + private final String role; + + /** + * Create a new ApplicationEvent. + * + * @param source the component that published the event (never {@code null}) + */ + public AbstractLeaderEvent(Object source) { + this(source, null, null); + } + + /** + * Create a new ApplicationEvent. + * + * @param source the component that published the event (never {@code null}) + * @param context the context associated with this event + * @param role the role of the leader + */ + public AbstractLeaderEvent(Object source, Context context, String role) { + super(source); + this.context = context; + this.role = role; + } + + /** + * Get the {@link Context} associated with this event. + * + * @return the context + */ + public Context getContext() { + return context; + } + + /** + * Get the role of the leader. + * + * @return the role + */ + public String getRole() { + return role; + } + + @Override + public String toString() { + return getClass().getSimpleName() + " [role=" + role + ", context=" + context + ", source=" + source + + "]"; + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/leader/event/DefaultLeaderEventPublisher.java b/spring-integration-core/src/main/java/org/springframework/integration/leader/event/DefaultLeaderEventPublisher.java new file mode 100644 index 00000000000..a194f8ee943 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/leader/event/DefaultLeaderEventPublisher.java @@ -0,0 +1,67 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.leader.event; + +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.integration.leader.Context; + +/** + * Default implementation of {@link LeaderEventPublisher}. + * + * @author Janne Valkealahti + * @author Gary Russell + * + */ +public class DefaultLeaderEventPublisher implements LeaderEventPublisher, ApplicationEventPublisherAware { + + private ApplicationEventPublisher applicationEventPublisher; + + /** + * Instantiates a new leader event publisher. + */ + public DefaultLeaderEventPublisher() { + } + + /** + * Instantiates a new leader event publisher. + * + * @param applicationEventPublisher the application event publisher + */ + public DefaultLeaderEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.applicationEventPublisher = applicationEventPublisher; + } + + @Override + public void publishOnGranted(Object source, Context context, String role) { + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new OnGrantedEvent(source, context, role)); + } + } + + @Override + public void publishOnRevoked(Object source, Context context, String role) { + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new OnRevokedEvent(source, context, role)); + } + } + + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.applicationEventPublisher = applicationEventPublisher; + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/leader/event/LeaderEventPublisher.java b/spring-integration-core/src/main/java/org/springframework/integration/leader/event/LeaderEventPublisher.java new file mode 100644 index 00000000000..cdf960b3dd0 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/leader/event/LeaderEventPublisher.java @@ -0,0 +1,47 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.leader.event; + +import org.springframework.integration.leader.Context; + +/** + * Interface for publishing leader based application events. + * + * @author Janne Valkealahti + * @author Gary Russell + * + */ +public interface LeaderEventPublisher { + + /** + * Publish a granted event. + * + * @param source the component generated this event + * @param context the context associated with event + * @param role the role of the leader + */ + void publishOnGranted(Object source, Context context, String role); + + /** + * Publish a revoked event. + * + * @param source the component generated this event + * @param context the context associated with event + * @param role the role of the leader + */ + void publishOnRevoked(Object source, Context context, String role); + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/leader/event/OnGrantedEvent.java b/spring-integration-core/src/main/java/org/springframework/integration/leader/event/OnGrantedEvent.java new file mode 100644 index 00000000000..ff2997f2878 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/leader/event/OnGrantedEvent.java @@ -0,0 +1,41 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.leader.event; + +import org.springframework.integration.leader.Context; + +/** + * Generic event representing that leader has been granted. + * + * @author Janne Valkealahti + * @author Gary Russell + * + */ +@SuppressWarnings("serial") +public class OnGrantedEvent extends AbstractLeaderEvent { + + /** + * Instantiates a new granted event. + * + * @param source the component that published the event (never {@code null}) + * @param context the context associated with this event + * @param role the role of the leader + */ + public OnGrantedEvent(Object source, Context context, String role) { + super(source, context, role); + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/leader/event/OnRevokedEvent.java b/spring-integration-core/src/main/java/org/springframework/integration/leader/event/OnRevokedEvent.java new file mode 100644 index 00000000000..6bc85b7b98a --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/leader/event/OnRevokedEvent.java @@ -0,0 +1,41 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.leader.event; + +import org.springframework.integration.leader.Context; + +/** + * Generic event representing that leader has been revoked. + * + * @author Janne Valkealahti + * @author Gary Russell + * + */ +@SuppressWarnings("serial") +public class OnRevokedEvent extends AbstractLeaderEvent { + + /** + * Instantiates a new revoked event. + * + * @param source the component that published the event (never {@code null}) + * @param context the context associated with this event + * @param role the role of the leader + */ + public OnRevokedEvent(Object source, Context context, String role) { + super(source, context, role); + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/leader/event/package-info.java b/spring-integration-core/src/main/java/org/springframework/integration/leader/event/package-info.java new file mode 100644 index 00000000000..1a4967547e4 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/leader/event/package-info.java @@ -0,0 +1,4 @@ +/** + * Temporary package until s-c-c-core is released. + */ +package org.springframework.integration.leader.event; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/leader/package-info.java b/spring-integration-core/src/main/java/org/springframework/integration/leader/package-info.java new file mode 100644 index 00000000000..dc80ed6d29e --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/leader/package-info.java @@ -0,0 +1,4 @@ +/** + * Temporary package until s-c-c-core is released. + */ +package org.springframework.integration.leader; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/SmartLifecycleRoleController.java b/spring-integration-core/src/main/java/org/springframework/integration/support/SmartLifecycleRoleController.java new file mode 100644 index 00000000000..cf5edb38f9a --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/SmartLifecycleRoleController.java @@ -0,0 +1,215 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.support; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.ApplicationListener; +import org.springframework.context.SmartLifecycle; +import org.springframework.integration.leader.event.AbstractLeaderEvent; +import org.springframework.integration.leader.event.OnGrantedEvent; +import org.springframework.integration.leader.event.OnRevokedEvent; +import org.springframework.util.Assert; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +/** + * Bulk start/stop {@link SmartLifecycle} in a particular role in phase order. + * + * @author Gary Russell + * @since 4.2 + * + */ +public class SmartLifecycleRoleController implements ApplicationListener, + ApplicationContextAware { + + private static final Log logger = LogFactory.getLog(SmartLifecycleRoleController.class); + + private final MultiValueMap lifecycles = new LinkedMultiValueMap(); + + private final MultiValueMap lazyLifecycles = new LinkedMultiValueMap(); + + private ApplicationContext applicationContext; + + /** + * Construct an instance with the provided lists of roles and lifecycles, which must be of equal length. + * @param roles the roles. + * @param lifecycles the lifecycles corresponding to the roles. + */ + public SmartLifecycleRoleController(List roles, List lifecycles) { + Assert.notNull(roles, "'roles' cannot be null"); + Assert.notNull(lifecycles, "'lifecycles' cannot be null"); + Assert.isTrue(roles.size() == lifecycles.size(), "'roles' and 'lifecycles' must be the same lenght"); + Iterator iterator = lifecycles.iterator(); + for (String role : roles) { + SmartLifecycle lifecycle = iterator.next(); + addLifecycleToRole(role, lifecycle); + } + } + + /** + * Construct an instance with the provided map of roles/instances. + * @param lifcycles the {@link MultiValueMap} of beans in roles. + */ + public SmartLifecycleRoleController(MultiValueMap lifcycles) { + for (Entry> lifecyclesInRole : lifcycles.entrySet()) { + String role = lifecyclesInRole.getKey(); + for (SmartLifecycle lifecycle : lifecyclesInRole.getValue()) { + addLifecycleToRole(role, lifecycle); + } + } + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + /** + * Add a {@link SmartLifecycle} to the role. + * @param role the role. + * @param lifecycle the {@link SmartLifecycle}. + */ + public void addLifecycleToRole(String role, SmartLifecycle lifecycle) { + this.lifecycles.add(role, lifecycle); + } + + /** + * Add a {@link SmartLifecycle} bean to the role using its name. + * @param role the role. + * @param lifecycleBeanName the bean name of the {@link SmartLifecycle}. + */ + public void addLifecycleToRole(String role, String lifecycleBeanName) { + Assert.state(this.applicationContext != null, "An application context is required to use this method"); + this.lazyLifecycles.add(role, lifecycleBeanName); + } + + /** + * Add a {@link SmartLifecycle} beans to the role using their names. + * @param role the role. + * @param lifecycleBeanNames the bean names of the {@link SmartLifecycle}s. + */ + public void addLifecyclesToRole(String role, List lifecycleBeanNames) { + Assert.state(this.applicationContext != null, "An application context is required to use this method"); + for (String lifecycleBeanName : lifecycleBeanNames) { + this.lazyLifecycles.add(role, lifecycleBeanName); + } + } + + /** + * Start all registered {@link SmartLifecycle}s in the role. + * @param role the role. + */ + public void startLifecyclesInRole(String role) { + if (this.lazyLifecycles.size() > 0) { + addLazyLifecycles(); + } + List lifecycles = this.lifecycles.get(role); + if (lifecycles != null) { + lifecycles = new ArrayList(lifecycles); + Collections.sort(lifecycles, new Comparator() { + + @Override + public int compare(SmartLifecycle o1, SmartLifecycle o2) { + return o1.getPhase() < o2.getPhase() ? -1 + : o1.getPhase() > o2.getPhase() ? 1 : 0; + } + + }); + for (SmartLifecycle lifecycle : lifecycles) { + try { + lifecycle.start(); + } + catch (Exception e) { + logger.error("Failed to start " + lifecycle + " in role " + role); + } + } + } + } + + /** + * Stop all registered {@link SmartLifecycle}s in the role. + * @param role the role. + */ + public void stopLifecyclesInRole(String role) { + if (this.lazyLifecycles.size() > 0) { + addLazyLifecycles(); + } + List lifecycles = this.lifecycles.get(role); + if (lifecycles != null) { + lifecycles = new ArrayList(lifecycles); + Collections.sort(lifecycles, new Comparator() { + + @Override + public int compare(SmartLifecycle o1, SmartLifecycle o2) { + return o1.getPhase() < o2.getPhase() ? 1 + : o1.getPhase() > o2.getPhase() ? -1 : 0; + } + + }); + for (SmartLifecycle lifecycle : lifecycles) { + try { + lifecycle.stop(); + } + catch (Exception e) { + logger.error("Failed to stop " + lifecycle + " in role " + role); + } + } + } + } + + private void addLazyLifecycles() { + for (Entry> entry : this.lazyLifecycles.entrySet()) { + doAddLifecyclesToRole(entry.getKey(), entry.getValue()); + } + this.lazyLifecycles.clear(); + } + + private void doAddLifecyclesToRole(String role, List lifecycleBeanNames) { + for (String lifecycleBeanName : lifecycleBeanNames) { + try { + SmartLifecycle lifecycle = this.applicationContext.getBean(lifecycleBeanName, SmartLifecycle.class); + addLifecycleToRole(role, lifecycle); + } + catch (NoSuchBeanDefinitionException e) { + logger.warn("Skipped; no such bean :" + lifecycleBeanName); + } + } + } + + @Override + public void onApplicationEvent(AbstractLeaderEvent event) { + if (event instanceof OnGrantedEvent) { + startLifecyclesInRole(event.getRole()); + } + else if (event instanceof OnRevokedEvent) { + stopLifecyclesInRole(event.getRole()); + } + } + +} diff --git a/spring-integration-core/src/main/resources/org/springframework/integration/config/xml/spring-integration-4.2.xsd b/spring-integration-core/src/main/resources/org/springframework/integration/config/xml/spring-integration-4.2.xsd index 3c8df9712f7..5825bec27fc 100644 --- a/spring-integration-core/src/main/resources/org/springframework/integration/config/xml/spring-integration-4.2.xsd +++ b/spring-integration-core/src/main/resources/org/springframework/integration/config/xml/spring-integration-4.2.xsd @@ -4728,6 +4728,14 @@ default is 0. Values can be negative. See SmartLifeCycle. ]]> + + + + + diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/EndpointRoleParserTests-context.xml b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/EndpointRoleParserTests-context.xml new file mode 100644 index 00000000000..a9c25c98215 --- /dev/null +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/EndpointRoleParserTests-context.xml @@ -0,0 +1,38 @@ + + + + + + + cluster + cluster + clusterX + + + + + + + + + + + + + + + + + + + + + diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/EndpointRoleParserTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/EndpointRoleParserTests.java new file mode 100644 index 00000000000..c369fd2bc69 --- /dev/null +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/EndpointRoleParserTests.java @@ -0,0 +1,115 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.config.xml; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.endpoint.EventDrivenConsumer; +import org.springframework.integration.endpoint.SourcePollingChannelAdapter; +import org.springframework.integration.leader.event.OnGrantedEvent; +import org.springframework.integration.leader.event.OnRevokedEvent; +import org.springframework.integration.support.SmartLifecycleRoleController; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +/** + * @author Gary Russell + * @since 4.2 + * + */ +@ContextConfiguration +@RunWith(SpringJUnit4ClassRunner.class) +public class EndpointRoleParserTests { + + @Autowired + private SourcePollingChannelAdapter in; + + @Autowired + private EventDrivenConsumer out1; + + @Autowired + private EventDrivenConsumer out2; + + @Autowired + private EventDrivenConsumer out3; + + @Autowired + private EventDrivenConsumer out4; + + @Autowired + private EventDrivenConsumer bridge; + + @Autowired + private SmartLifecycleRoleController controller; + + @Test + public void test() { + assertFalse(this.in.isRunning()); + assertFalse(this.out1.isRunning()); + assertFalse(this.out2.isRunning()); + assertFalse(this.out3.isRunning()); + assertFalse(this.out4.isRunning()); + assertFalse(this.bridge.isRunning()); + + this.controller.startLifecyclesInRole("cluster"); + + assertTrue(this.in.isRunning()); + assertTrue(this.out1.isRunning()); + assertTrue(this.out2.isRunning()); + assertTrue(this.out3.isRunning()); + assertFalse(this.out4.isRunning()); + assertTrue(this.bridge.isRunning()); + + this.controller.stopLifecyclesInRole("cluster"); + + assertFalse(this.in.isRunning()); + assertFalse(this.out1.isRunning()); + assertFalse(this.out2.isRunning()); + assertFalse(this.out3.isRunning()); + assertFalse(this.out4.isRunning()); + assertFalse(this.bridge.isRunning()); + + this.controller.onApplicationEvent(new OnGrantedEvent("foo", null, "cluster")); + + assertTrue(this.in.isRunning()); + assertTrue(this.out1.isRunning()); + assertTrue(this.out2.isRunning()); + assertTrue(this.out3.isRunning()); + assertFalse(this.out4.isRunning()); + assertTrue(this.bridge.isRunning()); + + this.controller.onApplicationEvent(new OnRevokedEvent("foo", null, "cluster")); + + assertFalse(this.in.isRunning()); + assertFalse(this.out1.isRunning()); + assertFalse(this.out2.isRunning()); + assertFalse(this.out3.isRunning()); + assertFalse(this.out4.isRunning()); + assertFalse(this.bridge.isRunning()); + } + + public static class Sink { + + public void foo(String s) {} + + } + +} diff --git a/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java index 0cef0032c1c..1505428d57b 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java @@ -74,6 +74,7 @@ import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.annotation.Poller; import org.springframework.integration.annotation.Publisher; +import org.springframework.integration.annotation.Role; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.annotation.Transformer; import org.springframework.integration.channel.AbstractMessageChannel; @@ -88,6 +89,7 @@ import org.springframework.integration.config.GlobalChannelInterceptor; import org.springframework.integration.config.IntegrationConverter; import org.springframework.integration.core.MessagingTemplate; +import org.springframework.integration.endpoint.AbstractEndpoint; import org.springframework.integration.endpoint.MethodInvokingMessageSource; import org.springframework.integration.endpoint.PollingConsumer; import org.springframework.integration.gateway.GatewayProxyFactoryBean; @@ -96,6 +98,7 @@ import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.support.MutableMessageBuilder; +import org.springframework.integration.support.SmartLifecycleRoleController; import org.springframework.integration.test.util.TestUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -118,6 +121,7 @@ import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.test.context.support.AnnotationConfigContextLoader; +import org.springframework.util.MultiValueMap; import reactor.Environment; import reactor.rx.Promise; @@ -140,6 +144,9 @@ public class EnableIntegrationTests { @Autowired private PollableChannel input; + @Autowired + private SmartLifecycleRoleController roleController; + @Autowired @Qualifier("annotationTestService.handle.serviceActivator") private PollingConsumer serviceActivatorEndpoint; @@ -252,6 +259,10 @@ public class EnableIntegrationTests { @Autowired private MessageChannel controlBusChannel; + @Autowired + @Qualifier("enableIntegrationTests.ContextConfiguration2.sendAsyncHandler.serviceActivator") + private AbstractEndpoint sendAsyncHandler; + @Test public void testAnnotatedServiceActivator() { assertEquals(10L, TestUtils.getPropertyValue(this.serviceActivatorEndpoint, "maxMessagesPerPoll")); @@ -588,6 +599,21 @@ public void testPromiseGateway() throws Exception { assertThat(integers, Matchers.contains(2, 4, 6, 8, 10)); } + @Test + public void testRoles() { + this.roleController.stopLifecyclesInRole("foo"); + @SuppressWarnings("unchecked") + MultiValueMap lifecycles = TestUtils.getPropertyValue(this.roleController, + "lifecycles", MultiValueMap.class); + assertEquals(2, lifecycles.size()); + assertEquals(2, lifecycles.get("foo").size()); + assertEquals(1, lifecycles.get("bar").size()); + assertFalse(this.serviceActivatorEndpoint.isRunning()); + assertFalse(this.sendAsyncHandler.isRunning()); + assertEquals(2, lifecycles.size()); + assertEquals(2, lifecycles.get("foo").size()); + } + @Configuration @ComponentScan @IntegrationComponentScan @@ -849,6 +875,7 @@ public AtomicReference asyncAnnotationProcessThread() { @Bean @ServiceActivator(inputChannel = "sendAsyncChannel") + @Role("foo") public MessageHandler sendAsyncHandler() { return new MessageHandler() { @@ -863,6 +890,7 @@ public void handleMessage(Message message) throws MessagingException { @Bean @ServiceActivator(inputChannel = "controlBusChannel") + @Role("bar") public ExpressionControlBusFactoryBean controlBus() throws Exception { return new ExpressionControlBusFactoryBean(); } @@ -1031,6 +1059,7 @@ public static class AnnotationTestServiceImpl implements Lifecycle, AnnotationTe poller = @Poller(maxMessagesPerPoll = "${poller.maxMessagesPerPoll}", fixedDelay = "${poller.interval}")) @Publisher @Payload("#args[0].toLowerCase()") + @Role("foo") public String handle(String payload) { return payload.toUpperCase(); } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/support/SmartLifecycleRoleControllerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/support/SmartLifecycleRoleControllerTests.java new file mode 100644 index 00000000000..fff2ee10fc8 --- /dev/null +++ b/spring-integration-core/src/test/java/org/springframework/integration/support/SmartLifecycleRoleControllerTests.java @@ -0,0 +1,55 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.support; + +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.junit.Test; +import org.mockito.InOrder; + +import org.springframework.context.SmartLifecycle; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +/** + * @author Gary Russell + * @since 4.2 + * + */ +public class SmartLifecycleRoleControllerTests { + + @Test + public void testOrder() { + SmartLifecycle lc1 = mock(SmartLifecycle.class); + when(lc1.getPhase()).thenReturn(2); + SmartLifecycle lc2 = mock(SmartLifecycle.class); + when(lc1.getPhase()).thenReturn(1); + MultiValueMap map = new LinkedMultiValueMap(); + map.add("foo", lc1); + map.add("foo", lc2); + SmartLifecycleRoleController controller = new SmartLifecycleRoleController(map); + controller.startLifecyclesInRole("foo"); + controller.stopLifecyclesInRole("foo"); + InOrder inOrder = inOrder(lc1, lc2); + inOrder.verify(lc2).start(); + inOrder.verify(lc1).start(); + inOrder.verify(lc1).stop(); + inOrder.verify(lc2).stop(); + } + +} diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/config/xml/MqttNamespaceHandler.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/config/xml/MqttNamespaceHandler.java index d2b162a352b..572f9bd6a2b 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/config/xml/MqttNamespaceHandler.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/config/xml/MqttNamespaceHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,12 +26,10 @@ */ public class MqttNamespaceHandler extends AbstractIntegrationNamespaceHandler { - /* (non-Javadoc) - * @see org.springframework.beans.factory.xml.NamespaceHandler#init() - */ @Override public void init() { this.registerBeanDefinitionParser("message-driven-channel-adapter", new MqttMessageDrivenChannelAdapterParser()); this.registerBeanDefinitionParser("outbound-channel-adapter", new MqttOutboundChannelAdapterParser()); } + } diff --git a/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/config/xml/LeaderListenerParser.java b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/config/xml/LeaderListenerParser.java new file mode 100644 index 00000000000..72eef306325 --- /dev/null +++ b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/config/xml/LeaderListenerParser.java @@ -0,0 +1,63 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.zookeeper.config.xml; + +import java.util.UUID; + +import org.w3c.dom.Element; + +import org.springframework.beans.factory.support.AbstractBeanDefinition; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.AbstractBeanDefinitionParser; +import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.integration.config.xml.IntegrationNamespaceUtils; +import org.springframework.integration.leader.DefaultCandidate; +import org.springframework.integration.leader.event.DefaultLeaderEventPublisher; +import org.springframework.integration.zookeeper.leader.LeaderInitiator; + +/** + * @author Gary Russell + * @since 4.2 + * + */ +public class LeaderListenerParser extends AbstractBeanDefinitionParser { + + @Override + protected AbstractBeanDefinition parseInternal(Element element, ParserContext parserContext) { + BeanDefinitionBuilder candidateBuilder = BeanDefinitionBuilder.genericBeanDefinition(DefaultCandidate.class); + candidateBuilder.addConstructorArgValue(UUID.randomUUID().toString()); + candidateBuilder.addConstructorArgValue(element.getAttribute("role")); + + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(LeaderInitiator.class); + builder.addConstructorArgReference(element.getAttribute("client")); + builder.addConstructorArgValue(candidateBuilder.getBeanDefinition()); + builder.addConstructorArgValue(element.getAttribute("path")); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "auto-startup"); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "phase"); + + BeanDefinitionBuilder publisherBuilder = BeanDefinitionBuilder + .genericBeanDefinition(DefaultLeaderEventPublisher.class); + builder.addPropertyValue("leaderEventPublisher", publisherBuilder.getBeanDefinition()); + + return builder.getBeanDefinition(); + } + + @Override + protected boolean shouldGenerateIdAsFallback() { + return true; + } + +} diff --git a/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/config/xml/ZookeeperNamespaceHandler.java b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/config/xml/ZookeeperNamespaceHandler.java new file mode 100644 index 00000000000..244851bccde --- /dev/null +++ b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/config/xml/ZookeeperNamespaceHandler.java @@ -0,0 +1,32 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.zookeeper.config.xml; + +import org.springframework.integration.config.xml.AbstractIntegrationNamespaceHandler; + +/** + * @author Gary Russell + * @since 4.2 + * + */ +public class ZookeeperNamespaceHandler extends AbstractIntegrationNamespaceHandler { + + @Override + public void init() { + this.registerBeanDefinitionParser("leader-listener", new LeaderListenerParser()); + } + +} diff --git a/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/config/xml/package-info.java b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/config/xml/package-info.java new file mode 100644 index 00000000000..6dcf881128c --- /dev/null +++ b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/config/xml/package-info.java @@ -0,0 +1,4 @@ +/** + * Base package for zookeeper configuration. + */ +package org.springframework.integration.zookeeper.config.xml; diff --git a/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/leader/LeaderInitiator.java b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/leader/LeaderInitiator.java new file mode 100644 index 00000000000..7b7f98959ec --- /dev/null +++ b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/leader/LeaderInitiator.java @@ -0,0 +1,270 @@ +/* + * Copyright 2014-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.zookeeper.leader; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.leader.LeaderSelector; +import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; + +import org.springframework.context.SmartLifecycle; +import org.springframework.integration.leader.Candidate; +import org.springframework.integration.leader.Context; +import org.springframework.integration.leader.event.LeaderEventPublisher; +import org.springframework.util.StringUtils; + +/** + * Bootstrap leadership {@link Candidate candidates} + * with ZooKeeper/Curator. Upon construction, {@link #start} must be invoked to + * register the candidate for leadership election. + * + * @author Patrick Peralta + * @author Janne Valkealahti + * @author Gary Russell + * + */ +public class LeaderInitiator implements SmartLifecycle { + + private static final Log logger = LogFactory.getLog(LeaderInitiator.class); + + private static final String DEFAULT_NAMESPACE = "/spring-integration/leader/"; + + /** + * Curator client. + */ + private final CuratorFramework client; + + /** + * Candidate for leader election. + */ + private final Candidate candidate; + + private final Object lifecycleMonitor = new Object(); + + /** + * Curator utility for selecting leaders. + */ + private volatile LeaderSelector leaderSelector; + + /** + * @see SmartLifecycle + */ + private volatile boolean autoStartup = true; + + /** + * @See SmartLifecycle which is an extension of org.springframework.context.Phased + */ + private volatile int phase; + + /** + * Flag that indicates whether the leadership election for + * this {@link #candidate} is running. + */ + private volatile boolean running; + + /** Base path in a zookeeper */ + private final String namespace; + + /** Leader event publisher if set */ + private volatile LeaderEventPublisher leaderEventPublisher; + + /** + * Construct a {@link LeaderInitiator}. + * + * @param client Curator client + * @param candidate leadership election candidate + */ + public LeaderInitiator(CuratorFramework client, Candidate candidate) { + this(client, candidate, DEFAULT_NAMESPACE); + } + + /** + * Construct a {@link LeaderInitiator}. + * + * @param client Curator client + * @param candidate leadership election candidate + * @param namespace namespace base path in zookeeper + */ + public LeaderInitiator(CuratorFramework client, Candidate candidate, String namespace) { + this.client = client; + this.candidate = candidate; + this.namespace = namespace; + } + + /** + * @return true if leadership election for this {@link #candidate} is running + */ + @Override + public boolean isRunning() { + return running; + } + + @Override + public int getPhase() { + return this.phase; + } + + /** + * @param phase the phase + * @see SmartLifecycle + */ + public void setPhase(int phase) { + this.phase = phase; + } + + @Override + public boolean isAutoStartup() { + return this.autoStartup; + } + + /** + * @param autoStartup true to start automatically + * @see SmartLifecycle + */ + public void setAutoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + } + + /** + * Start the registration of the {@link #candidate} for leader election. + */ + @Override + public void start() { + synchronized(this.lifecycleMonitor) { + if (!this.running) { + if (client.getState() != CuratorFrameworkState.STARTED) { + // we want to do curator start here because it needs to + // be started before leader selector and it gets a little + // complicated to control ordering via beans so that + // curator is fully started. + client.start(); + } + this.leaderSelector = new LeaderSelector(this.client, buildLeaderPath(), new LeaderListener()); + this.leaderSelector.setId(this.candidate.getId()); + this.leaderSelector.autoRequeue(); + this.leaderSelector.start(); + + this.running = true; + logger.debug("Started LeaderInitiator"); + } + } + } + + /** + * Stop the registration of the {@link #candidate} for leader election. + * If the candidate is currently leader, its leadership will be revoked. + */ + @Override + public void stop() { + synchronized (this.lifecycleMonitor) { + if (this.running) { + this.leaderSelector.close(); + this.running = false; + logger.debug("Stopped LeaderInitiator"); + } + } + } + + @Override + public void stop(Runnable runnable) { + stop(); + runnable.run(); + } + + /** + * Sets the {@link LeaderEventPublisher}. + * + * @param leaderEventPublisher the event publisher + */ + public void setLeaderEventPublisher(LeaderEventPublisher leaderEventPublisher) { + this.leaderEventPublisher = leaderEventPublisher; + } + + /** + * @return the ZooKeeper path used for leadership election by Curator + */ + private String buildLeaderPath() { + + String ns = StringUtils.hasText(namespace) ? namespace : DEFAULT_NAMESPACE; + if (!ns.startsWith("/")) { + ns = "/" + ns; + } + if (!ns.endsWith("/")) { + ns = ns + "/"; + } + return String.format(ns + "%s", candidate.getRole()); + } + + /** + * Implementation of Curator leadership election listener. + */ + class LeaderListener extends LeaderSelectorListenerAdapter { + + @Override + public void takeLeadership(CuratorFramework framework) throws Exception { + CuratorContext context = new CuratorContext(); + + try { + candidate.onGranted(context); + if (leaderEventPublisher != null) { + leaderEventPublisher.publishOnGranted(LeaderInitiator.this, context, candidate.getRole()); + } + + // when this method exits, the leadership will be revoked; + // therefore this thread needs to be held up until the + // candidate is no longer leader + Thread.sleep(Long.MAX_VALUE); + } + catch (InterruptedException e) { + // InterruptedException, like any other runtime exception, + // is handled by the finally block below. No need to + // reset the interrupt flag as the interrupt is handled. + } + finally { + candidate.onRevoked(context); + if (leaderEventPublisher != null) { + leaderEventPublisher.publishOnRevoked(LeaderInitiator.this, context, candidate.getRole()); + } + } + } + } + + /** + * Implementation of leadership context backed by Curator. + */ + class CuratorContext implements Context { + + @Override + public boolean isLeader() { + return leaderSelector.hasLeadership(); + } + + @Override + public void yield() { + leaderSelector.interruptLeadership(); + } + + @Override + public String toString() { + return String.format("CuratorContext{role=%s, id=%s, isLeader=%s}", + candidate.getRole(), candidate.getId(), isLeader()); + } + + } + +} diff --git a/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/leader/package-info.java b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/leader/package-info.java new file mode 100644 index 00000000000..4e58a008b00 --- /dev/null +++ b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/leader/package-info.java @@ -0,0 +1,4 @@ +/** + * Temporary package until s-c-c-zookeeper is released. + */ +package org.springframework.integration.zookeeper.leader; diff --git a/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/lock/package-info.java b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/lock/package-info.java new file mode 100644 index 00000000000..2222b307442 --- /dev/null +++ b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/lock/package-info.java @@ -0,0 +1,4 @@ +/** + * Provides classes related to locking. + */ +package org.springframework.integration.zookeeper.lock; diff --git a/spring-integration-zookeeper/src/main/resources/META-INF/spring.handlers b/spring-integration-zookeeper/src/main/resources/META-INF/spring.handlers new file mode 100644 index 00000000000..4840748972c --- /dev/null +++ b/spring-integration-zookeeper/src/main/resources/META-INF/spring.handlers @@ -0,0 +1 @@ +http\://www.springframework.org/schema/integration/zookeeper=org.springframework.integration.zookeeper.config.xml.ZookeeperNamespaceHandler diff --git a/spring-integration-zookeeper/src/main/resources/META-INF/spring.schemas b/spring-integration-zookeeper/src/main/resources/META-INF/spring.schemas new file mode 100644 index 00000000000..fac1cf559cb --- /dev/null +++ b/spring-integration-zookeeper/src/main/resources/META-INF/spring.schemas @@ -0,0 +1,2 @@ +http\://www.springframework.org/schema/integration/zookeeper/spring-integration-zookeeper-4.2.xsd=org/springframework/integration/zookeeper/config/xml/spring-integration-zookeeper-4.2.xsd +http\://www.springframework.org/schema/integration/zookeeper/spring-integration-zookeeper.xsd=org/springframework/integration/zookeeper/config/xml/spring-integration-zookeeper-4.2.xsd diff --git a/spring-integration-zookeeper/src/main/resources/META-INF/spring.tooling b/spring-integration-zookeeper/src/main/resources/META-INF/spring.tooling new file mode 100644 index 00000000000..ec806cac9a5 --- /dev/null +++ b/spring-integration-zookeeper/src/main/resources/META-INF/spring.tooling @@ -0,0 +1,4 @@ +# Tooling related information for the integration Zookeeper namespace +http\://www.springframework.org/schema/integration/zookeeper@name=integration Zookeeper Namespace +http\://www.springframework.org/schema/integration/zookeeper@prefix=int-zk +http\://www.springframework.org/schema/integration/zookeeper@icon=org/springframework/integration/config/xml/spring-integration-zookeeper.gif diff --git a/spring-integration-zookeeper/src/main/resources/org/springframework/integration/zookeeper/config/xml/spring-integration-zookeeper-4.2.xsd b/spring-integration-zookeeper/src/main/resources/org/springframework/integration/zookeeper/config/xml/spring-integration-zookeeper-4.2.xsd new file mode 100644 index 00000000000..f15562d1474 --- /dev/null +++ b/spring-integration-zookeeper/src/main/resources/org/springframework/integration/zookeeper/config/xml/spring-integration-zookeeper-4.2.xsd @@ -0,0 +1,56 @@ + + + + + + + + + + + + + + + The definition for the Spring Integration Zookeeper leader listener to automatically start/stop + endpoints based on the Role. + The 'role' attribute here is matched to 'role' attributes on associated 'SmartLifecycle' + beans. + Endpoint roles can be defined in XML configuration or via '@Role' annotations. + + + + + + + + + A reference to a CuratorFramework bean. + + + + + + + + + + + + The path in zookeeper. + + + + + + + diff --git a/spring-integration-zookeeper/src/main/resources/org/springframework/integration/zookeeper/config/xml/spring-integration-zookeeper.gif b/spring-integration-zookeeper/src/main/resources/org/springframework/integration/zookeeper/config/xml/spring-integration-zookeeper.gif new file mode 100644 index 00000000000..210e0764fa4 Binary files /dev/null and b/spring-integration-zookeeper/src/main/resources/org/springframework/integration/zookeeper/config/xml/spring-integration-zookeeper.gif differ diff --git a/spring-integration-zookeeper/src/test/java/org/springframework/integration/zookeeper/ZookeeperTestSupport.java b/spring-integration-zookeeper/src/test/java/org/springframework/integration/zookeeper/ZookeeperTestSupport.java index b7a81dde43b..9a9665aaac9 100644 --- a/spring-integration-zookeeper/src/test/java/org/springframework/integration/zookeeper/ZookeeperTestSupport.java +++ b/spring-integration-zookeeper/src/test/java/org/springframework/integration/zookeeper/ZookeeperTestSupport.java @@ -72,7 +72,7 @@ public void tearDown() throws Exception { CloseableUtils.closeQuietly(this.client); } - protected CuratorFramework createNewClient() throws InterruptedException { + protected static CuratorFramework createNewClient() throws InterruptedException { CuratorFramework client = CuratorFrameworkFactory.newClient(testingServer.getConnectString(), new BoundedExponentialBackoffRetry(100, 1000, 3)); client.start(); diff --git a/spring-integration-zookeeper/src/test/java/org/springframework/integration/zookeeper/config/xml/ZookeeperParserTests-context.xml b/spring-integration-zookeeper/src/test/java/org/springframework/integration/zookeeper/config/xml/ZookeeperParserTests-context.xml new file mode 100644 index 00000000000..27938951d09 --- /dev/null +++ b/spring-integration-zookeeper/src/test/java/org/springframework/integration/zookeeper/config/xml/ZookeeperParserTests-context.xml @@ -0,0 +1,20 @@ + + + + + + + + + + + + diff --git a/spring-integration-zookeeper/src/test/java/org/springframework/integration/zookeeper/config/xml/ZookeeperParserTests.java b/spring-integration-zookeeper/src/test/java/org/springframework/integration/zookeeper/config/xml/ZookeeperParserTests.java new file mode 100644 index 00000000000..5783b71e92b --- /dev/null +++ b/spring-integration-zookeeper/src/test/java/org/springframework/integration/zookeeper/config/xml/ZookeeperParserTests.java @@ -0,0 +1,86 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.zookeeper.config.xml; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import org.apache.curator.framework.CuratorFramework; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.endpoint.SourcePollingChannelAdapter; +import org.springframework.integration.test.util.TestUtils; +import org.springframework.integration.zookeeper.ZookeeperTestSupport; +import org.springframework.integration.zookeeper.leader.LeaderInitiator; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +/** + * @author Gary Russell + * @since 4.2 + * + */ +@ContextConfiguration +@RunWith(SpringJUnit4ClassRunner.class) +@DirtiesContext +public class ZookeeperParserTests extends ZookeeperTestSupport { + + @Autowired + private LeaderInitiator initiator; + + @Autowired + private SourcePollingChannelAdapter adapter; + + @Autowired + private CuratorFramework client; + + @Test + public void test() throws Exception { + assertFalse(this.initiator.isAutoStartup()); + assertEquals(1234, this.initiator.getPhase()); + assertEquals("/siNamespaceTest", TestUtils.getPropertyValue(this.initiator, "namespace")); + assertEquals("cluster", TestUtils.getPropertyValue(this.initiator, "candidate.role")); + assertSame(this.client, TestUtils.getPropertyValue(this.initiator, "client")); + + this.initiator.start(); + int n = 0; + while (n++ < 100 && !this.adapter.isRunning()) { + Thread.sleep(100); + } + assertTrue(this.adapter.isRunning()); + this.initiator.stop(); + n = 0; + while (n++ < 100 && this.adapter.isRunning()) { + Thread.sleep(100); + } + assertFalse(this.adapter.isRunning()); + } + + public static class Config { + + @Bean + public CuratorFramework client() throws Exception { + return createNewClient(); + } + } + +} diff --git a/spring-integration-zookeeper/src/test/java/org/springframework/integration/zookeeper/event/ZookeeperLeaderTests.java b/spring-integration-zookeeper/src/test/java/org/springframework/integration/zookeeper/event/ZookeeperLeaderTests.java new file mode 100644 index 00000000000..db1a57fbfd4 --- /dev/null +++ b/spring-integration-zookeeper/src/test/java/org/springframework/integration/zookeeper/event/ZookeeperLeaderTests.java @@ -0,0 +1,142 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.zookeeper.event; + +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.util.Collections; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import org.springframework.beans.factory.BeanFactory; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.SmartLifecycle; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.core.MessageSource; +import org.springframework.integration.endpoint.SourcePollingChannelAdapter; +import org.springframework.integration.leader.DefaultCandidate; +import org.springframework.integration.leader.event.AbstractLeaderEvent; +import org.springframework.integration.leader.event.DefaultLeaderEventPublisher; +import org.springframework.integration.leader.event.LeaderEventPublisher; +import org.springframework.integration.leader.event.OnGrantedEvent; +import org.springframework.integration.leader.event.OnRevokedEvent; +import org.springframework.integration.support.SmartLifecycleRoleController; +import org.springframework.integration.zookeeper.ZookeeperTestSupport; +import org.springframework.integration.zookeeper.leader.LeaderInitiator; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.scheduling.support.PeriodicTrigger; + +/** + * @author Gary Russell + * @since 4.2 + * + */ +public class ZookeeperLeaderTests extends ZookeeperTestSupport { + + private final BlockingQueue events = new LinkedBlockingQueue(); + + private final SourcePollingChannelAdapter adapter = buildChannelAdapter(); + + private final SmartLifecycleRoleController controller = new SmartLifecycleRoleController( + Collections.singletonList("sitest"), Collections.singletonList(this.adapter)); + + @Test + public void testLeader() throws Exception { + LeaderEventPublisher publisher = publisher(); + DefaultCandidate candidate1 = new DefaultCandidate("foo", "sitest"); + LeaderInitiator initiator1 = new LeaderInitiator(this.client, candidate1, "/sitest"); + initiator1.setLeaderEventPublisher(publisher); + initiator1.start(); + DefaultCandidate candidate2 = new DefaultCandidate("bar", "sitest"); + LeaderInitiator initiator2 = new LeaderInitiator(this.client, candidate2, "/sitest"); + initiator2.setLeaderEventPublisher(publisher); + initiator2.start(); + AbstractLeaderEvent event = this.events.poll(10, TimeUnit.SECONDS); + assertNotNull(event); + assertThat(event, instanceOf(OnGrantedEvent.class)); + event.getContext().yield(); + + assertTrue(this.adapter.isRunning()); + + event = this.events.poll(10, TimeUnit.SECONDS); + assertNotNull(event); + assertThat(event, instanceOf(OnRevokedEvent.class)); + + assertFalse(this.adapter.isRunning()); + + event = this.events.poll(10, TimeUnit.SECONDS); + assertNotNull(event); + assertThat(event, instanceOf(OnGrantedEvent.class)); + + assertTrue(this.adapter.isRunning()); + + initiator1.stop(); + initiator2.stop(); + event = this.events.poll(10, TimeUnit.SECONDS); + assertNotNull(event); + assertThat(event, instanceOf(OnRevokedEvent.class)); + + assertFalse(this.adapter.isRunning()); + } + + private LeaderEventPublisher publisher() { + return new DefaultLeaderEventPublisher(new ApplicationEventPublisher() { + + @Override + public void publishEvent(Object event) { + } + + @Override + public void publishEvent(ApplicationEvent event) { + AbstractLeaderEvent leadershipEvent = (AbstractLeaderEvent) event; + controller.onApplicationEvent((AbstractLeaderEvent) event); + events.add(leadershipEvent); + } + + }); + } + + private SourcePollingChannelAdapter buildChannelAdapter() { + SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter(); + adapter.setSource(new MessageSource() { + + @Override + public Message receive() { + return new GenericMessage("foo"); + } + }); + adapter.setOutputChannel(new QueueChannel()); + adapter.setTrigger(new PeriodicTrigger(10000)); + adapter.setBeanFactory(mock(BeanFactory.class)); + ThreadPoolTaskScheduler sched = new ThreadPoolTaskScheduler(); + sched.afterPropertiesSet(); + adapter.setTaskScheduler(sched); + adapter.afterPropertiesSet(); + return adapter; + } + +}