Skip to content

Instantly share code, notes, and snippets.

@inexplicable
Last active August 29, 2015 13:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save inexplicable/9165176 to your computer and use it in GitHub Desktop.
Save inexplicable/9165176 to your computer and use it in GitHub Desktop.
Akka & JeroMQ's router to enable messaging across Akka & External
import sbt.Resolver
name := "akka-jeromq"
version := "1.0"
resolvers in ThisBuild ++= Seq(
"Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
Resolver.sonatypeRepo("releases")
)
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.2.3",
"org.zeromq" % "jeromq" % "0.3.3"
)
/*
*
* One client two servers (round roobin)
*
*/
var cluster = require('cluster'),
zmq = require('zmq'),//needs to install zmq.node
port = 'tcp://127.0.0.1:5555';
//dealer = client
if(cluster.isMaster){
for(var i = 0; i < 1; i += 1){
cluster.fork();
}
}
else{
var http = require('http');
var socket = zmq.socket('dealer');
socket.identity = 'client' + process.pid;
socket.connect(port);
console.log('bound! version:%s', zmq.version);
var count = 0,
replies = 0,
total = 0,
before = Date.now();
http.createServer(function(req, res){
var value = Math.floor(Math.random()*100);
console.log(socket.identity + ': asking ' + value);
count += 1;
socket.send(value);
res.end('ok', 200);
}).listen(8080);
socket.on('message', function(data) {
console.log(socket.identity + ': answer data ' + data);
var now = Date.now()
replies += 1;
total += now - before;
before = now;
});
setTimeout(function(){
console.log('[%d:finished] total messages:%d and replies:%d using total:%dms avg rtt:%dms', process.pid, count, replies, total, total / count)
socket.close();
process.exit(0);
}, 60000);
}
package com.ebay
import akka.actor._
import org.zeromq.{ZMsg, ZContext, ZMQ}
import java.nio.charset.Charset
/**
* Created by huzhou on 2/4/14.
*/
case object Recv
class RouterActor extends Actor {
val zCtx = new ZContext()
// Frontend socket talks to clients over TCP
val router = zCtx.createSocket(ZMQ.ROUTER)
val utf8 = Charset.forName("utf-8")
override def preStart = {
router.setRcvHWM(1000000)
router.setSndHWM(1000000)
router.bind("tcp://*:5555")
printf("[router] started on port 5555\n")
}
def receive:Actor.Receive = {
case Recv =>
val attempt = ZMsg.recvMsg(router, ZMQ.DONTWAIT)
if(!attempt.isEmpty){
val identity = attempt.peek
if(identity.hasData){
printf("[router][echo] %s\n", new String(identity.getData, utf8))
attempt.send(router)
}
}
self ! Recv
case _ =>
self ! Recv //continue
}
}
object RouterMain extends App {
val system = ActorSystem("RouterActor")
val router = system.actorOf(Props[RouterActor])
router ! Recv //start everything
}
//A general actor trait based on JeroMQ, Akka & FSM
sealed trait ZSocketState
case object ZSocketUninitialized extends ZSocketState
case object ZSocketActive extends ZSocketState
case class Bind(val address:String)
case class Connect(val address:String)
case class RecvHWM(val hwm:Int)
case class SendHWM(val hwm:Int)
case class SocketType(val `type`:Int)
case class MaxDelay(val delay:Long)
sealed trait ZSocketData
case class Settings(val socketType:SocketType, val recvHWM:RecvHWM, val sendHWM:SendHWM, val maxDelay:Long) extends ZSocketData
case class Runnings(val socket:Socket, val maxDelay:Long) extends ZSocketData
case class ReceiveAsync(val delay:Long)
case class ZEnvelop(val identity:ZFrame, val payload:ZFrame*)
trait ZSocketOnAkka extends Actor with FSM[ZSocketState, ZSocketData]{
def zContext:ZContext = new ZContext
def consume(zEnvelop:ZEnvelop)
def reply(zEnvelop:ZEnvelop, zSocket:Socket)
startWith(ZSocketUninitialized, Settings(null, null, null, -1L))
when(ZSocketUninitialized){
case Event(hwm:RecvHWM, origin:Settings) =>
stay using(Settings(origin.socketType, hwm, origin.sendHWM, origin.maxDelay))
case Event(hwm:SendHWM, origin:Settings) =>
stay using(Settings(origin.socketType, origin.recvHWM, hwm, origin.maxDelay))
case Event(MaxDelay(delay), origin:Settings) =>
stay using(Settings(origin.socketType, origin.recvHWM, origin.sendHWM, delay))
case Event(socketType:SocketType, origin:Settings) =>
stay using(Settings(socketType, origin.recvHWM, origin.sendHWM, origin.maxDelay))
case Event(bind:Bind, origin:Settings) =>
val zSocket = zContext.createSocket(origin.socketType.`type`)
zSocket.setRcvHWM(origin.recvHWM.hwm)
zSocket.setSndHWM(origin.sendHWM.hwm)
zSocket.bind(bind.address)
self ! ReceiveAsync(origin.maxDelay)
goto(ZSocketActive) using Runnings(zSocket, origin.maxDelay)
case Event(connect:Connect, origin:Settings) =>
val zSocket = zContext.createSocket(origin.socketType.`type`)
zSocket.setRcvHWM(origin.recvHWM.hwm)
zSocket.setSndHWM(origin.sendHWM.hwm)
zSocket.connect(connect.address)
self ! ReceiveAsync(origin.maxDelay)
goto(ZSocketActive) using Runnings(zSocket, origin.maxDelay)
}
when(ZSocketActive){
case Event(msg @ ReceiveAsync(delay), rt @ Runnings(socket, maxDelay)) =>
val zMessage = ZMsg.recvMsg(socket, ZMQ.DONTWAIT)
val identity = zMessage.pop
if(identity.hasData){
consume(ZEnvelop(identity, zMessage.pop))
self ! ReceiveAsync(0L)
stay using rt
}
else{
context.system.scheduler.scheduleOnce((if(delay == 0L) 1L else math.min(delay * 2L, maxDelay)) millis, self, msg)
stay using rt
}
case Event(msg:ZEnvelop, rt @ Runnings(socket, maxDelay)) =>
reply(msg, socket)
stay using rt
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment