Created
May 31, 2013 19:55
-
-
Save jentfoo/5687563 to your computer and use it in GitHub Desktop.
Wrapper for hazelcast lock to work around issue #267
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.locks.Condition; | |
import java.util.concurrent.locks.Lock; | |
import com.hazelcast.core.ILock; | |
public class ILockWrapper implements Lock { | |
private static final Map<String, Object> LOCK_THREAD_LOCKS = new HashMap<String, Object>(); | |
private final ILock replicatedLock; | |
private final ThreadLockTracker lockTracker; | |
public ILockWrapper(ILock replicatedLock, | |
String lockName) { // must provide a lock name that matches the replicatedLock name so that this is safe for locks created multiple times within the same vm | |
this.replicatedLock = replicatedLock; | |
synchronized (LOCK_THREAD_LOCKS) { | |
Object lock = LOCK_THREAD_LOCKS.get(lockName); | |
if (lock == null) { | |
lock = new Object(); | |
LOCK_THREAD_LOCKS.put(lockName, lock); | |
} | |
lockTracker = new ThreadLockTracker(lock); | |
} | |
} | |
protected void destroy() { | |
replicatedLock.destroy(); | |
lockTracker.reset(); | |
} | |
@Override | |
public void lock() { | |
replicatedLock.lock(); | |
verifyAndSetLockedThread(); | |
} | |
@Override | |
public void lockInterruptibly() throws InterruptedException { | |
replicatedLock.lockInterruptibly(); | |
verifyAndSetLockedThread(); | |
} | |
@Override | |
public Condition newCondition() { | |
throw new UnsupportedOperationException("Not currently implemented for distributed locks"); | |
} | |
@Override | |
public boolean tryLock() { | |
if (replicatedLock.tryLock()) { | |
verifyAndSetLockedThread(); | |
return true; | |
} else { | |
return false; | |
} | |
} | |
@Override | |
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { | |
if (replicatedLock.tryLock(time, unit)) { | |
verifyAndSetLockedThread(); | |
return true; | |
} else { | |
return false; | |
} | |
} | |
@Override | |
public void unlock() { | |
synchronized (lockTracker.threadLock) { // must hold lock before unlock occurs, unlike other actions | |
try { | |
replicatedLock.unlock(); | |
lockTracker.trackUnlock(); | |
} catch (IllegalMonitorStateException e) { | |
if (e.getMessage().contains("owner") && | |
lockTracker.isLockedThread()) { | |
// swallow exception, hazelcast defect: https://github.com/hazelcast/hazelcast/issues/267 | |
lockTracker.trackUnlock(); | |
} else { | |
throw e; | |
} | |
} | |
} | |
} | |
public void forceUnlock() { | |
synchronized (lockTracker.threadLock) { // must hold lock before unlock occurs, unlike other actions | |
replicatedLock.forceUnlock(); | |
lockTracker.reset(); // TODO - should we reset or just trackUnlock | |
} | |
} | |
public boolean hasLock() { | |
return lockTracker.isLockedThread(); | |
} | |
private void verifyAndSetLockedThread() { | |
try { | |
lockTracker.trackLock(); | |
} catch (RuntimeException e) { | |
// unlock in case there was an error | |
replicatedLock.unlock(); | |
throw e; | |
} | |
} | |
private class ThreadLockTracker { | |
public final Object threadLock; | |
private int lockCount; | |
private Thread lockedThread; | |
public ThreadLockTracker(Object threadLock) { | |
this.threadLock = threadLock; | |
lockCount = 0; | |
lockedThread = null; | |
} | |
public void trackLock() { | |
synchronized (threadLock) { | |
if (lockCount == 0) { | |
lockedThread = Thread.currentThread(); | |
lockCount = 1; | |
} else { | |
lockCount++; | |
} | |
} | |
} | |
public void trackUnlock() { | |
synchronized (threadLock) { | |
lockCount--; | |
if (lockCount == 0) { | |
lockedThread = null; | |
} | |
} | |
} | |
public boolean isLockedThread() { | |
synchronized (threadLock) { | |
return Thread.currentThread() == lockedThread; | |
} | |
} | |
public void reset() { | |
synchronized (threadLock) { | |
lockCount = 0; | |
lockedThread = null; | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment