Created
September 30, 2011 08:02
-
-
Save anonymous/1253041 to your computer and use it in GitHub Desktop.
Multicast based communication in GigaSpaces
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package gs.samples.multicast; | |
import com.gigaspaces.annotation.pojo.SpaceClass; | |
import com.gigaspaces.annotation.pojo.SpaceId; | |
import com.gigaspaces.annotation.pojo.SpaceRouting; | |
@SpaceClass | |
public class Message { | |
private String message; | |
private Integer routingId; | |
private String id; | |
public String getMessage() { | |
return message; | |
} | |
public void setMessage(String message) { | |
this.message = message; | |
} | |
@SpaceRouting | |
public Integer getRoutingId() { | |
return routingId; | |
} | |
public void setRoutingId(Integer routingId) { | |
this.routingId = routingId; | |
} | |
@SpaceId(autoGenerate=true) | |
public String getId() { | |
return id; | |
} | |
public void setId(String id) { | |
this.id = id; | |
} | |
@Override | |
public int hashCode() { | |
final int prime = 31; | |
int result = 1; | |
result = prime * result + ((id == null) ? 0 : id.hashCode()); | |
return result; | |
} | |
@Override | |
public boolean equals(Object obj) { | |
if (this == obj) | |
return true; | |
if (obj == null) | |
return false; | |
if (getClass() != obj.getClass()) | |
return false; | |
Message other = (Message) obj; | |
if (id == null) { | |
if (other.id != null) | |
return false; | |
} else if (!id.equals(other.id)) | |
return false; | |
return true; | |
} | |
@Override | |
public String toString() { | |
return "Message [message=" + message + ", routingId=" + routingId + ", id=" + id + "]"; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package gs.samples.multicast; | |
import static org.junit.Assert.assertNotNull; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.TimeUnit; | |
import org.junit.After; | |
import org.junit.Before; | |
import org.junit.Test; | |
import org.openspaces.core.GigaSpace; | |
import org.openspaces.core.GigaSpaceConfigurer; | |
import org.openspaces.core.cluster.ClusterInfo; | |
import org.openspaces.core.space.UrlSpaceConfigurer; | |
import org.openspaces.events.SpaceDataEventListener; | |
import org.openspaces.events.notify.AbstractNotifyEventListenerContainer; | |
import org.openspaces.events.notify.SimpleNotifyContainerConfigurer; | |
import org.springframework.transaction.TransactionStatus; | |
public class MulticastBasedNotificationTest { | |
private static final String SPACE_NAME = "multicast-test-space"; | |
private static final String GROUP_NAME = "multicast-test-group"; | |
private UrlSpaceConfigurer partition1Configurer; | |
private UrlSpaceConfigurer partition2Configurer; | |
private GigaSpace clusteredProxy; | |
private BlockingQueue<Message> receivedMessages; | |
@Before | |
public void setup() { | |
partition1Configurer = createSpacePartition(1, 2); | |
partition2Configurer = createSpacePartition(2, 2); | |
clusteredProxy = createClusteredProxy(); | |
receivedMessages = new LinkedBlockingQueue<Message>(); | |
} | |
@After | |
public void destroySpace() throws Exception { | |
partition1Configurer.destroy(); | |
partition2Configurer.destroy(); | |
} | |
@Test | |
public void unicastBasedNotificationTest() throws Exception { | |
new SimpleNotifyContainerConfigurer(clusteredProxy) | |
.template(new Message()) | |
.notifyWrite(true) | |
.replicateNotifyTemplate(false) | |
.comType(AbstractNotifyEventListenerContainer.COM_TYPE_UNICAST) | |
.eventListener(new SpaceDataEventListener<Message>() { | |
@Override | |
public void onEvent(Message data, GigaSpace gigaSpace, | |
TransactionStatus txStatus, Object source) { | |
receivedMessages.add(data); | |
} | |
}).notifyContainer(); | |
clusteredProxy.write(messageWithRoutingId(0)); | |
clusteredProxy.write(messageWithRoutingId(1)); | |
assertNotNull("expected to receive a message", receivedMessages.poll(1, TimeUnit.SECONDS)); | |
} | |
@Test | |
public void multicastBasedNotificationTest() throws Exception { | |
new SimpleNotifyContainerConfigurer(clusteredProxy) | |
.template(new Message()) | |
.notifyWrite(true) | |
.replicateNotifyTemplate(false) | |
.comType(AbstractNotifyEventListenerContainer.COM_TYPE_MULTICAST) | |
.eventListener(new SpaceDataEventListener<Message>() { | |
@Override | |
public void onEvent(Message data, GigaSpace gigaSpace, | |
TransactionStatus txStatus, Object source) { | |
receivedMessages.add(data); | |
} | |
}).notifyContainer(); | |
clusteredProxy.write(messageWithRoutingId(0)); | |
clusteredProxy.write(messageWithRoutingId(1)); | |
assertNotNull("expected to receive a message", receivedMessages.poll(1, TimeUnit.SECONDS)); | |
} | |
private UrlSpaceConfigurer createSpacePartition(int instanceId, int numberOfPartitions) { | |
UrlSpaceConfigurer configurer = new UrlSpaceConfigurer("/./" + SPACE_NAME); | |
configurer.addProperty("space-config.workers.MulticastNotifyWorker.enabled", "true"); | |
configurer.lookupGroups(GROUP_NAME); | |
configurer.lookupTimeout(1); | |
configurer.clusterInfo(clusterInfoFor(instanceId, numberOfPartitions)); | |
configurer.create(); | |
return configurer; | |
} | |
private static ClusterInfo clusterInfoFor(int instanceId, int numberOfPartitions) { | |
ClusterInfo clusterInfo = new ClusterInfo(); | |
clusterInfo.setSchema("partitioned-sync2backup"); | |
clusterInfo.setInstanceId(instanceId); | |
clusterInfo.setBackupId(null); | |
clusterInfo.setNumberOfBackups(0); | |
clusterInfo.setNumberOfInstances(numberOfPartitions); | |
return clusterInfo; | |
} | |
private GigaSpace createClusteredProxy() { | |
String url = "jini://*/*/" + SPACE_NAME + "?groups=" + GROUP_NAME; | |
return new GigaSpaceConfigurer(new UrlSpaceConfigurer(url)).gigaSpace(); | |
} | |
private Message messageWithRoutingId(int routingId) { | |
Message message = new Message(); | |
message.setRoutingId(routingId); | |
message.setMessage("foo"); | |
return message; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment