Skip to content

Instantly share code, notes, and snippets.

@markathomas
Created April 27, 2015 18:36
Show Gist options
  • Save markathomas/7c152528cff2ae46af27 to your computer and use it in GitHub Desktop.
Save markathomas/7c152528cff2ae46af27 to your computer and use it in GitHub Desktop.
Test case for bad cluster query results
import com.hazelcast.config.Config;
import com.hazelcast.config.InterfacesConfig;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MapIndexConfig;
import com.hazelcast.config.NetworkConfig;
import com.hazelcast.config.SerializerConfig;
import com.hazelcast.config.TcpIpConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.core.LifecycleListener;
import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
import com.hazelcast.core.PartitionAwareKey;
import com.hazelcast.instance.GroupProperties;
import com.hazelcast.instance.Node;
import com.hazelcast.instance.TestUtil;
import com.hazelcast.map.merge.LatestUpdateMapMergePolicy;
import com.hazelcast.query.Predicates;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.TestEnvironment;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import org.testng.annotations.Test;
/**
 * Reproduction case for inconsistent map read results on a two-member cluster.
 *
 * <p>The scenario: two members share one indexed map; the members are forced
 * apart; the entry is removed on one side only; the test then waits for the
 * {@code MERGED} lifecycle event and for both members to report a cluster size
 * of two again. Finally the entry is removed once more and both maps are read
 * repeatedly (by predicate and by key) with the results printed to stdout so
 * that stale query results can be observed.
 *
 * <p>All verifications throw {@link AssertionError} directly rather than using
 * the {@code assert} keyword, so they are enforced even when the JVM runs with
 * assertions disabled (the default, e.g. when launched through {@link #main}).
 */
public class BadClusterStateTest extends HazelcastTestSupport {

    /**
     * Port passed to the network config of both members.
     * NOTE(review): both members receive the same value; presumably Hazelcast's
     * port auto-increment gives the second member the next free port - confirm.
     */
    private static final int BASE_PORT = 6701;

    /** Number of read passes performed after the final removal. */
    private static final int READ_PASSES = 100;

    /** Pause between two read passes, in milliseconds. */
    private static final int READ_PASS_PAUSE_MILLIS = 5;

    /**
     * Starts two members, runs the split/merge scenario and always shuts both
     * members down afterwards, whether the scenario passed or failed.
     *
     * @throws CloneNotSupportedException never thrown by this implementation;
     *         retained so the method signature stays unchanged for callers
     */
    @Test(timeOut = 1000 * 60 * 10)
    public void testBadState() throws CloneNotSupportedException {
        String mapName = randomMapName();
        Config config = newConfig(LatestUpdateMapMergePolicy.class.getName(), mapName);

        HazelcastInstance h1 = null;
        HazelcastInstance h2 = null;
        try {
            h1 = Hazelcast.newHazelcastInstance(config);
            h2 = Hazelcast.newHazelcastInstance(config);
            runSplitAndMergeScenario(h1, h2, mapName);
        } finally {
            // Nested finally: a failure while stopping h2 must not leave h1 running.
            try {
                shutdownIfStarted(h2);
            } finally {
                shutdownIfStarted(h1);
            }
        }
    }

    /**
     * Executes the actual scenario against two running members.
     *
     * @param h1      member on which the entry is written and removed
     * @param h2      member that carries the membership and lifecycle listeners
     * @param mapName name of the map under test
     */
    private void runSplitAndMergeScenario(HazelcastInstance h1, HazelcastInstance h2, String mapName) {
        TestMembershipListener membershipListener = new TestMembershipListener(1);
        h2.getCluster().addMembershipListener(membershipListener);
        TestLifecycleListener lifecycleListener = new TestLifecycleListener(1);
        h2.getLifecycleService().addLifecycleListener(lifecycleListener);

        RealtimeCall call = new RealtimeCall();
        call.setId(UUID.randomUUID());
        call.setClusterUUID(UUID.randomUUID());
        call.setDisplayId(1);
        call.setNumber("5554447777");

        IMap<PartitionAwareKey<UUID, UUID>, RealtimeCall> map1 = h1.getMap(mapName);
        IMap<PartitionAwareKey<UUID, UUID>, RealtimeCall> map2 = h2.getMap(mapName);

        // Phase 1: write through member 1, read through member 2.
        map1.put(call.getAffinityKey(), call);
        sleepMillis(1);
        failUnless(map2.get(call.getAffinityKey()) != null,
                "entry written via member 1 must be readable via member 2");

        // Phase 2: force the members apart and wait until each one is alone.
        closeConnectionBetween(h1, h2);
        assertOpenEventually(membershipListener.latch);
        assertClusterSizeEventually(1, h1);
        assertClusterSizeEventually(1, h2);

        // Phase 3: remove on member 1 only; member 2 must still hold the entry.
        map1 = h1.getMap(mapName);
        map1.remove(call.getAffinityKey());
        sleepMillis(1);
        map2 = h2.getMap(mapName);
        failUnless(map2.get(call.getAffinityKey()) != null,
                "entry must survive on member 2 while the members are separated");

        // Phase 4: wait for the MERGED event and for both members to see each other.
        assertOpenEventually(lifecycleListener.latch);
        assertClusterSizeEventually(2, h1);
        assertClusterSizeEventually(2, h2);

        // Phase 5: the entry is expected to be visible via member 1 again; remove it for good.
        map1 = h1.getMap(mapName);
        failUnless(map1.get(call.getAffinityKey()) != null,
                "entry must be visible via member 1 after the merge");
        map1.remove(call.getAffinityKey());
        failUnless(map2.get(call.getAffinityKey()) == null,
                "entry must be gone from member 2 after the final removal");

        // Phase 6: print what each read path reports; every line should report 0 / null.
        for (int pass = 0; pass < READ_PASSES; pass++) {
            printReadResults(map1, map2, call);
            sleepMillis(READ_PASS_PAUSE_MILLIS);
        }
    }

    /**
     * Prints the result sizes of the {@code id} and {@code displayId} predicate
     * queries and the outcome of a direct key lookup, for both maps.
     */
    private void printReadResults(IMap<PartitionAwareKey<UUID, UUID>, RealtimeCall> map1,
                                  IMap<PartitionAwareKey<UUID, UUID>, RealtimeCall> map2,
                                  RealtimeCall call) {
        Collection<RealtimeCall> calls = map1.values(Predicates.equal("id", call.getId()));
        System.out.println("Map 1 query by uuid: " + calls.size());
        calls = map2.values(Predicates.equal("id", call.getId()));
        System.out.println("Map 2 query by uuid: " + calls.size());

        calls = map1.values(Predicates.equal("displayId", call.getDisplayId()));
        System.out.println("Map 1 query by display id: " + calls.size());
        calls = map2.values(Predicates.equal("displayId", call.getDisplayId()));
        System.out.println("Map 2 query by displayId: " + calls.size());

        RealtimeCall c = map1.get(call.getAffinityKey());
        System.out.println("Map 1 get by affinity key: " + (c == null ? "null" : "not null"));
        c = map2.get(call.getAffinityKey());
        System.out.println("Map 2 get by affinity key: " + (c == null ? "null" : "not null"));
    }

    /**
     * Fails the test when {@code condition} is false. Unlike the {@code assert}
     * keyword this does not depend on the JVM being started with {@code -ea}.
     */
    private static void failUnless(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    /** Shuts the given member down; a {@code null} (never started) member is ignored. */
    private static void shutdownIfStarted(HazelcastInstance instance) {
        if (instance != null) {
            instance.getLifecycleService().shutdown();
        }
    }

    /**
     * Builds the member configuration shared by both instances.
     *
     * @param mergePolicy class name of the merge policy assigned to the map
     * @param mapName     name of the map under test; also used as group name and password
     * @return a new configuration with the common properties, the merge delay
     *         properties (5 and 3 seconds), the map settings and the localhost
     *         TCP/IP network settings applied
     */
    private Config newConfig(String mergePolicy, String mapName) {
        Config config = new Config();
        config.setProperties(this.getCommonProperties());
        config.setProperty(GroupProperties.PROP_MERGE_FIRST_RUN_DELAY_SECONDS, "5");
        config.setProperty(GroupProperties.PROP_MERGE_NEXT_RUN_DELAY_SECONDS, "3");

        MapConfig mapConfig = config.getMapConfig(mapName);
        mapConfig.setMergePolicy(mergePolicy);
        mapConfig.setBackupCount(1);
        mapConfig.setReadBackupData(true);
        mapConfig.setStatisticsEnabled(true);
        mapConfig.setMaxIdleSeconds(0);
        mapConfig.setTimeToLiveSeconds(0);
        // Indexed attributes; note that "displayId", which the test also queries, has no index.
        mapConfig.addMapIndexConfig(new MapIndexConfig("id", false));
        mapConfig.addMapIndexConfig(new MapIndexConfig("number", false));
        mapConfig.addMapIndexConfig(new MapIndexConfig("createdOn", true));

        config.setNetworkConfig(this.getLocalhostTcpIpNetworkConfig(BASE_PORT));
        config.getGroupConfig().setName(mapName);
        config.getGroupConfig().setPassword(mapName);
        return config;
    }

    /**
     * Removes each member's address from the other member's cluster service.
     * The test uses this to force the two members apart. Does nothing when
     * either argument is {@code null}.
     */
    private void closeConnectionBetween(HazelcastInstance h1, HazelcastInstance h2) {
        if (h1 == null || h2 == null) {
            return;
        }
        final Node n1 = TestUtil.getNode(h1);
        final Node n2 = TestUtil.getNode(h2);
        n1.clusterService.removeAddress(n2.address);
        n2.clusterService.removeAddress(n1.address);
    }

    /** Counts down its latch every time a {@code MERGED} lifecycle state is reported. */
    private static class TestLifecycleListener implements LifecycleListener {
        final CountDownLatch latch;

        /** @param countdown number of {@code MERGED} events to wait for */
        TestLifecycleListener(int countdown) {
            latch = new CountDownLatch(countdown);
        }

        @Override
        public void stateChanged(LifecycleEvent event) {
            if (event.getState() == LifecycleEvent.LifecycleState.MERGED) {
                latch.countDown();
            }
        }
    }

    /** Counts down its latch every time a member removal is reported; other events are ignored. */
    private static class TestMembershipListener implements MembershipListener {
        final CountDownLatch latch;

        /** @param countdown number of member removals to wait for */
        TestMembershipListener(int countdown) {
            latch = new CountDownLatch(countdown);
        }

        @Override
        public void memberAdded(MembershipEvent membershipEvent) {
        }

        @Override
        public void memberRemoved(MembershipEvent membershipEvent) {
            latch.countDown();
        }

        @Override
        public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
        }
    }

    /**
     * Runs the scenario without a test runner.
     *
     * @param args ignored
     * @throws CloneNotSupportedException declared by {@link #testBadState()}
     */
    public static void main(String[] args) throws CloneNotSupportedException {
        new BadClusterStateTest().testBadState();
    }

    /**
     * Creates a network configuration with multicast disabled, TCP/IP join
     * enabled with {@code 127.0.0.1} as the only listed member, and the
     * interfaces restricted to {@code 127.0.0.*}.
     *
     * @param port port to set on the network configuration
     * @return the new network configuration
     */
    protected NetworkConfig getLocalhostTcpIpNetworkConfig(int port) {
        NetworkConfig networkConfig = new NetworkConfig();
        networkConfig.setPort(port);
        networkConfig.getJoin().getMulticastConfig().setEnabled(false);

        TcpIpConfig tcpIpConfig = networkConfig.getJoin().getTcpIpConfig();
        tcpIpConfig.setEnabled(true);
        tcpIpConfig.addMember("127.0.0.1");

        InterfacesConfig interfacesConfig = networkConfig.getInterfaces();
        interfacesConfig.setEnabled(true);
        interfacesConfig.setInterfaces(Collections.singleton("127.0.0.*"));
        return networkConfig;
    }

    /**
     * Builds the property set applied to every member: logging type, disabled
     * version check and management center, shortened join/heartbeat/master
     * confirmation/merge-target timings, the local address and a randomly
     * chosen multicast group.
     *
     * @return a new, independent {@link Properties} instance
     */
    protected Properties getCommonProperties() {
        Properties properties = new Properties();
        properties.setProperty(GroupProperties.PROP_LOGGING_TYPE, "slf4j");
        properties.setProperty(GroupProperties.PROP_VERSION_CHECK_ENABLED, "false");
        properties.setProperty("hazelcast.mancenter.enabled", "false");
        properties.setProperty(GroupProperties.PROP_WAIT_SECONDS_BEFORE_JOIN, "1");
        properties.setProperty(GroupProperties.PROP_CONNECT_ALL_WAIT_SECONDS, "5");
        properties.setProperty(GroupProperties.PROP_MAX_NO_HEARTBEAT_SECONDS, "2");
        properties.setProperty(GroupProperties.PROP_HEARTBEAT_INTERVAL_SECONDS, "1");
        properties.setProperty(GroupProperties.PROP_MASTER_CONFIRMATION_INTERVAL_SECONDS, "5");
        properties.setProperty(GroupProperties.PROP_MAX_NO_MASTER_CONFIRMATION_SECONDS, "10");
        properties.setProperty(GroupProperties.PROP_MEMBER_LIST_PUBLISH_INTERVAL_SECONDS, "5");
        properties.setProperty(GroupProperties.PROP_MAX_JOIN_MERGE_TARGET_SECONDS, "10");
        properties.setProperty("hazelcast.local.localAddress", "127.0.0.1");
        properties.setProperty("java.net.preferIPv4Stack", "true");
        properties.setProperty(TestEnvironment.HAZELCAST_TEST_USE_NETWORK, "false");
        // randomize multicast group...
        // (the network config built by getLocalhostTcpIpNetworkConfig disables multicast,
        // so this value only matters if a caller supplies a different network config)
        Random rand = new Random();
        int g1 = rand.nextInt(255);
        int g2 = rand.nextInt(255);
        int g3 = rand.nextInt(255);
        properties.setProperty("hazelcast.multicast.group", "224." + g1 + "." + g2 + "." + g3);
        return properties;
    }
}
import com.hazelcast.core.PartitionAwareKey;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.DataSerializable;
import java.io.IOException;
import java.util.Date;
import java.util.UUID;
public class RealtimeCall implements DataSerializable {
private UUID id;
private UUID clusterUUID;
private Date createdOn = new Date();
private String number;
private long displayId;
public UUID getId() {
return this.id;
}
public void setId(UUID id) {
this.id = id;
}
public UUID getClusterUUID() {
return this.clusterUUID;
}
public void setClusterUUID(UUID clusterUUID) {
this.clusterUUID = clusterUUID;
}
public Date getCreatedOn() {
return this.createdOn;
}
public void setCreatedOn(Date createdOn) {
this.createdOn = createdOn;
}
public String getNumber() {
return this.number;
}
public void setNumber(String number) {
this.number = number;
}
public long getDisplayId() {
return this.displayId;
}
public void setDisplayId(long displayId) {
this.displayId = displayId;
}
public PartitionAwareKey<UUID, UUID> getAffinityKey() {
return new PartitionAwareKey<>(getId(), getClusterUUID());
}
public void writeData(ObjectDataOutput out) throws IOException {
out.writeObject(this.id);
out.writeObject(this.clusterUUID);
out.writeObject(this.createdOn);
out.writeUTF(this.number);
out.writeLong(this.displayId);
}
public void readData(ObjectDataInput in) throws IOException {
this.id = in.readObject();
this.clusterUUID = in.readObject();
this.createdOn = in.readObject();
this.number = in.readUTF();
this.displayId = in.readLong();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment