Skip to content

Commit

Permalink
INT-3617: ZK Leader Event Processing
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-3617

Start/stop `SmartLifecyle` beans with leader election/revocation.

ZK Namespace Support

Add Annotation Support

INT-3617: Polishing

Add `SmartLifecycle` to listener parser.

INT-3617: Polishing; PR Comments

Revert SmartLifecycleRoleController

Remove reflection.

Remove Dependence on spring-cloud-cluster

Temporarily move the relevant classes here.

Polishing; PR Comments and Fix Test

Test was incorrectly stopping the LeaderInitiator before it was elected.
Set auto-startup="false" in the i-c-a so we wait for its start before
stopping the LeaderInitiator.

Fix JavaDocs
  • Loading branch information
garyrussell authored and artembilan committed Jul 6, 2015
1 parent 9cc0652 commit 814698f
Show file tree
Hide file tree
Showing 42 changed files with 1,829 additions and 12 deletions.
7 changes: 4 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,14 @@ subprojects { subproject ->
smack3Version = '3.2.1'
smackVersion = '4.0.6'
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '1.5.0.M1'
// springCloudClusterVersion = '1.0.0.BUILD-SNAPSHOT'
springDataMongoVersion = '1.7.0.RELEASE'
springDataRedisVersion = '1.5.0.RELEASE'
springGemfireVersion = '1.6.0.RELEASE'
springSecurityVersion = project.hasProperty('springSecurityVersion') ? project.springSecurityVersion : '4.0.1.RELEASE'
springSocialTwitterVersion = '1.1.0.RELEASE'
springRetryVersion = '1.1.2.RELEASE'
springVersion = project.hasProperty('springVersion') ? project.springVersion : '4.2.0.BUILD-SNAPSHOT'
springVersion = project.hasProperty('springVersion') ? project.springVersion : '4.2.0.RC2'
springWsVersion = '2.2.1.RELEASE'
xmlUnitVersion = '1.5'
xstreamVersion = '1.4.7'
Expand Down Expand Up @@ -268,6 +269,7 @@ project('spring-integration-core') {
compile "org.springframework:spring-messaging:$springVersion"
compile "org.springframework:spring-tx:$springVersion"
compile "org.springframework.retry:spring-retry:$springRetryVersion"
// compile ("org.springframework.cloud:spring-cloud-cluster-core:$springCloudClusterVersion", optional)
compile ("io.projectreactor:reactor-stream:$reactorVersion", optional)
compile("com.fasterxml.jackson.core:jackson-databind:$jackson2Version", optional)
compile("com.jayway.jsonpath:json-path:$jsonpathVersion", optional)
Expand Down Expand Up @@ -687,14 +689,13 @@ project('spring-integration-zookeeper') {
description = 'Spring Integration Zookeeper Support'
dependencies {
compile project(":spring-integration-core")

compile ("org.apache.zookeeper:zookeeper:$zookeeperVersion") {
exclude group: 'io.netty', module: 'netty'
exclude group: 'junit', module: 'junit'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'log4j', module: 'log4j'
}

// compile "org.springframework.cloud:spring-cloud-cluster-zookeeper:$springCloudClusterVersion"
compile("org.apache.curator:curator-recipes:$curatorVersion") {
exclude group: 'org.apache.zookeeper', module: 'zookeeper'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Annotate endpoints to assign them to a role. Such endpoints can be started/stopped as
* a group. See {@code SmartLifecycleRoleController}.
*
* @author Gary Russell
* @since 4.2
*/
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Role {

/**
* @return the role for this endpoint. See {@code SmartLifecycleRoleController}.
*/
String value() default "";

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.ManagedList;
import org.springframework.context.Lifecycle;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.support.SmartLifecycleRoleController;

/**
* Shared utility methods for Integration configuration.
Expand Down Expand Up @@ -51,6 +55,17 @@ public static void autoCreateDirectChannel(String channelName, BeanDefinitionReg
BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}

public static void registerRoleControllerDefinitionIfNecessary(BeanDefinitionRegistry registry) {
if (!registry.containsBeanDefinition(
IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER)) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SmartLifecycleRoleController.class);
builder.addConstructorArgValue(new ManagedList<String>());
builder.addConstructorArgValue(new ManagedList<Lifecycle>());
registry.registerBeanDefinition(
IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER, builder.getBeanDefinition());
}
}

private IntegrationConfigUtils() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.config.BeanDefinition;
Expand All @@ -49,9 +52,6 @@
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.util.ClassUtils;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
* {@link ImportBeanDefinitionRegistrar} implementation that configures integration infrastructure.
*
Expand Down Expand Up @@ -102,6 +102,7 @@ public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, B
this.registerMessagingAnnotationPostProcessors(importingClassMetadata, registry);
}
this.registerMessageBuilderFactory(registry);
IntegrationConfigUtils.registerRoleControllerDefinitionIfNecessary(registry);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.commons.logging.Log;
Expand All @@ -35,6 +36,8 @@
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationEvent;
Expand All @@ -49,15 +52,20 @@
import org.springframework.integration.annotation.BridgeTo;
import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Role;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Splitter;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.support.SmartLifecycleRoleController;
import org.springframework.integration.util.MessagingAnnotationUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

Expand All @@ -71,7 +79,8 @@
* @author Gary Russell
*/
public class MessagingAnnotationPostProcessor implements BeanPostProcessor, BeanFactoryAware,
InitializingBean, Lifecycle, ApplicationListener<ApplicationEvent>, EnvironmentAware {
InitializingBean, Lifecycle, ApplicationListener<ApplicationEvent>, EnvironmentAware,
SmartInitializingSingleton {

private final Log logger = LogFactory.getLog(this.getClass());

Expand All @@ -88,6 +97,7 @@ public class MessagingAnnotationPostProcessor implements BeanPostProcessor, Bean

private volatile boolean running = true;

private final MultiValueMap<String, String> lazyLifecyleRoles = new LinkedMultiValueMap<String, String>();

@Override
public void setBeanFactory(BeanFactory beanFactory) {
Expand Down Expand Up @@ -120,6 +130,21 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro
return bean;
}

@Override
public void afterSingletonsInstantiated() {
SmartLifecycleRoleController roleController;
try {
roleController = beanFactory.getBean(IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER,
SmartLifecycleRoleController.class);
for (Entry<String, List<String>> entry : this.lazyLifecyleRoles.entrySet()) {
roleController.addLifecyclesToRole(entry.getKey(), entry.getValue());
}
}
catch (NoSuchBeanDefinitionException e) {
logger.error("No lifecyle role controller in context");
}
}

@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
Assert.notNull(this.beanFactory, "BeanFactory must not be null");
Expand Down Expand Up @@ -200,6 +225,10 @@ public void doWith(Method method) throws IllegalArgumentException, IllegalAccess
if (result instanceof ApplicationListener) {
listeners.add((ApplicationListener) result);
}
Role role = AnnotationUtils.findAnnotation(method, Role.class);
if (role != null) {
lazyLifecyleRoles.add(role.value(), endpointBeanName);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ protected final AbstractBeanDefinition parseInternal(Element element, ParserCont
if (StringUtils.hasText(phase)) {
propertyValues.add("phase", new TypedStringValue(phase));
}
String role = element.getAttribute(IntegrationNamespaceUtils.ROLE);
if (StringUtils.hasText(role)) {
if (!StringUtils.hasText(element.getAttribute(ID_ATTRIBUTE))) {
parserContext.getReaderContext().error("When using 'role', 'id' is required", element);
}
IntegrationNamespaceUtils.putLifecycleInRole(role, element.getAttribute(ID_ATTRIBUTE), parserContext);
}
return beanDefinition;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ protected final AbstractBeanDefinition parseInternal(Element element, ParserCont
}
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, IntegrationNamespaceUtils.AUTO_STARTUP);
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, IntegrationNamespaceUtils.PHASE);
String role = element.getAttribute(IntegrationNamespaceUtils.ROLE);
if (StringUtils.hasText(role)) {
if (!StringUtils.hasText(element.getAttribute(ID_ATTRIBUTE))) {
parserContext.getReaderContext().error("When using 'role', 'id' is required", element);
}
IntegrationNamespaceUtils.putLifecycleInRole(role, element.getAttribute(ID_ATTRIBUTE), parserContext);
}
AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();
String beanName = this.resolveId(element, beanDefinition, parserContext);
parserContext.registerBeanComponent(new BeanComponentDefinition(beanDefinition, beanName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.config.BeanReference;
import org.springframework.beans.factory.config.ConstructorArgumentValues;
import org.springframework.beans.factory.config.ConstructorArgumentValues.ValueHolder;
import org.springframework.beans.factory.config.RuntimeBeanReference;
Expand Down Expand Up @@ -72,6 +73,7 @@ public abstract class IntegrationNamespaceUtils {
public static final String REQUEST_HANDLER_ADVICE_CHAIN = "request-handler-advice-chain";
public static final String AUTO_STARTUP = "auto-startup";
public static final String PHASE = "phase";
public static final String ROLE = "role";

/**
* Configures the provided bean definition builder with a property value corresponding to the attribute whose name
Expand Down Expand Up @@ -547,4 +549,19 @@ public static void checkAndConfigureFixedSubscriberChannel(Element element, Pars
}
}

public static void putLifecycleInRole(String role, String beanName, ParserContext parserContext) {
BeanDefinitionRegistry registry = parserContext.getRegistry();
IntegrationConfigUtils.registerRoleControllerDefinitionIfNecessary(registry);
BeanDefinition controllerDef = registry.getBeanDefinition(
IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER);
@SuppressWarnings("unchecked")
ManagedList<String> roles = (ManagedList<String>) controllerDef.getConstructorArgumentValues()
.getArgumentValue(0, ManagedList.class).getValue();
@SuppressWarnings("unchecked")
ManagedList<BeanReference> lifecycles = (ManagedList<BeanReference>) controllerDef.getConstructorArgumentValues()
.getArgumentValue(1, ManagedList.class).getValue();
roles.add(role);
lifecycles.add(new RuntimeBeanReference(beanName));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public abstract class IntegrationContextUtils {
public static final String TO_STRING_FRIENDLY_JSON_NODE_TO_STRING_CONVERTER_BEAN_NAME =
"toStringFriendlyJsonNodeToStringConverter";

public static final String INTEGRATION_LIFECYCLE_ROLE_CONTROLLER = "integrationLifecycleRoleController";

/**
* @param beanFactory BeanFactory for lookup, must not be null.
* @return The {@link MetadataStore} bean whose name is "metadataStore".
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.leader;

import java.util.UUID;

import org.springframework.util.StringUtils;

/**
* Base implementation of a {@link Candidate}.
*
* @author Janne Valkealahti
*
*/
public abstract class AbstractCandidate implements Candidate {

private static final String DEFAULT_ROLE = "leader";

private final String id;

private final String role;

/**
* Instantiate a abstract candidate.
*/
public AbstractCandidate() {
this(null, null);
}

/**
* Instantiate a abstract candidate.
*
* @param id the identifier
* @param role the role
*/
public AbstractCandidate(String id, String role) {
this.id = StringUtils.hasText(id) ? id : UUID.randomUUID().toString();
this.role = StringUtils.hasText(role) ? role : DEFAULT_ROLE;
}

@Override
public String getRole() {
return this.role;
}

@Override
public String getId() {
return this.id;
}

@Override
public abstract void onGranted(Context ctx) throws InterruptedException;

@Override
public abstract void onRevoked(Context ctx);

}
Loading

0 comments on commit 814698f

Please sign in to comment.