Skip to content

Commit

Permalink
Merge pull request #835 from mulesoft/MULE-7728
Browse files Browse the repository at this point in the history
MULE-7728: Fixed several clean up problems related to partitioned object...
  • Loading branch information
vromero committed Jul 14, 2014
2 parents baab92f + 9a3ce4d commit abe0d74
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 34 deletions.
26 changes: 22 additions & 4 deletions core/src/main/java/org/mule/util/store/MuleObjectStoreManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.Serializable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand All @@ -38,6 +39,7 @@ public class MuleObjectStoreManager
protected ScheduledThreadPoolExecutor scheduler;
MuleContext muleContext;
ConcurrentMap<String, ObjectStore<?>> stores = new ConcurrentHashMap<String, ObjectStore<?>>();
private final ConcurrentMap<String, ScheduledFuture<?>> monitors = new ConcurrentHashMap<>();
private String baseTransientStoreKey = MuleProperties.OBJECT_STORE_DEFAULT_IN_MEMORY_NAME;
private String basePersistentStoreKey = MuleProperties.OBJECT_STORE_DEFAULT_PERSISTENT_NAME;
private String baseTransientUserStoreKey = MuleProperties.DEFAULT_USER_TRANSIENT_OBJECT_STORE_NAME;
Expand Down Expand Up @@ -205,9 +207,9 @@ private <T extends ObjectStore<? extends Serializable>> T getMonitorablePartitio
T previous = (T) stores.putIfAbsent(name, store);
if (previous == null)
{
Monitor m = new Monitor(name, (PartitionableExpirableObjectStore) baseStore, entryTTL,
maxEntries);
scheduler.scheduleWithFixedDelay(m, 0, expirationInterval, TimeUnit.MILLISECONDS);
Monitor m = new Monitor(name, (PartitionableExpirableObjectStore) baseStore, entryTTL, maxEntries);
ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(m, 0, expirationInterval, TimeUnit.MILLISECONDS);
monitors.put(name, future);
return store;
}
else
Expand Down Expand Up @@ -282,7 +284,15 @@ public void disposeStore(ObjectStore<? extends Serializable> store) throws Objec
if (store instanceof ObjectStorePartition)
{
ObjectStorePartition partition = (ObjectStorePartition) store;
partition.getBaseStore().disposePartition(partition.getPartitionName());
String partitionName = partition.getPartitionName();
partition.getBaseStore().disposePartition(partitionName);

ScheduledFuture<?> future = monitors.remove(partitionName);
if(future!=null)
{
future.cancel(false);
}
stores.remove(partitionName);
}
else
{
Expand All @@ -295,6 +305,14 @@ public void disposeStore(ObjectStore<? extends Serializable> store) throws Objec
logger.warn(String.format("ObjectStore of class %s does not support clearing",
store.getClass().getCanonicalName()), e);
}
try
{
stores.values().remove(store);
}
catch(Exception e)
{
logger.warn("Can not remove object store" + store.toString(), e);
}
}

if (store instanceof Disposable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,17 @@ private void trimToMaxSize(ConcurrentSkipListMap<Long, Serializable> store,
@Override
public void disposePartition(String partitionName) throws ObjectStoreException
{
partitions.remove(partitionName);
expiryInfoPartition.remove(partitionName);
removeAndClear(partitions, partitionName);
removeAndClear(expiryInfoPartition, partitionName);
}

private void removeAndClear(Map<String, ? extends Map> map, String key)
{
Map partition = map.remove(key);
if(partition!=null)
{
partition.clear();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,7 @@ public void expire(int entryTTL, int maxEntries) throws ObjectStoreException
@Override
public void disposePartition(String partitionName) throws ObjectStoreException
{
File partitionFolder = FileUtils.newFile(storeDirectory, partitionName);
FileUtils.deleteQuietly(partitionFolder);
clear(partitionName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URL;

import junit.framework.Assert;
Expand All @@ -52,29 +53,38 @@ public class DefaultMuleContextTestCase extends AbstractMuleTestCase
@Test
public void testClearExceptionHelperCacheForAppWhenDispose() throws Exception
{
URL baseUrl = DefaultMuleContextTestCase.class.getClassLoader().getResource(".");
File file = new File(baseUrl.getFile() + SpiUtils.SERVICE_ROOT + ServiceType.EXCEPTION.getPath()+ "/" + TEST_PROTOCOL + "-exception-mappings.properties");
createExceptionMappingFile(file, INITIAL_VALUE);

MuleContext ctx = new DefaultMuleContextFactory().createMuleContext();
String value = ExceptionHelper.getErrorMapping(TEST_PROTOCOL, IllegalArgumentException.class, ctx);
assertThat(value,is(INITIAL_VALUE));
ctx.dispose();
URL url = DefaultMuleContextTestCase.class.getClassLoader().getResource(SpiUtils.SERVICE_ROOT + ServiceType.EXCEPTION.getPath()+ "/" + TEST_PROTOCOL + "-exception-mappings.properties");
File exceptionMappingFile = new File(url.getFile());

createExceptionMappingFile(file, VALUE_AFTER_REDEPLOY);

ctx = new DefaultMuleContextFactory().createMuleContext();
ctx.setExecutionClassLoader(getClass().getClassLoader());
value = ExceptionHelper.getErrorMapping(TEST_PROTOCOL, IllegalArgumentException.class, ctx);
assertThat(value, is(VALUE_AFTER_REDEPLOY));
}

private void createExceptionMappingFile(File exceptionMappingFile, String value) throws IOException
{
FileWriter fileWriter = null;
try
try
{
fileWriter = new FileWriter(exceptionMappingFile);
fileWriter.append("\njava.lang.IllegalArgumentException=" + VALUE_AFTER_REDEPLOY);
fileWriter.append("\njava.lang.IllegalArgumentException=" + value);
}
finally
finally
{
if (fileWriter != null)
{
fileWriter.close();
}
}
ctx = new DefaultMuleContextFactory().createMuleContext();
ctx.setExecutionClassLoader(getClass().getClassLoader());
value = ExceptionHelper.getErrorMapping(TEST_PROTOCOL, IllegalArgumentException.class, ctx);
assertThat(value, is(VALUE_AFTER_REDEPLOY));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,59 @@

package org.mule.util.store;

import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.matchers.JUnitMatchers.hasItem;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
import org.mule.api.MuleContext;
import org.mule.api.config.MuleConfiguration;
import org.mule.api.config.MuleProperties;
import org.mule.api.lifecycle.Disposable;
import org.mule.api.lifecycle.InitialisationException;
import org.mule.api.registry.MuleRegistry;
import org.mule.api.store.ObjectStore;
import org.mule.api.store.ObjectStoreException;
import org.mule.api.store.ObjectStoreManager;
import org.mule.api.store.PartitionableObjectStore;
import org.mule.tck.junit4.AbstractMuleTestCase;
import org.mule.tck.probe.PollingProber;
import org.mule.tck.probe.Probe;
import org.mule.tck.size.SmallTest;

import java.io.Serializable;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

@SmallTest
public class MuleObjectStoreManagerTestCase extends AbstractMuleTestCase
{

public static final String TEST_PARTITION_NAME = "partition";
private MuleObjectStoreManager storeManager = new MuleObjectStoreManager();

@Rule
public TemporaryFolder tempWorkDir = new TemporaryFolder();

@Test
public void disposeDisposableStore() throws ObjectStoreException
{
@SuppressWarnings("unchecked")
ObjectStore<Serializable> store = Mockito.mock(ObjectStore.class, Mockito.withSettings()
.extraInterfaces(Disposable.class));
ObjectStore<Serializable> store = mock(ObjectStore.class, withSettings()
.extraInterfaces(Disposable.class));

this.storeManager.disposeStore(store);

Mockito.verify(store).clear();
Mockito.verify((Disposable) store).dispose();
verify(store).clear();
verify((Disposable) store).dispose();
}

@Test
Expand All @@ -43,33 +68,160 @@ public void disposePartitionableStore() throws ObjectStoreException
String partitionName = "partition";

@SuppressWarnings("unchecked")
ObjectStorePartition<Serializable> store = Mockito.mock(ObjectStorePartition.class,
Mockito.withSettings()
.extraInterfaces(Disposable.class)
.defaultAnswer(Mockito.RETURNS_DEEP_STUBS));
ObjectStorePartition<Serializable> store = mock(ObjectStorePartition.class,
withSettings()
.extraInterfaces(Disposable.class)
.defaultAnswer(Mockito.RETURNS_DEEP_STUBS));

Mockito.when(store.getPartitionName()).thenReturn(partitionName);
when(store.getPartitionName()).thenReturn(partitionName);

storeManager.disposeStore(store);

Mockito.verify(store.getBaseStore()).disposePartition(partitionName);
Mockito.verify(store, Mockito.never()).clear();
Mockito.verify((Disposable) store).dispose();
verify(store.getBaseStore()).disposePartition(partitionName);
verify(store, never()).clear();
verify((Disposable) store).dispose();
}

@Test
public void ensureTransientPartitionIsCleared() throws ObjectStoreException, InitialisationException
{
ensurePartitionIsCleared(false);
}

@Test
public void ensurePersistentPartitionIsCleared() throws ObjectStoreException, InitialisationException
{
ensurePartitionIsCleared(true);
}

private void ensurePartitionIsCleared(boolean isPersistent) throws ObjectStoreException, InitialisationException
{
try
{
ObjectStorePartition<Serializable> store = createStorePartition(TEST_PARTITION_NAME, isPersistent);

store.getBaseStore().store("Some Key", "Some Value", TEST_PARTITION_NAME);

assertThat(store.allKeys().size(), is(1));

storeManager.disposeStore(store);

assertThat(store.allKeys().size(), is(0));
}
finally
{
storeManager.dispose();
}
}

@Test
public void removeStoreAndMonitorOnTransientPartition() throws ObjectStoreException, InitialisationException
{
removeStoreAndMonitor(false);
}

@Test
public void removeStoreAndMonitorOnPersistentPartition() throws ObjectStoreException, InitialisationException
{
removeStoreAndMonitor(true);
}

private void removeStoreAndMonitor(boolean isPersistent) throws ObjectStoreException, InitialisationException
{
try
{
ObjectStorePartition<Serializable> store = createStorePartition(TEST_PARTITION_NAME, isPersistent);

assertThat(storeManager.scheduler.getActiveCount(), is(1));

storeManager.disposeStore(store);

assertThat(storeManager.stores.keySet(), not(hasItem(TEST_PARTITION_NAME)));

new PollingProber(1000, 60).check(new Probe()
{
@Override
public boolean isSatisfied()
{
return storeManager.scheduler.getActiveCount() == 0;
}

@Override
public String describeFailure()
{
return "There are active scheduler tasks.";
}
});
}
finally
{
storeManager.dispose();
}
}

private ObjectStorePartition<Serializable> createStorePartition(String partitionName, boolean isPersistent) throws InitialisationException
{
MuleContext muleContext = mock(MuleContext.class);

createRegistryAndBaseStore(muleContext, isPersistent);

storeManager.setMuleContext(muleContext);
storeManager.initialise();

ObjectStorePartition<Serializable> store = storeManager
.getObjectStore(partitionName, isPersistent, ObjectStoreManager.UNBOUNDED, 10000, 50);

assertThat(storeManager.stores.keySet(), hasItem(partitionName));

return store;
}

private void createRegistryAndBaseStore(MuleContext muleContext, boolean isPersistent)
{
MuleRegistry muleRegistry = mock(MuleRegistry.class);
if (isPersistent)
{
PartitionableObjectStore<?> store = createPersistentPartitionableObjectStore(muleContext);
when(muleRegistry.lookupObject(MuleProperties.OBJECT_STORE_DEFAULT_PERSISTENT_NAME))
.thenReturn(store);
}
else
{
PartitionableObjectStore<?> store = createTransientPartitionableObjectStore();
when(muleRegistry.lookupObject(MuleProperties.OBJECT_STORE_DEFAULT_IN_MEMORY_NAME))
.thenReturn(store);
}

when(muleContext.getRegistry()).thenReturn(muleRegistry);
}

private PartitionableObjectStore<?> createTransientPartitionableObjectStore()
{
return new PartitionedInMemoryObjectStore<>();
}

private PartitionableObjectStore<?> createPersistentPartitionableObjectStore(MuleContext muleContext)
{
MuleConfiguration muleConfiguration = mock(MuleConfiguration.class);
when(muleConfiguration.getWorkingDirectory()).thenReturn(tempWorkDir.getRoot().getAbsolutePath());
when(muleContext.getConfiguration()).thenReturn(muleConfiguration);

return new PartitionedPersistentObjectStore<>(muleContext);
}

@Test
public void dontFailIfUnsupported() throws ObjectStoreException
{
@SuppressWarnings("unchecked")
ObjectStore<Serializable> store = Mockito.mock(ObjectStore.class, Mockito.withSettings()
.extraInterfaces(Disposable.class));
ObjectStore<Serializable> store = mock(ObjectStore.class, withSettings()
.extraInterfaces(Disposable.class));

Mockito.doThrow(UnsupportedOperationException.class).when(store).clear();
doThrow(UnsupportedOperationException.class).when(store).clear();

storeManager.disposeStore(store);

Mockito.verify(store).clear();
Mockito.verify((Disposable) store).dispose();
verify(store).clear();
verify((Disposable) store).dispose();
}

}

0 comments on commit abe0d74

Please sign in to comment.