Skip to content

Instantly share code, notes, and snippets.

@rocketraman
Created April 1, 2012 17:16
Show Gist options
  • Save rocketraman/2277155 to your computer and use it in GitHub Desktop.
Save rocketraman/2277155 to your computer and use it in GitHub Desktop.
package com.vivosys.cluster.hazelcast;
import com.vivosys.cluster.api.Cluster;
import com.vivosys.cluster.api.ClusterException;
import com.google.common.collect.ImmutableList;
import com.hazelcast.core.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
/**
 * An implementation of Cluster using Hazelcast.
 *
 * <p>"Token locks" are entries in the distributed map named {@code "lockMap"}: the key is the
 * locked object and the value is the cluster {@link Member} that took the lock. Every operation
 * on that map is funnelled through {@link #safeMapOp(Callable, IMap)}, which holds the map-wide
 * lock for the duration of the operation.
 *
 * <p>{@link #setHazelcastInstance(HazelcastInstance)} must be called before {@link #init()}.
 */
public class HazelcastCluster implements Cluster {

    private static final Logger log = LoggerFactory.getLogger(HazelcastCluster.class);

    /** Name of the distributed map that stores the token locks. */
    private static final String LOCK_MAP_NAME = "lockMap";

    /** How long to wait for the map-wide lock before giving up, in seconds. */
    private static final long MAP_LOCK_TIMEOUT_SECONDS = 10;

    private HazelcastInstance hazelcastInstance;
    private IMap<Object, Member> lockMap;

    /**
     * Looks up the distributed lock map and registers a membership listener that releases the
     * token locks of any member that leaves the cluster.
     */
    public void init() {
        lockMap = hazelcastInstance.getMap(LOCK_MAP_NAME);
        hazelcastInstance.getCluster().addMembershipListener(new MembershipListener() {
            @Override
            public void memberAdded(MembershipEvent membershipEvent) {
                log.info("{} is now part of the cluster.", membershipEvent.getMember());
            }

            @Override
            public void memberRemoved(MembershipEvent membershipEvent) {
                final Member member = membershipEvent.getMember();
                log.info("{} is no longer part of the cluster.", member);
                try {
                    clearTokenLocks(member);
                } catch (Exception e) {
                    // Best effort: a failed cleanup must not break the listener, but it must be
                    // visible because the departed member's locks may now be stale.
                    log.warn("Unable to clear token locks held by departed member " + member + ".", e);
                }
            }
        });
    }

    /**
     * Injects the Hazelcast instance backing this cluster.
     *
     * @param hazelcastInstance the instance to use; must be set before {@link #init()} is called
     */
    public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
        this.hazelcastInstance = hazelcastInstance;
    }

    /**
     * @return the lifecycle service of the underlying Hazelcast instance
     */
    public LifecycleService getLifecycleService() {
        return hazelcastInstance.getLifecycleService();
    }

    /**
     * @param object the object identifying the lock
     * @return the lock Hazelcast provides for {@code object}
     */
    @Override
    public Lock getThreadLock(Object object) {
        return hazelcastInstance.getLock(object);
    }

    /**
     * Attempts to take the token lock for {@code object} on behalf of the local member.
     *
     * @param object the object to lock
     * @return {@code true} if no member held the lock and it is now recorded for the local
     *         member, {@code false} if an entry for {@code object} already existed
     * @throws ClusterException if the lock map could not be locked in time or the operation failed
     */
    @Override
    public boolean tryTokenLock(final Object object) {
        return safeMapOp(new Callable<Boolean>() {
            @Override
            public Boolean call() {
                // putIfAbsent returns the previous value, so null means we created the entry.
                return lockMap.putIfAbsent(object, thisMember()) == null;
            }
        }, lockMap);
    }

    /**
     * Removes the token lock entry for {@code object}, regardless of which member recorded it.
     *
     * @param object the object to unlock
     * @throws ClusterException if the lock map could not be locked in time or the operation failed
     */
    @Override
    public void tokenUnlock(final Object object) {
        safeMapOp(new Callable<Void>() {
            @Override
            public Void call() {
                lockMap.remove(object);
                return null;
            }
        }, lockMap);
    }

    /**
     * Removes every token lock entry, whichever member holds it.
     *
     * @throws ClusterException if the lock map could not be locked in time or the operation failed
     */
    @Override
    public void clearAllTokenLocks() {
        safeMapOp(new Callable<Void>() {
            @Override
            public Void call() {
                // Iterate over a snapshot and remove through the map itself, rather than relying
                // on the key-set iterator writing removals through to the distributed map.
                for (final Object lockedObject : ImmutableList.copyOf(lockMap.keySet())) {
                    lockMap.remove(lockedObject);
                }
                return null;
            }
        }, lockMap);
    }

    /**
     * Lists the objects whose token lock is currently recorded for the local member.
     *
     * @return an immutable list of the locked objects; empty if the local member holds none
     * @throws ClusterException if the lock map could not be locked in time or the operation failed
     */
    @Override
    public ImmutableList<Object> getThisMemberTokenLocks() {
        return safeMapOp(new Callable<ImmutableList<Object>>() {
            @Override
            public ImmutableList<Object> call() {
                final Member localMember = thisMember();
                final ImmutableList.Builder<Object> lockListBuilder = new ImmutableList.Builder<>();
                for (final Object lockedObject : lockMap.keySet()) {
                    final Member owner = lockMap.get(lockedObject);
                    // The owner may be null if the entry disappeared after the key set was read.
                    if (owner != null && owner.equals(localMember)) {
                        lockListBuilder.add(lockedObject);
                    }
                }
                return lockListBuilder.build();
            }
        }, lockMap);
    }

    /**
     * Removes every token lock entry recorded for the local member.
     *
     * @throws ClusterException if the lock map could not be locked in time or the operation failed
     */
    @Override
    public void clearThisMemberTokenLocks() {
        clearTokenLocks(thisMember());
    }

    /**
     * Removes every token lock entry whose recorded owner equals {@code member}.
     *
     * @param member the member whose locks are released
     */
    private void clearTokenLocks(final Member member) {
        safeMapOp(new Callable<Void>() {
            @Override
            public Void call() {
                // Snapshot the keys so that removing entries cannot disturb the iteration.
                for (final Object lockedObject : ImmutableList.copyOf(lockMap.keySet())) {
                    final Member owner = lockMap.get(lockedObject);
                    if (owner != null && owner.equals(member)) {
                        lockMap.remove(lockedObject);
                    }
                }
                return null;
            }
        }, lockMap);
    }

    /**
     * Runs {@code callable} while holding the map-wide lock on {@code iMap}.
     *
     * <p>If the lock cannot be obtained within {@link #MAP_LOCK_TIMEOUT_SECONDS} seconds a
     * {@link ClusterException} is thrown and {@code callable} is not run. If the lock attempt
     * itself throws (for example while a node is shutting down), the failure is logged and
     * {@code callable} is still run without the lock, as a best effort. The map is unlocked
     * afterwards only if the lock was actually acquired.
     *
     * @param callable the operation to perform
     * @param iMap the distributed map to lock around the operation
     * @param <T> the result type of the operation
     * @return whatever {@code callable} returns
     * @throws ClusterException if the lock times out, or wrapping any exception thrown by
     *         {@code callable}
     */
    private <T> T safeMapOp(Callable<T> callable, IMap<Object, Member> iMap) {
        boolean locked = false;
        boolean lockAttemptFailed = false;
        try {
            locked = iMap.lockMap(MAP_LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (Exception e) {
            lockAttemptFailed = true;
            log.warn("Exception while locking distributed map, perhaps a node was shutting down?", e);
        }
        // A plain timeout must reach the caller. It is raised outside the try block above so the
        // broad catch cannot swallow it.
        if (!locked && !lockAttemptFailed) {
            throw new ClusterException("Unable to lock distributed map " + iMap.getName() +
                    " in cluster " + thisMember() + ".");
        }
        try {
            try {
                return callable.call();
            } catch (Exception e) {
                throw new ClusterException(e);
            }
        } finally {
            if (locked) {
                try {
                    iMap.unlockMap();
                } catch (Exception e) {
                    log.warn("Exception while unlocking distributed map, perhaps a node was shutting down?", e);
                }
            }
        }
    }

    /**
     * @return the cluster member representing the local node
     */
    private Member thisMember() {
        return hazelcastInstance.getCluster().getLocalMember();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment