Skip to content

Instantly share code, notes, and snippets.

Created September 30, 2011 08:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/1253041 to your computer and use it in GitHub Desktop.
Save anonymous/1253041 to your computer and use it in GitHub Desktop.
Multicast-based communication in GigaSpaces
package gs.samples.multicast;
import com.gigaspaces.annotation.pojo.SpaceClass;
import com.gigaspaces.annotation.pojo.SpaceId;
import com.gigaspaces.annotation.pojo.SpaceRouting;
/**
 * Space entry holding a text payload, a routing value and an identifier.
 *
 * <p>The routing value is exposed through the getter annotated with
 * {@code @SpaceRouting}; the identifier through the getter annotated with
 * {@code @SpaceId(autoGenerate=true)}.
 *
 * <p>{@link #equals(Object)} and {@link #hashCode()} consider {@code id} only;
 * the payload and the routing value take no part in identity.
 */
@SpaceClass
public class Message {

    private String message;
    private Integer routingId;
    private String id;

    /** @return the text payload, possibly {@code null} */
    public String getMessage() {
        return this.message;
    }

    /** @param message the text payload to carry */
    public void setMessage(String message) {
        this.message = message;
    }

    /** @return the routing value, possibly {@code null} */
    @SpaceRouting
    public Integer getRoutingId() {
        return this.routingId;
    }

    /** @param routingId the routing value to use for this entry */
    public void setRoutingId(Integer routingId) {
        this.routingId = routingId;
    }

    /** @return the identifier, or {@code null} while none has been assigned */
    @SpaceId(autoGenerate=true)
    public String getId() {
        return this.id;
    }

    /** @param id the identifier of this entry */
    public void setId(String id) {
        this.id = id;
    }

    /**
     * Hashes the identifier only, so the result stays consistent with
     * {@link #equals(Object)}; a missing identifier contributes zero.
     */
    @Override
    public int hashCode() {
        int idHash = 0;
        if (id != null) {
            idHash = id.hashCode();
        }
        return 31 + idHash;
    }

    /**
     * Two messages are equal when they are of exactly the same class and
     * their identifiers are both {@code null} or equal to each other.
     */
    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }
        final String otherId = ((Message) obj).id;
        return (id == null) ? (otherId == null) : id.equals(otherId);
    }

    /** Renders all three properties for diagnostics. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Message [message=");
        text.append(message);
        text.append(", routingId=").append(routingId);
        text.append(", id=").append(id);
        text.append("]");
        return text.toString();
    }
}
package gs.samples.multicast;
import static org.junit.Assert.assertNotNull;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.openspaces.core.GigaSpace;
import org.openspaces.core.GigaSpaceConfigurer;
import org.openspaces.core.cluster.ClusterInfo;
import org.openspaces.core.space.UrlSpaceConfigurer;
import org.openspaces.events.SpaceDataEventListener;
import org.openspaces.events.notify.AbstractNotifyEventListenerContainer;
import org.openspaces.events.notify.SimpleNotifyContainerConfigurer;
import org.springframework.transaction.TransactionStatus;
/**
 * Starts two space partitions plus a clustered proxy and checks that a notify
 * container registered through that proxy receives at least one write
 * notification, once using unicast and once using multicast communication.
 *
 * <p>Every resource a test opens (both partitions, the proxy's configurer and
 * the notify container) is tracked in a field and released in
 * {@link #destroySpace()}, so one test cannot leak listeners or proxies into
 * the next.
 */
public class MulticastBasedNotificationTest {

    private static final String SPACE_NAME = "multicast-test-space";
    private static final String GROUP_NAME = "multicast-test-group";

    /** How long a test waits for the first notification before failing. */
    private static final long NOTIFICATION_TIMEOUT_SECONDS = 1;

    private UrlSpaceConfigurer partition1Configurer;
    private UrlSpaceConfigurer partition2Configurer;
    /** Configurer behind {@link #clusteredProxy}; kept so it can be destroyed. */
    private UrlSpaceConfigurer clusteredProxyConfigurer;
    private GigaSpace clusteredProxy;
    /** Container started by the running test, or {@code null} if none was started. */
    private AbstractNotifyEventListenerContainer notifyContainer;
    private BlockingQueue<Message> receivedMessages;

    /** Creates both partitions, the clustered proxy and an empty result queue. */
    @Before
    public void setup() {
        partition1Configurer = createSpacePartition(1, 2);
        partition2Configurer = createSpacePartition(2, 2);
        clusteredProxy = createClusteredProxy();
        receivedMessages = new LinkedBlockingQueue<Message>();
    }

    /**
     * Releases everything the test opened, consumers first: the notify
     * container, then the clustered proxy, then the partitions.
     *
     * <p>Each step is null-safe because JUnit runs this method even when
     * {@link #setup()} failed part-way, and the steps are chained with
     * {@code finally} so a failure in one does not skip the remaining ones.
     *
     * @throws Exception if destroying any of the resources fails
     */
    @After
    public void destroySpace() throws Exception {
        try {
            if (notifyContainer != null) {
                notifyContainer.destroy();
            }
        } finally {
            try {
                destroyIfCreated(clusteredProxyConfigurer);
            } finally {
                try {
                    destroyIfCreated(partition1Configurer);
                } finally {
                    destroyIfCreated(partition2Configurer);
                }
            }
        }
    }

    /** Expects a write notification when the container uses unicast communication. */
    @Test
    public void unicastBasedNotificationTest() throws Exception {
        assertWriteNotificationReceived(AbstractNotifyEventListenerContainer.COM_TYPE_UNICAST);
    }

    /** Expects a write notification when the container uses multicast communication. */
    @Test
    public void multicastBasedNotificationTest() throws Exception {
        assertWriteNotificationReceived(AbstractNotifyEventListenerContainer.COM_TYPE_MULTICAST);
    }

    /**
     * Registers a notify container with the given communication type, writes
     * one message per routing id (0 and 1) and asserts that at least one
     * notification arrives within {@link #NOTIFICATION_TIMEOUT_SECONDS}.
     *
     * @param comType one of the {@code COM_TYPE_*} constants of
     *                {@link AbstractNotifyEventListenerContainer}
     * @throws InterruptedException if interrupted while waiting for a notification
     */
    private void assertWriteNotificationReceived(int comType) throws InterruptedException {
        startNotifyContainer(comType);
        clusteredProxy.write(messageWithRoutingId(0));
        clusteredProxy.write(messageWithRoutingId(1));
        assertNotNull("expected to receive a message",
                receivedMessages.poll(NOTIFICATION_TIMEOUT_SECONDS, TimeUnit.SECONDS));
    }

    /**
     * Starts a notify container on the clustered proxy that matches any
     * {@link Message} write and queues each received message. The container is
     * stored in {@link #notifyContainer} so that teardown can destroy it.
     *
     * @param comType communication type passed to the container configurer
     */
    private void startNotifyContainer(int comType) {
        notifyContainer = new SimpleNotifyContainerConfigurer(clusteredProxy)
                .template(new Message())
                .notifyWrite(true)
                .replicateNotifyTemplate(false)
                .comType(comType)
                .eventListener(new SpaceDataEventListener<Message>() {
                    @Override
                    public void onEvent(Message data, GigaSpace gigaSpace,
                            TransactionStatus txStatus, Object source) {
                        receivedMessages.add(data);
                    }
                })
                .notifyContainer();
    }

    /**
     * Creates one partition of the test space with the multicast notify
     * worker enabled.
     *
     * @param instanceId         instance id of this partition within the cluster
     * @param numberOfPartitions total number of partitions in the cluster
     * @return the configurer owning the created space; the caller must destroy it
     */
    private UrlSpaceConfigurer createSpacePartition(int instanceId, int numberOfPartitions) {
        UrlSpaceConfigurer configurer = new UrlSpaceConfigurer("/./" + SPACE_NAME);
        configurer.addProperty("space-config.workers.MulticastNotifyWorker.enabled", "true");
        configurer.lookupGroups(GROUP_NAME);
        configurer.lookupTimeout(1);
        configurer.clusterInfo(clusterInfoFor(instanceId, numberOfPartitions));
        configurer.create();
        return configurer;
    }

    /**
     * Builds the cluster description for one partition: the
     * {@code partitioned-sync2backup} schema with no backups.
     */
    private static ClusterInfo clusterInfoFor(int instanceId, int numberOfPartitions) {
        ClusterInfo clusterInfo = new ClusterInfo();
        clusterInfo.setSchema("partitioned-sync2backup");
        clusterInfo.setInstanceId(instanceId);
        clusterInfo.setBackupId(null);
        clusterInfo.setNumberOfBackups(0);
        clusterInfo.setNumberOfInstances(numberOfPartitions);
        return clusterInfo;
    }

    /**
     * Looks the test space up through a {@code jini://} URL restricted to the
     * test lookup group and wraps it in a {@link GigaSpace}. The underlying
     * configurer is remembered in {@link #clusteredProxyConfigurer} for teardown.
     */
    private GigaSpace createClusteredProxy() {
        String url = "jini://*/*/" + SPACE_NAME + "?groups=" + GROUP_NAME;
        clusteredProxyConfigurer = new UrlSpaceConfigurer(url);
        return new GigaSpaceConfigurer(clusteredProxyConfigurer).gigaSpace();
    }

    /** Builds a message with the fixed payload {@code "foo"} and the given routing id. */
    private Message messageWithRoutingId(int routingId) {
        Message message = new Message();
        message.setRoutingId(routingId);
        message.setMessage("foo");
        return message;
    }

    /** Destroys the configurer unless it was never created. */
    private static void destroyIfCreated(UrlSpaceConfigurer configurer) throws Exception {
        if (configurer != null) {
            configurer.destroy();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment