Skip to content

Commit

Permalink
INT-3667: Fix RedisLockRegistry memory leak
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-3667

* fix `RedisLockRegistry.tryLock` memory leaks using 2 different thread local internal storages:
 hard references for locked locks and weak references (optional) for others, weak references are used for lock obtaining optimization -
thread will get same `RedisLock` object for certain key before locking and after unlocking (if variable still exists)
* add `RedisLockRegistry.useWeakReferences` property for enable thread local weak references storage for unlocked locks, disabled by default
* fix `RedisLockRegistry$RedisLock.obtainLock` improper expire time update (expire time was updated on every attempt to get lock)
* update `RedisLockRegistry` tests
  • Loading branch information
kiakimov authored and artembilan committed Mar 30, 2015
1 parent f686c21 commit efef65c
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.redis.util;

import java.net.InetAddress;
Expand All @@ -22,11 +23,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -93,14 +95,18 @@ public final class RedisLockRegistry implements LockRegistry {

private final RedisTemplate<String, RedisLock> redisTemplate;

private final ThreadLocal<List<RedisLock>> threadLocks = new ThreadLocal<List<RedisLock>>();
private final ThreadLocal<Set<RedisLock>> weakThreadLocks = new ThreadLocal<Set<RedisLock>>();

private final ThreadLocal<List<RedisLock>> hardThreadLocks = new ThreadLocal<List<RedisLock>>();

private final long expireAfter;

private final LockRegistry localRegistry;

private final LockSerializer lockSerializer = new LockSerializer();

private boolean useWeakReferences = false;

static {
String host;
try {
Expand Down Expand Up @@ -156,48 +162,120 @@ public RedisLockRegistry(RedisConnectionFactory connectionFactory, String regist
this.localRegistry = localRegistry;
}

@Override
public Lock obtain(Object lockKey) {
Assert.isInstanceOf(String.class, lockKey);
List<RedisLock> locks = this.threadLocks.get();
/**
* Change the state of thread local weak references storage for unlocked locks.
* Thread local weak references are used for lock obtaining optimization -
* thread will get same {@link RedisLock} object for certain key before actual
* locking and after unlocking (if variable still exists).
* <p>While is switched off (by default) every {@link RedisLockRegistry#obtain} call will provide
* different {@link RedisLock} objects for same unlocked key.
* @param useWeakReferences set to true for switch thread local weak references storage on, false by default
* @since 4.0.7
*/
public void setUseWeakReferences(boolean useWeakReferences) {
this.useWeakReferences = useWeakReferences;
}

/**
* Weak referenced locks, lock is kept here when actual lock is NOT gained.
* Used for obtaining same lock object within same thread and key.
* To avoid memory leaks lock objects without actual lock are kept as weak references.
* After gaining the actual lock, lock object moves from weak reference to hard reference and vise a versa.
*/
private Collection<RedisLock> getWeakThreadLocks() {
Set<RedisLock> locks = this.weakThreadLocks.get();
if (locks == null) {
locks = Collections.newSetFromMap(new WeakHashMap<RedisLock, Boolean>());
this.weakThreadLocks.set(locks);
}
return locks;
}

/**
* Hard referenced locks, lock is kept here when actual lock is gained.
*/
private Collection<RedisLock> getHardThreadLocks() {
List<RedisLock> locks = this.hardThreadLocks.get();
if (locks == null) {
locks = new LinkedList<RedisLock>();
this.threadLocks.set(locks);
this.hardThreadLocks.set(locks);
}
RedisLock lock = null;
for (RedisLock alock : locks) {
if (alock.getLockKey().equals(lockKey)) {
lock = alock;
break;
return locks;
}

private RedisLock findLock(Collection<RedisLock> locks, Object key) {
if (locks != null) {
for (RedisLock lock : locks) {
if (lock.getLockKey().equals(key)) {
return lock;
}
}
}
return null;
}

private void toHardThreadStorage(RedisLock lock) {
if (this.weakThreadLocks.get() != null) {
this.weakThreadLocks.get().remove(lock);
}

getHardThreadLocks().add(lock);

//clean up
if (this.weakThreadLocks.get() != null && this.weakThreadLocks.get().isEmpty()) {
this.weakThreadLocks.remove();
}
}

private void toWeakThreadStorage(RedisLock lock) {
//to avoid collection creation on existence check use direct fields
if (this.hardThreadLocks.get() != null) {
getHardThreadLocks().remove(lock);
}

if (this.useWeakReferences) {
getWeakThreadLocks().add(lock);
}

//clean up
if (this.hardThreadLocks.get() != null && this.hardThreadLocks.get().isEmpty()) {
this.hardThreadLocks.remove();
}
}

@Override
public Lock obtain(Object lockKey) {
Assert.isInstanceOf(String.class, lockKey);

//try to find the lock within hard references
RedisLock lock = findLock(this.hardThreadLocks.get(), lockKey);

/*
* If the lock is locked, check that it matches what's in the store.
* If it doesn't, the lock must have expired.
*/
if (lock != null && lock.thread != null) {
RedisLock lockInStore = RedisLockRegistry.this.redisTemplate
.boundValueOps(this.registryKey + ":" + lockKey).get();
RedisLock lockInStore = this.redisTemplate.boundValueOps(this.registryKey + ":" + lockKey).get();
if (lockInStore == null || !lock.equals(lockInStore)) {
removeLockFromThreadLocal(locks, lock);
getHardThreadLocks().remove(lock);
lock = null;
}
}

if (lock == null) {
lock = new RedisLock((String) lockKey);
locks.add(lock);
}
return lock;
}
//try to find the lock within weak references
lock = findLock(this.weakThreadLocks.get(), lockKey);

private void removeLockFromThreadLocal(List<RedisLock> locks, RedisLock lock) {
Iterator<RedisLock> iterator = locks.iterator();
while (iterator.hasNext()) {
if (iterator.next().equals(lock)) {
iterator.remove();
break;
if (lock == null) {
lock = new RedisLock((String) lockKey);

if (this.useWeakReferences) {
getWeakThreadLocks().add(lock);
}
}
}

return lock;
}

public Collection<Lock> listLocks() {
Expand Down Expand Up @@ -312,48 +390,59 @@ private boolean obtainLock() {
this.reLock++;
return true;
}

toHardThreadStorage(this);

/*
* Set these now so they will be persisted if successful.
*/
this.lockedAt = System.currentTimeMillis();
this.threadName = currentThread.getName();

Boolean success = RedisLockRegistry.this.redisTemplate.execute(new SessionCallback<Boolean>() {
Boolean success = false;
try {
success = RedisLockRegistry.this.redisTemplate.execute(new SessionCallback<Boolean>() {

@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public Boolean execute(RedisOperations ops) throws DataAccessException {
String key = constructLockKey();
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public Boolean execute(RedisOperations ops) throws DataAccessException {
String key = constructLockKey();

ops.watch(key); //monitor key
ops.watch(key); //monitor key

ops.multi(); //transaction start
if (ops.opsForValue().get(key) != null) {
ops.unwatch(); //key already exists, stop monitoring
return false;
}

//can't rely on operations result inside transaction, execution is delayed till `exec()`
ops.opsForValue().setIfAbsent(key, RedisLock.this);
ops.multi(); //transaction start

//set expire on key if exists
ops.expire(key, RedisLockRegistry.this.expireAfter, TimeUnit.MILLISECONDS);
//set the value and expire
ops.opsForValue()
.set(key, RedisLock.this, RedisLockRegistry.this.expireAfter, TimeUnit.MILLISECONDS);

//exec will contain all operations result or null - if execution has been aborted due to 'watch'
List result = ops.exec();
//exec will contain all operations result or null - if execution has been aborted due to 'watch'
return ops.exec() != null;
}

//check 'setIfAbsent' result (first in list)
return (result != null) && (!result.isEmpty()) && (Boolean.TRUE.equals(result.get(0)));
}
});

});
} finally {

if (!success) {
this.lockedAt = 0;
this.threadName = null;
}
else {
this.thread = currentThread;
if (logger.isDebugEnabled()) {
logger.debug("New lock; " + this.toString());
if (!success) {
this.lockedAt = 0;
this.threadName = null;
toWeakThreadStorage(this);
}
else {
this.thread = currentThread;
if (logger.isDebugEnabled()) {
logger.debug("New lock; " + this.toString());
}
}

}

return success;
}

Expand Down Expand Up @@ -388,22 +477,20 @@ public void unlock() {
}
throw new IllegalStateException("Lock is owned by " + this.thread.getName() + "; " + this.toString());
}

try {
if (this.reLock-- <= 0) {
List<RedisLock> locks = RedisLockRegistry.this.threadLocks.get();
if (locks != null) {
removeLockFromThreadLocal(locks, this);
if (locks.size() == 0) { // last lock for this thread
RedisLockRegistry.this.threadLocks.remove();
try {
this.assertLockInRedisIsUnchanged();
RedisLockRegistry.this.redisTemplate.delete(constructLockKey());
if (logger.isDebugEnabled()) {
logger.debug("Released lock; " + this.toString());
}
} finally {
this.thread = null;
this.reLock = 0;
toWeakThreadStorage(this);
}
this.assertLockInRedisIsUnchanged();
RedisLockRegistry.this.redisTemplate.delete(constructLockKey());
if (logger.isDebugEnabled()) {
logger.debug("Released lock; " + this.toString());
}
this.thread = null;
this.reLock = 0;
}
}
finally {
Expand Down Expand Up @@ -501,8 +588,8 @@ public byte[] serialize(RedisLock t) throws SerializationException {
int keyLength = t.lockKey.length();
int threadNameLength = t.threadName.length();
byte[] value = new byte[1 + hostLength +
1 + keyLength +
1 + threadNameLength + 8];
1 + keyLength +
1 + threadNameLength + 8];
ByteBuffer buff = ByteBuffer.wrap(value);
buff.put((byte) hostLength)
.put(t.lockHost)
Expand Down
Loading

0 comments on commit efef65c

Please sign in to comment.