Last active
August 29, 2015 13:56
-
-
Save inexplicable/9165176 to your computer and use it in GitHub Desktop.
An Akka & JeroMQ router that enables messaging between Akka actors and external (non-Akka) clients
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sbt.Resolver | |
// sbt build definition for the Akka + JeroMQ router example.
// Settings are separated by blank lines, which older sbt 0.13.x releases
// require in .sbt files.

name := "akka-jeromq"

version := "1.0"

// Resolve over HTTPS: recent sbt versions refuse plain-HTTP repositories,
// and HTTP downloads can be tampered with in transit.
resolvers in ThisBuild ++= Seq(
  "Typesafe Repository" at "https://repo.typesafe.com/typesafe/releases/",
  Resolver.sonatypeRepo("releases")
)

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.2.3",
  "org.zeromq" % "jeromq" % "0.3.3"
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 *
 * One client two servers (round robin)
 *
 * The cluster master forks the worker(s); each worker opens a ZeroMQ DEALER
 * socket, sends one random number per incoming HTTP request on port 8080,
 * and prints request/reply statistics after 60 seconds.
 *
 */
var cluster = require('cluster'),
    zmq = require('zmq'), // needs the native `zmq` node module to be installed
    port = 'tcp://127.0.0.1:5555';

// dealer = client
if (cluster.isMaster) {
  for (var i = 0; i < 1; i += 1) {
    cluster.fork();
  }
}
else {
  var http = require('http');
  var socket = zmq.socket('dealer');
  socket.identity = 'client' + process.pid;
  socket.connect(port);
  console.log('bound! version:%s', zmq.version);

  var count = 0,    // requests sent
      replies = 0,  // replies received
      total = 0,    // accumulated round-trip time in ms
      pending = []; // send timestamps of requests still awaiting a reply

  http.createServer(function (req, res) {
    var value = Math.floor(Math.random() * 100);
    console.log(socket.identity + ': asking ' + value);
    count += 1;
    pending.push(Date.now());
    // Send an explicit string rather than relying on implicit number conversion.
    socket.send(String(value));
    // The second argument of res.end() is an encoding, not a status code.
    res.statusCode = 200;
    res.end('ok');
  }).listen(8080);

  socket.on('message', function (data) {
    console.log(socket.identity + ': answer data ' + data);
    // Pair the reply with the oldest outstanding request. Replies may arrive
    // out of order, but the SUM of round-trip times does not depend on the
    // pairing as long as every request is answered.
    var sentAt = pending.shift();
    replies += 1;
    if (sentAt !== undefined) {
      total += Date.now() - sentAt;
    }
  });

  setTimeout(function () {
    // Average over the replies actually received; avoid NaN when there were none.
    var avg = replies > 0 ? total / replies : 0;
    console.log('[%d:finished] total messages:%d and replies:%d using total:%dms avg rtt:%dms', process.pid, count, replies, total, avg);
    socket.close();
    process.exit(0);
  }, 60000);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ebay | |
import akka.actor._ | |
import org.zeromq.{ZMsg, ZContext, ZMQ} | |
import java.nio.charset.Charset | |
/**
 * Created by huzhou on 2/4/14.
 */

/** Message that asks [[RouterActor]] to poll its ROUTER socket once. */
case object Recv

/**
 * Actor that owns a ZeroMQ ROUTER socket bound to TCP port 5555 and echoes
 * every received message back through that socket.
 *
 * The actor polls without blocking: each `Recv` performs one `DONTWAIT`
 * receive and then sends `Recv` to itself again, so the polling loop never
 * sleeps once it has been started.
 */
class RouterActor extends Actor {

  val zCtx = new ZContext()
  // Frontend socket talks to clients over TCP
  val router = zCtx.createSocket(ZMQ.ROUTER)
  val utf8 = Charset.forName("utf-8")

  /** Configures the high-water marks and binds the socket. */
  override def preStart(): Unit = {
    router.setRcvHWM(1000000)
    router.setSndHWM(1000000)
    router.bind("tcp://*:5555")
    printf("[router] started on port 5555\n")
  }

  /**
   * Releases the context and the sockets it created. Without this a restarted
   * actor instance could not bind port 5555 again.
   */
  override def postStop(): Unit = zCtx.destroy()

  def receive: Actor.Receive = {
    case Recv =>
      // Depending on the JeroMQ version, a non-blocking receive with nothing
      // pending yields either null or an empty message; Option handles both.
      for {
        message  <- Option(ZMsg.recvMsg(router, ZMQ.DONTWAIT)) if !message.isEmpty
        identity <- Option(message.peek()) if identity.hasData
      } {
        printf("[router][echo] %s\n", new String(identity.getData, utf8))
        // The first frame is still in the message, so sending it back through
        // the ROUTER socket returns it to the originating peer.
        message.send(router)
      }
      self ! Recv
    case _ =>
      self ! Recv //continue
  }
}
/**
 * Application entry point: creates the actor system, starts a
 * [[RouterActor]] and sends it the first `Recv` to start the polling loop.
 *
 * Uses an explicit `main` method instead of the `App` trait to avoid the
 * delayed-initialization pitfalls of `App`.
 */
object RouterMain {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("RouterActor")
    val router = system.actorOf(Props[RouterActor])
    router ! Recv //start everything
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//A general actor trait based on JeroMQ, Akka & FSM

/** States of the socket FSM. */
sealed trait ZSocketState
case object ZSocketUninitialized extends ZSocketState
case object ZSocketActive extends ZSocketState

// Messages accepted while the socket is still being configured.

/** Bind the socket to `address`. */
case class Bind(address: String)

/** Connect the socket to `address`. */
case class Connect(address: String)

/** Receive high-water mark to apply to the socket. */
case class RecvHWM(hwm: Int)

/** Send high-water mark to apply to the socket. */
case class SendHWM(hwm: Int)

/** ZeroMQ socket type constant passed to `createSocket`. */
case class SocketType(`type`: Int)

/** Upper bound, in milliseconds, for the delay between polls. */
case class MaxDelay(delay: Long)

/** Data carried by the socket FSM. */
sealed trait ZSocketData

/** Settings collected before the socket exists. */
case class Settings(
  socketType: SocketType,
  recvHWM: RecvHWM,
  sendHWM: SendHWM,
  maxDelay: Long
) extends ZSocketData

/** The live socket together with the polling delay bound. */
case class Runnings(socket: Socket, maxDelay: Long) extends ZSocketData

/** Poll request; `delay` is the delay (ms) that preceded this poll. */
case class ReceiveAsync(delay: Long)

/** A received or outgoing message: the identity frame plus payload frames. */
case class ZEnvelop(identity: ZFrame, payload: ZFrame*)
/**
 * Akka FSM that wraps a JeroMQ socket.
 *
 * In `ZSocketUninitialized` it collects settings (`SocketType`, `RecvHWM`,
 * `SendHWM`, `MaxDelay`) until a `Bind` or `Connect` creates the socket and
 * moves the FSM to `ZSocketActive`. In the active state it polls the socket
 * without blocking on every `ReceiveAsync`, hands received messages to
 * [[consume]], and forwards `ZEnvelop` messages sent to the actor to
 * [[reply]].
 */
trait ZSocketOnAkka extends Actor with FSM[ZSocketState, ZSocketData] {

  /** Context used to create the socket; the default creates a new one per call. */
  def zContext: ZContext = new ZContext

  /** Called for every message received on the socket. */
  def consume(zEnvelop: ZEnvelop): Unit

  /** Called for every `ZEnvelop` sent to this actor while the socket is active. */
  def reply(zEnvelop: ZEnvelop, zSocket: Socket): Unit

  /**
   * Creates a socket from the collected settings. The socket type is
   * mandatory; high-water marks are applied only when they were configured.
   *
   * @throws IllegalStateException when no `SocketType` was received yet
   */
  private def configuredSocket(settings: Settings): Socket = {
    val socketType = Option(settings.socketType).getOrElse(
      throw new IllegalStateException("SocketType must be set before Bind/Connect"))
    val zSocket = zContext.createSocket(socketType.`type`)
    Option(settings.recvHWM).foreach(limit => zSocket.setRcvHWM(limit.hwm))
    Option(settings.sendHWM).foreach(limit => zSocket.setSndHWM(limit.hwm))
    zSocket
  }

  /** Starts polling and switches to the active state with the live socket. */
  private def activate(zSocket: Socket, settings: Settings): State = {
    self ! ReceiveAsync(settings.maxDelay)
    goto(ZSocketActive) using Runnings(zSocket, settings.maxDelay)
  }

  startWith(ZSocketUninitialized, Settings(null, null, null, -1L))

  when(ZSocketUninitialized) {
    case Event(hwm: RecvHWM, origin: Settings) =>
      stay using origin.copy(recvHWM = hwm)
    case Event(hwm: SendHWM, origin: Settings) =>
      stay using origin.copy(sendHWM = hwm)
    case Event(MaxDelay(delay), origin: Settings) =>
      stay using origin.copy(maxDelay = delay)
    case Event(socketType: SocketType, origin: Settings) =>
      stay using origin.copy(socketType = socketType)
    case Event(Bind(address), origin: Settings) =>
      val zSocket = configuredSocket(origin)
      zSocket.bind(address)
      activate(zSocket, origin)
    case Event(Connect(address), origin: Settings) =>
      val zSocket = configuredSocket(origin)
      zSocket.connect(address)
      activate(zSocket, origin)
  }

  when(ZSocketActive) {
    case Event(ReceiveAsync(delay), Runnings(socket, maxDelay)) =>
      // A non-blocking receive with nothing pending may yield null (or a
      // message without frames), so every step is wrapped in Option instead
      // of being dereferenced directly.
      val received = for {
        message  <- Option(ZMsg.recvMsg(socket, ZMQ.DONTWAIT))
        identity <- Option(message.pop()) if identity.hasData
      } yield ZEnvelop(identity, Option(message.pop()).toSeq: _*)

      received match {
        case Some(envelope) =>
          consume(envelope)
          // More data may be queued: poll again immediately and reset the back-off.
          self ! ReceiveAsync(0L)
        case None =>
          // Exponential back-off: double the previous delay, bounded by
          // maxDelay and never below 1 ms (maxDelay is -1 when unset).
          val nextDelay =
            if (delay <= 0L) 1L else math.max(1L, math.min(delay * 2L, maxDelay))
          // Carry the NEW delay in the message so the back-off actually grows.
          context.system.scheduler.scheduleOnce(
            nextDelay.millis, self, ReceiveAsync(nextDelay))(context.dispatcher)
      }
      stay()

    case Event(msg: ZEnvelop, Runnings(socket, _)) =>
      reply(msg, socket)
      stay()
  }

  // Close the socket when the FSM stops so the endpoint is released.
  onTermination {
    case StopEvent(_, _, Runnings(socket, _)) => socket.close()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment