Skip to content

Instantly share code, notes, and snippets.

@rodolfodpk
Created June 26, 2014 06:37
Show Gist options
  • Save rodolfodpk/7a7ce63ff4f119d153aa to your computer and use it in GitHub Desktop.
Save rodolfodpk/7a7ce63ff4f119d153aa to your computer and use it in GitHub Desktop.
Hz QueueStore receiving duplicates keys for distinct messages
package com.woo
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.config.ClientNetworkConfig;
import com.hazelcast.config.*;
import com.hazelcast.core.*;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.Serializable;
import java.util.*;
public class ClientQueueDuplicateKeys {
HazelcastInstance hz1;
HazelcastInstance hz2;
HazelcastInstance hzClient1;
// HazelcastInstance hzClient2; error when using 2 clients
final List<String> nodes = nodes() ;
final MyQueueStore queueStore = new MyQueueStore();
final String queueName = "queue4test";
final Config config = config();
@Before
public void init() {
hz1 = Hazelcast.newHazelcastInstance(config);
hz2 = Hazelcast.newHazelcastInstance(config);
hzClient1 = HazelcastClient.newHazelcastClient(clientConfig());
// hzClient2 = HazelcastClient.newHazelcastClient();
}
public List<String> nodes() {
List<String> nodes = new ArrayList<String>();
nodes.add("localhost:5701");
nodes.add("localhost:5702");
return nodes;
}
public Config config() {
Config config = new Config();
NetworkConfig network = config.getNetworkConfig();
JoinConfig join = network.getJoin();
join.getMulticastConfig().setEnabled(false);
join.getTcpIpConfig().setMembers(nodes).setEnabled(true).setRequiredMember(null);
config.addQueueConfig(queueConfig());
return config;
}
public QueueConfig queueConfig() {
QueueStoreConfig queueStoreConfig = new QueueStoreConfig();
queueStoreConfig.setEnabled(true);
queueStoreConfig.setProperty("binary", "false");
queueStoreConfig.setProperty("memory-limit", "0");
queueStoreConfig.setProperty("bulk-load", "100");
queueStoreConfig.setStoreImplementation(queueStore);
QueueConfig config = new QueueConfig();
config.setQueueStoreConfig(queueStoreConfig);
config.setName(queueName);
config.setBackupCount(2);
return config;
}
public ClientConfig clientConfig() {
ClientConfig clientConfig = new ClientConfig();
ClientNetworkConfig networkConfig = new ClientNetworkConfig();
networkConfig.setAddresses(nodes);
clientConfig.setNetworkConfig(networkConfig);
return clientConfig;
}
@Test
public void hzNodesShouldShareIdsForQueue() {
IQueue<String> queue = hzClient1.getQueue(queueName);
queue.add("item0");
hz1.getLifecycleService().terminate();
Assert.assertEquals(queue.size(), 1);
Assert.assertEquals(queueStore.list.get(0), new Pair(0L, "item0"));
queue.add("item1");
Assert.assertEquals(queue.size(), 2);
Assert.assertEquals(queueStore.list.get(1), new Pair(1L, "item1"));
queue.add("item2");
Assert.assertEquals(queue.size(), 3);
Assert.assertEquals(queueStore.list.get(2), new Pair(2L, "item2"));
}
static class MyQueueStore implements QueueStore<String>, Serializable {
public final List<Pair> list = new LinkedList<Pair>();
public void store(Long key, String value) {
list.add(new Pair(key, value));
}
public void storeAll(Map map) {
}
public void delete(Long key) {
// dows not matter
}
public void deleteAll(Collection keys) {
//
}
public String load(Long key) {
return null;
}
public Map<Long, String> loadAll(Collection keys) {
return new HashMap<Long, String>();
}
public Set<Long> loadAllKeys() {
return new HashSet<Long>();
}
}
static class Pair implements Serializable {
public Pair(Long id, String message) {
this.id = id;
this.message = message;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Pair pair = (Pair) o;
if (!id.equals(pair.id)) return false;
if (!message.equals(pair.message)) return false;
return true;
}
@Override
public int hashCode() {
int result = id.hashCode();
result = 31 * result + message.hashCode();
return result;
}
Long id;
String message;
}
}
@rodolfodpk
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment