Skip to content

Instantly share code, notes, and snippets.

@bsbodden
Created April 3, 2015 13:49
Show Gist options
  • Save bsbodden/fe142e3773a53bcff5d4 to your computer and use it in GitHub Desktop.
Save bsbodden/fe142e3773a53bcff5d4 to your computer and use it in GitHub Desktop.
package io.integrallis.atmosphere
import io.dropwizard.lifecycle.Managed
import java.io.BufferedReader
import java.io.InputStreamReader
import java.net.InetAddress
import java.util.List
import java.util.Set
import org.jgroups.JChannel
import org.jgroups.Message
import org.jgroups.PhysicalAddress
import org.jgroups.ReceiverAdapter
import org.jgroups.View
import org.jgroups.protocols.TCPPING
import org.jgroups.stack.IpAddress
import org.jgroups.ReceiverAdapter
import com.datastax.driver.core.Cluster
import com.datastax.driver.core.Host
import com.datastax.driver.core.Metadata
import org.atmosphere.cpr.Broadcaster
import org.atmosphere.cpr.BroadcasterFactory
import io.binnacle.models.BinnacleEvent
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.ObjectWriter
class AtmosphereClusteringService extends ReceiverAdapter implements Managed {
static String DEFAULT_JGROUPS_XML = "jgroups_config.xml"
static String CLUSTER_NAME = "binnacle"
com.datastax.driver.core.Cluster cluster
JChannel channel
AtmosphereClusteringService(com.datastax.driver.core.Cluster cluster) {
this.cluster = cluster
}
@Override
public void start() throws Exception {
println("STARTING AtmosphereClusteringService...")
channel = new JChannel("tcp.xml");
TCPPING ping = (TCPPING)channel.getProtocolStack().findProtocol(TCPPING.class)
List<PhysicalAddress> initialHosts = ping.getInitialHosts()
println("Original Initial Hosts...")
for (PhysicalAddress pa : initialHosts) {
IpAddress address = (IpAddress)pa
println("Address ==> " + address)
}
// Get list of nodes from Cassandra
Set<Host> cassNodes = cluster.getMetadata().getAllHosts()
for (Host host: cassNodes) {
InetAddress address = host.getAddress()
System.out.println("Address ==> " + address)
IpAddress host7800 = new IpAddress(address, 7800)
IpAddress host7801 = new IpAddress(address, 7801)
initialHosts.add(host7800)
initialHosts.add(host7801)
}
List<PhysicalAddress> modInitialHosts = ping.getInitialHosts()
System.out.println("Modified Initial Hosts...")
for (PhysicalAddress pa : modInitialHosts) {
IpAddress address = (IpAddress)pa
System.out.println("Address ==> " + address)
}
channel.setReceiver(this)
channel.setDiscardOwnMessages(true)
channel.connect(CLUSTER_NAME)
}
@Override
public void stop() throws Exception {
println("STOPPING AtmosphereClusteringService...")
channel.close()
}
@Override
public void viewAccepted(View new_view) {
println("** view: " + new_view);
}
@Override
public void receive(Message msg) {
println("IN AtmosphereClusteringService MSG Class ==> " + msg.getClass().getCanonicalName())
println(msg.getObject())
ObjectMapper mapper = new ObjectMapper()
BinnacleEvent event = mapper.readValue(msg.getObject(), BinnacleEvent.class)
String contextPushChannel = "${event.accountId}/${event.appId}/${event.contextId}"
String appPushChannel = "${event.accountId}/${event.appId}"
// Broadcast to context listeners
Broadcaster contextBroadcaster = BroadcasterFactory.getDefault().lookup( contextPushChannel, true )
if( contextBroadcaster != null ) {
contextBroadcaster.broadcast( msg.getObject() )
}
// Broadcast to application listeners
Broadcaster appBroadcaster = BroadcasterFactory.getDefault().lookup( appPushChannel, true )
if( appBroadcaster != null ) {
appBroadcaster.broadcast( msg.getObject() )
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment