Skip to content

Commit

Permalink
Fix RedisInboundChannelAdapterTests Race Condition
Browse files Browse the repository at this point in the history
Instead of waiting for a hard 1 second, intelligently wait
until the container is subscribed by sending/receiving a test message.

Also reduces the test run time from 20s to 200ms locally.
  • Loading branch information
garyrussell committed Jun 1, 2015
1 parent 0fdc630 commit bc8b946
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2007-2014 the original author or authors
* Copyright 2007-2015 the original author or authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,11 @@

package org.springframework.integration.redis.inbound;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import org.hamcrest.Matchers;
import org.junit.Test;
Expand All @@ -36,6 +39,7 @@
/**
* @author Mark Fisher
* @author Artem Bilan
* @author Gary Russell
* @since 2.1
*/
public class RedisInboundChannelAdapterTests extends RedisAvailableTests {
Expand All @@ -62,11 +66,12 @@ private void testRedisInboundChannelAdapterGuts(int iteration) throws Exception
adapter.afterPropertiesSet();
adapter.start();

this.awaitContainerSubscribed(TestUtils.getPropertyValue(adapter, "container",
RedisMessageListenerContainer.class));

StringRedisTemplate redisTemplate = new StringRedisTemplate(connectionFactory);
redisTemplate.afterPropertiesSet();

awaitFullySubscribed(TestUtils.getPropertyValue(adapter, "container", RedisMessageListenerContainer.class),
redisTemplate, redisChannelName, channel, "foo");

for (int i = 0; i < numToTest; i++) {
String message = "test-" + i + " iteration " + iteration;
redisTemplate.convertAndSend(redisChannelName, message);
Expand All @@ -91,14 +96,14 @@ private void testRedisInboundChannelAdapterGuts(int iteration) throws Exception
adapter.afterPropertiesSet();
adapter.start();

this.awaitContainerSubscribed(TestUtils.getPropertyValue(adapter, "container",
RedisMessageListenerContainer.class));

RedisTemplate<?, ?> template = new RedisTemplate<Object, Object>();
template.setConnectionFactory(connectionFactory);
template.setEnableDefaultSerializer(false);
template.afterPropertiesSet();

awaitFullySubscribed(TestUtils.getPropertyValue(adapter, "container", RedisMessageListenerContainer.class),
template, redisChannelName, channel, "foo".getBytes());

for (int i = 0; i < numToTest; i++) {
String message = "test-" + i + " iteration " + iteration;
template.convertAndSend(redisChannelName, message.getBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.BoundZSetOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;

/**
* @author Oleg Zhurakousky
Expand All @@ -52,6 +55,12 @@ protected RedisConnectionFactory getConnectionFactoryForTest() {
}

protected void awaitContainerSubscribed(RedisMessageListenerContainer container) throws Exception {
awaitContainerSubscribedNoWait(container);
// wait another second because of race condition
Thread.sleep(1000);
}

private void awaitContainerSubscribedNoWait(RedisMessageListenerContainer container) throws InterruptedException {
RedisConnection connection = null;

int n = 0;
Expand All @@ -67,8 +76,6 @@ protected void awaitContainerSubscribed(RedisMessageListenerContainer container)
Thread.sleep(100);
}
assertTrue("RedisMessageListenerContainer Failed to Subscribe", n < 100);
// wait another second because of race condition
Thread.sleep(1000);
}

protected void awaitContainerSubscribedWithPatterns(RedisMessageListenerContainer container) throws Exception {
Expand All @@ -85,6 +92,26 @@ protected void awaitContainerSubscribedWithPatterns(RedisMessageListenerContaine
Thread.sleep(1000);
}

protected void awaitFullySubscribed(RedisMessageListenerContainer container, RedisTemplate<?, ?> redisTemplate,
String redisChannelName, QueueChannel channel, Object message) throws Exception {
awaitContainerSubscribedNoWait(container);
drain(channel);
long now = System.currentTimeMillis();
Message<?> received = null;
while (received == null && System.currentTimeMillis() - now < 10000) {
redisTemplate.convertAndSend(redisChannelName, message);
received = channel.receive(1000);
}
drain(channel);
assertNotNull("Container failed to fully start", received);
}

private void drain(QueueChannel channel) {
while (channel.receive(0) != null) {
;
}
}

protected void prepareList(RedisConnectionFactory connectionFactory){

StringRedisTemplate redisTemplate = createStringRedisTemplate(connectionFactory);
Expand Down

0 comments on commit bc8b946

Please sign in to comment.