Skip to content

Commit

Permalink
INT-3455 Orderly Shutdown Improvements
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-3455

Do not stop schedulers and executors - allows mid-flow
QueueChannels to be drained.

Stop all inbound MessageProducers (that are not OrderlyShutdownCapable).

INT-3455 Polishing; PR Comments

Change deprecate method usage to the new version
  • Loading branch information
garyrussell authored and artembilan committed Aug 5, 2014
1 parent 5f214ea commit 6b8bdc7
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "default-domain");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "object-name-static-properties");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "managed-components", "componentNamePatterns");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "shutdown-executor");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "object-naming-strategy", "namingStrategy");

builder.addPropertyValue("server", mbeanServer);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand All @@ -14,17 +14,12 @@
package org.springframework.integration.monitor;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -56,12 +51,13 @@
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.Lifecycle;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.support.ExecutorServiceAdapter;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.history.MessageHistoryConfigurer;
import org.springframework.integration.support.context.NamedComponent;
import org.springframework.jmx.export.MBeanExporter;
Expand All @@ -76,10 +72,7 @@
import org.springframework.jmx.support.MetricType;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.PatternMatchUtils;
import org.springframework.util.ReflectionUtils;
Expand Down Expand Up @@ -115,7 +108,7 @@
*/
@ManagedResource
public class IntegrationMBeanExporter extends MBeanExporter implements BeanPostProcessor, BeanFactoryAware,
ApplicationContextAware, BeanClassLoaderAware, SmartLifecycle, Runnable {
ApplicationContextAware, BeanClassLoaderAware, SmartLifecycle {

private static final Log logger = LogFactory.getLog(IntegrationMBeanExporter.class);

Expand All @@ -135,6 +128,8 @@ public class IntegrationMBeanExporter extends MBeanExporter implements BeanPostP

private final Set<SimpleMessageSourceMetrics> sources = new HashSet<SimpleMessageSourceMetrics>();

private final Set<Lifecycle> inboundLifecycleMessageProducers = new HashSet<Lifecycle>();

private final Set<DirectChannelMetrics> channels = new HashSet<DirectChannelMetrics>();

private final Map<String, Object> exposedBeans = new HashMap<String, Object>();
Expand Down Expand Up @@ -173,12 +168,8 @@ public class IntegrationMBeanExporter extends MBeanExporter implements BeanPostP

private String[] componentNamePatterns = { "*" };

private volatile Executor shutdownExecutor;

private volatile long shutdownDeadline;

private volatile boolean shutdownForced;

private final AtomicBoolean shuttingDown = new AtomicBoolean();

private MessageHistoryConfigurer messageHistoryConfigurer;
Expand Down Expand Up @@ -236,11 +227,6 @@ public void setApplicationContext(ApplicationContext applicationContext)
this.applicationContext = applicationContext;
}

public void setShutdownExecutor(Executor shutdownExecutor) {
Assert.notNull(shutdownExecutor, "Shutdown Executor may not be null");
this.shutdownExecutor = shutdownExecutor;
}

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

Expand Down Expand Up @@ -300,6 +286,13 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw
bean = advised;
}

if (bean instanceof MessageProducer && bean instanceof Lifecycle) {
Lifecycle target = (Lifecycle) extractTarget(bean);
if (!(target instanceof AbstractReplyProducingMessageHandler)) { // TODO: change to AMPMH
this.inboundLifecycleMessageProducers.add(target);
}
}

return bean;

}
Expand Down Expand Up @@ -478,89 +471,66 @@ public void destroy() {
}

/**
* Shutdown active components. If the thread calling this method is
* managed by a Spring-managed executor, you should provide a specific
* dedicated executor via the {@link #setShutdownExecutor}
* method. When this is provided, the shutdown will be performed on one
* of its threads, instead of the calling thread; thus avoiding
* the situation where we will wait for the current thread to terminate.
* <p> It is not necessary to supply this executor service if the
* current thread will not, itself, be shutdown as a result of
* calling this method.
* <p><b>Note:</b> The supplied executor service
* will <b>not</b> be shut down.
* Shutdown active components.
*
* @param force If true, stop the executors with shutdownNow(), canceling
* running tasks. Overrides any settings on schedulers/executors. When true
* may result in error messages being sent to error channels.
* @param force No longer used.
* @param howLong The time to wait in total for all activities to complete
* in milliseconds.
* @deprecated Use {@link #stopActiveComponents(long)}.
*/
@Deprecated
@ManagedOperation
public void stopActiveComponents(boolean force, long howLong) {
stopActiveComponents(howLong);
}

/**
* Shutdown active components.
*
* @param howLong The time to wait in total for all activities to complete
* in milliseconds.
*/
@ManagedOperation
public void stopActiveComponents(long howLong) {
if (!this.shuttingDown.compareAndSet(false, true)) {
logger.error("Shutdown already in process");
return;
}
this.shutdownDeadline = System.currentTimeMillis() + howLong;
this.shutdownForced = force;
if (this.shutdownExecutor == null) {
try {
logger.debug("Running shutdown on current thread");
this.run();
} catch (Exception e) {
logger.error("Orderly shutdown failed", e);
}
try {
logger.debug("Running shutdown");
doShutdown();
}
else {
logger.debug("Launching shutdown on another thread");
this.shutdownExecutor.execute(this);
catch (Exception e) {
logger.error("Orderly shutdown failed", e);
}
}

/**
* Perform orderly shutdown - called or executed from
* {@link #stopActiveComponents(boolean, long)}.
* {@link #stopActiveComponents(long)}.
*/
@Override
public void run() {
private void doShutdown() {
try {
this.orderlyShutdownCapableComponentsBefore();
this.stopActiveChannels();
this.stopSchedulers();
if (System.currentTimeMillis() > this.shutdownDeadline) {
logger.error("Timed out before waiting for all schedulers to complete");
}
this.stopExecutors();
if (System.currentTimeMillis() > this.shutdownDeadline) {
logger.error("Timed out before waiting for all executors to complete");
}
this.stopNonSpringExecutors();
if (System.currentTimeMillis() > this.shutdownDeadline) {
logger.error("Timed out before waiting for all non-Spring executors to complete");
}
this.stopMessageSources();
orderlyShutdownCapableComponentsBefore();
stopActiveChannels();
stopMessageSources();
stopInboundMessageProducers();
// Wait any remaining time for messages to quiesce
long timeLeft = this.shutdownDeadline - System.currentTimeMillis();
long timeLeft = shutdownDeadline - System.currentTimeMillis();
if (timeLeft > 0) {
try {
Thread.sleep(timeLeft);
} catch (InterruptedException e) {
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Interrupted while waiting for quiesce");
}
this.orderlyShutdownCapableComponentsAfter();
}
else {
this.shutdownForced = true;
this.stopSchedulers();
this.stopExecutors();
this.stopNonSpringExecutors();
this.orderlyShutdownCapableComponentsAfter();
}
orderlyShutdownCapableComponentsAfter();
}
finally {
this.shuttingDown.set(false);
shuttingDown.set(false);
}
}

Expand All @@ -585,6 +555,22 @@ public void stopMessageSources() {
}
}

/**
* Stops all inbound message producers (that are not {@link OrderlyShutdownCapable})
* - may cause interrupts.
*/
@ManagedOperation
public void stopInboundMessageProducers() {
for (Lifecycle producer : this.inboundLifecycleMessageProducers) {
if (!(producer instanceof OrderlyShutdownCapable)) {
if (logger.isInfoEnabled()) {
logger.info("Stopping message producer " + producer);
}
producer.stop();
}
}
}

@ManagedOperation
public void stopActiveChannels() {
// Stop any "active" channels (JMS etc).
Expand All @@ -600,80 +586,6 @@ public void stopActiveChannels() {
}
}

@ManagedOperation
public void stopSchedulers() {
if (logger.isDebugEnabled()) {
logger.debug("Stopping schedulers " + (this.shutdownForced ? "(force)" : ""));
}
List<ExecutorService> executorServices = new ArrayList<ExecutorService>();
Map<String, ThreadPoolTaskScheduler> schedulers = this.applicationContext
.getBeansOfType(ThreadPoolTaskScheduler.class);
for (Entry<String, ThreadPoolTaskScheduler> entry : schedulers.entrySet()) {
ThreadPoolTaskScheduler scheduler = entry.getValue();
if (logger.isInfoEnabled()) {
logger.info("Stopping scheduler " + scheduler.getThreadNamePrefix());
}
ExecutorService executorService = scheduler.getScheduledExecutor();
executorServices.add(executorService);
doShutdownExecutorService(executorService);
}
waitForExecutors(executorServices);
logger.debug("Stopped schedulers");
}

@ManagedOperation
public void stopExecutors() {
if (logger.isDebugEnabled()) {
logger.debug("Stopping executors" + (this.shutdownForced ? "(force)" : ""));
}
List<ExecutorService> executorServices = new ArrayList<ExecutorService>();
Map<String, ThreadPoolTaskExecutor> executors = this.applicationContext
.getBeansOfType(ThreadPoolTaskExecutor.class);
for (Entry<String, ThreadPoolTaskExecutor> entry : executors.entrySet()) {
ThreadPoolTaskExecutor executor = entry.getValue();
if (executor == this.shutdownExecutor) {
logger.debug("Skipping shutdown of shutdown executor");
}
else {
if (logger.isInfoEnabled()) {
logger.info("Stopping executor " + executor.getThreadNamePrefix());
}
ExecutorService executorService = executor.getThreadPoolExecutor();
executorServices.add(executorService);
doShutdownExecutorService(executorService);
}
}
waitForExecutors(executorServices);
logger.debug("Stopped executors");
}

@ManagedOperation
public void stopNonSpringExecutors() {
if (logger.isDebugEnabled()) {
logger.debug("Stopping other executors" + (this.shutdownForced ? "(force)" : ""));
}
List<ExecutorService> executorServices = new ArrayList<ExecutorService>();
Map<String, ExecutorService> nonSpringExecutors = this.applicationContext
.getBeansOfType(ExecutorService.class);
for (Entry<String, ExecutorService> entry : nonSpringExecutors.entrySet()) {
ExecutorService executorService = entry.getValue();
if (!(executorService instanceof ExecutorServiceAdapter)) {
if (logger.isInfoEnabled()) {
logger.info("Stopping executor service " + executorService);
}
executorServices.add(executorService);
doShutdownExecutorService(executorService);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Ignoring ExecutorServiceAdapter");
}
}
}
waitForExecutors(executorServices);
logger.debug("Stopped other executors");
}

protected final void orderlyShutdownCapableComponentsBefore() {
logger.debug("Initiating stop OrderlyShutdownCapable components");
Map<String, OrderlyShutdownCapable> components = this.applicationContext
Expand Down Expand Up @@ -793,33 +705,6 @@ protected void registerBeans() {
}
}

private void doShutdownExecutorService(ExecutorService executorService) {
if (this.shutdownForced) {
executorService.shutdownNow();
}
else {
executorService.shutdown();
}
}

private void waitForExecutors(List<ExecutorService> executorServices) {
for (ExecutorService executorService : executorServices) {
try {
if (!executorService.awaitTermination(this.shutdownDeadline
- System.currentTimeMillis(), TimeUnit.MILLISECONDS)) {
logger.error("Executor service " + executorService + " failed to terminate");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Interrupted while shutting down executor service " + executorService);
throw new MessagingException("Interrupted while shutting down", e);
}
if (System.currentTimeMillis() > this.shutdownDeadline) {
logger.error("Timed out before waiting for all executor services");
}
}
}

private void registerChannels() {
for (DirectChannelMetrics monitor : channels) {
String name = monitor.getName();
Expand Down Expand Up @@ -1231,4 +1116,5 @@ private static Object getField(Object target, String name) {
ReflectionUtils.makeAccessible(field);
return ReflectionUtils.getField(field, target);
}

}
Loading

0 comments on commit 6b8bdc7

Please sign in to comment.