Skip to content

Commit

Permalink
INT-3442 RedisQMDE Delay stop Until Last Read
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-3442

Previously `RedisQueueMessageDrivenEndpoint.stop()` returned immediately.
It caused an issue when one more `message` might be read and processed to stopped app.

* Introduce `AbstractEndpoint#lifecycleCondition` and wait on it from `RedisQueueMessageDrivenEndpoint.doStop()`
and `signal()` it from `ListenerTask`.
* Since everything is done around `lifecycleLock` the `RedisQueueMessageDrivenEndpoint.stop()` waits for the proper
'last' message process.
* Add tiny `Thread.sleep(1)` to the `popMessageAndSend` cycle to free `lifecycleLock` for other Threads, e.g. `stop()`

INT-3442: Get rid of `lock` from listener cycle

* Introduce `stopTimeout` to minimize the `stop` thread blocking
* If the message is returned after that timeout it moved back to Redis List using `RPUSH`

INT-3442: Addressing PR comments
  • Loading branch information
Artem Bilan authored and garyrussell committed Jun 20, 2014
1 parent f4bd034 commit 4b1eed2
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.integration.endpoint;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.context.SmartLifecycle;
Expand All @@ -24,14 +25,14 @@

/**
* The base class for Message Endpoint implementations.
*
*
* <p>This class implements Lifecycle and provides an {@link #autoStartup}
* property. If <code>true</code>, the endpoint will start automatically upon
* initialization. Otherwise, it will require an explicit invocation of its
* {@link #start()} method. The default value is <code>true</code>.
* To require explicit startup, provide a value of <code>false</code>
* to the {@link #setAutoStartup(boolean)} method.
*
*
* @author Mark Fisher
*/
public abstract class AbstractEndpoint extends IntegrationObjectSupport implements SmartLifecycle {
Expand All @@ -42,7 +43,9 @@ public abstract class AbstractEndpoint extends IntegrationObjectSupport implemen

private volatile boolean running;

private final ReentrantLock lifecycleLock = new ReentrantLock();
protected final ReentrantLock lifecycleLock = new ReentrantLock();

protected final Condition lifecycleCondition = this.lifecycleLock.newCondition();


public void setAutoStartup(boolean autoStartup) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class RedisQueueMessageDrivenEndpoint extends MessageProducerSupport impl

private volatile long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;

private volatile long stopTimeout = DEFAULT_RECEIVE_TIMEOUT;

private volatile boolean active;

private volatile boolean listening;
Expand Down Expand Up @@ -104,7 +106,6 @@ public void setSerializer(RedisSerializer<?> serializer) {
* the retrieved data will be used as the payload for a new Spring Integration
* Message. Otherwise, the data is deserialized as Spring Integration
* Message.
*
* @param expectMessage Defaults to false
*/
public void setExpectMessage(boolean expectMessage) {
Expand All @@ -114,23 +115,28 @@ public void setExpectMessage(boolean expectMessage) {
/**
* This timeout (milliseconds) is used when retrieving elements from the queue
* specified by {@link #boundListOperations}.
* <p>
* If the queue does contain elements, the data is retrieved immediately. However,
* <p> If the queue does contain elements, the data is retrieved immediately. However,
* if the queue is empty, the Redis connection is blocked until either an element
* can be retrieved from the queue or until the specified timeout passes.
* <p>
* A timeout of zero can be used to block indefinitely. If not set explicitly
* <p> A timeout of zero can be used to block indefinitely. If not set explicitly
* the timeout value will default to {@code 1000}
* <p>
* See also: http://redis.io/commands/brpop
*
* <p> See also: http://redis.io/commands/brpop
* @param receiveTimeout Must be non-negative. Specified in milliseconds.
*/
public void setReceiveTimeout(long receiveTimeout) {
Assert.isTrue(receiveTimeout > 0, "'receiveTimeout' must be > 0.");
this.receiveTimeout = receiveTimeout;
}

/**
* @param stopTimeout the timeout to block {@link #doStop()} until the last message will be processed
* or this timeout is reached. Should be less then or equal to {@link #receiveTimeout}
* @since 4.0.3
*/
public void setStopTimeout(long stopTimeout) {
this.stopTimeout = stopTimeout;
}

public void setTaskExecutor(Executor taskExecutor) {
this.taskExecutor = taskExecutor;
}
Expand All @@ -153,7 +159,8 @@ protected void onInit() {
}
if (this.taskExecutor == null) {
String beanName = this.getComponentName();
this.taskExecutor = new SimpleAsyncTaskExecutor((beanName == null ? "" : beanName + "-") + this.getComponentType());
this.taskExecutor = new SimpleAsyncTaskExecutor((beanName == null ? "" : beanName + "-")
+ this.getComponentType());
}
if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor) && this.getBeanFactory() != null) {
MessagePublishingErrorHandler errorHandler =
Expand All @@ -179,7 +186,8 @@ private void popMessageAndSend() {
catch (Exception e) {
this.listening = false;
if (this.active) {
logger.error("Failed to execute listening task. Will attempt to resubmit in " + this.recoveryInterval + " milliseconds.", e);
logger.error("Failed to execute listening task. Will attempt to resubmit in " + this.recoveryInterval
+ " milliseconds.", e);
this.publishException(e);
this.sleepBeforeRecoveryAttempt();
}
Expand Down Expand Up @@ -208,7 +216,12 @@ private void popMessageAndSend() {
}

if (message != null) {
this.sendMessage(message);
if (this.listening) {
this.sendMessage(message);
}
else {
this.boundListOperations.rightPush(value);
}
}
}

Expand All @@ -231,6 +244,7 @@ private void sleepBeforeRecoveryAttempt() {
}
catch (InterruptedException e) {
logger.debug("Thread interrupted while sleeping the recovery interval");
Thread.currentThread().interrupt();
}
}
}
Expand All @@ -252,7 +266,17 @@ private void restart() {

@Override
protected void doStop() {
this.active = false;
try {
this.active = false;
this.lifecycleCondition.await(Math.min(this.stopTimeout, this.receiveTimeout), TimeUnit.MICROSECONDS);
}
catch (InterruptedException e) {
logger.debug("Thread interrupted while stopping the endpoint");
Thread.currentThread().interrupt();
}
finally {
this.listening = false;
}
}

public boolean isListening() {
Expand Down Expand Up @@ -284,9 +308,9 @@ private class ListenerTask implements Runnable {

@Override
public void run() {
RedisQueueMessageDrivenEndpoint.this.listening = true;
try {
while (RedisQueueMessageDrivenEndpoint.this.active) {
RedisQueueMessageDrivenEndpoint.this.listening = true;
RedisQueueMessageDrivenEndpoint.this.popMessageAndSend();
}
}
Expand All @@ -295,7 +319,13 @@ public void run() {
RedisQueueMessageDrivenEndpoint.this.restart();
}
else {
RedisQueueMessageDrivenEndpoint.this.listening = false;
RedisQueueMessageDrivenEndpoint.this.lifecycleLock.lock();
try {
RedisQueueMessageDrivenEndpoint.this.lifecycleCondition.signalAll();
}
finally {
RedisQueueMessageDrivenEndpoint.this.lifecycleLock.unlock();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.hamcrest.Matchers;
Expand All @@ -36,6 +38,7 @@
import org.junit.runner.RunWith;
import org.mockito.Mockito;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
Expand All @@ -45,16 +48,19 @@
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.event.IntegrationEvent;
import org.springframework.integration.redis.event.RedisExceptionEvent;
import org.springframework.integration.redis.rules.RedisAvailable;
import org.springframework.integration.redis.rules.RedisAvailableTests;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
Expand Down Expand Up @@ -208,6 +214,46 @@ public void testInt3017IntegrationSymmetrical() throws Exception {
assertEquals(payload, receive.getPayload());
}

@Test
@RedisAvailable
@SuppressWarnings("unchecked")
public void testInt3442ProperlyStop() throws Exception {
final String queueName = "si.test.testInt3442ProperlyStopTest";

final RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
redisTemplate.setConnectionFactory(this.connectionFactory);
redisTemplate.setEnableDefaultSerializer(false);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
redisTemplate.afterPropertiesSet();

RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint(queueName,
this.connectionFactory);
BoundListOperations<String, byte[]> boundListOperations =
TestUtils.getPropertyValue(endpoint, "boundListOperations", BoundListOperations.class);
boundListOperations = Mockito.spy(boundListOperations);
new DirectFieldAccessor(endpoint).setPropertyValue("boundListOperations", boundListOperations);
endpoint.setBeanFactory(Mockito.mock(BeanFactory.class));
endpoint.setOutputChannel(new DirectChannel());
endpoint.setReceiveTimeout(1000);
endpoint.setStopTimeout(100);

ExecutorService executorService = Executors.newCachedThreadPool();
endpoint.setTaskExecutor(executorService);

endpoint.afterPropertiesSet();
endpoint.start();

redisTemplate.boundListOps(queueName).leftPush("foo");
endpoint.stop();

executorService.shutdown();
assertTrue(executorService.awaitTermination(1, TimeUnit.SECONDS));

Mockito.verify(boundListOperations).rightPush(Mockito.any(byte[].class));
}


@Test
@RedisAvailable
@SuppressWarnings("unchecked")
Expand Down

0 comments on commit 4b1eed2

Please sign in to comment.