Skip to content

Instantly share code, notes, and snippets.

@viktorklang
Created October 18, 2011 11:52
Show Gist options
  • Save viktorklang/1295252 to your computer and use it in GitHub Desktop.
Save viktorklang/1295252 to your computer and use it in GitHub Desktop.
0mq
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.zeromq
import akka.actor.{Actor, ActorRef}
import akka.dispatch.MessageDispatcher
import akka.util.Duration
import akka.zeromq.SocketType._
import java.util.concurrent.atomic.AtomicReference
import org.zeromq.ZMQ.{Socket, Poller}
import org.zeromq.{ZMQ => JZMQ}
import scala.annotation.tailrec
/**
 * Actor wrapping a single ZeroMQ socket: it executes connect/bind/send/subscribe
 * requests sent to it as messages and, after most requests and on every
 * `ReceiveTimeout`, polls the socket and forwards any received message to the
 * optional listener.
 *
 * @param params         supplies the ZeroMQ context, the socket type, the optional listener,
 *                       the deserializer applied to received frames and the poll timeout
 * @param dispatcher     dispatcher assigned to this actor
 * @param pollIntervalMs value assigned to the actor's receive timeout; each resulting
 *                       `ReceiveTimeout` message triggers a poll of the socket
 */
private[zeromq] class ConcurrentSocketActor(params: SocketParameters, dispatcher: MessageDispatcher, pollIntervalMs: Long) extends Actor {
  // Shared sentinel meaning "nothing (more) to read"; always compared by reference.
  private val noBytes = Array[Byte]()
  private val socket: Socket = params.context.socket(params.socketType)
  private val poller: Poller = params.context.poller

  self.dispatcher = dispatcher
  self.receiveTimeout = pollIntervalMs
  poller.register(socket, Poller.POLLIN)

  /** Unregisters and closes the socket, then tells the listener that it is closed. */
  override def postStop {
    // Close the socket even if unregistering fails, so the native handle is not leaked.
    try poller.unregister(socket) finally socket.close
    notifyListener(Closed)
  }

  /**
   * Forwards `msg` to the configured listener, if there is one.
   * When the listener reports that it is shut down, this actor stops itself instead.
   *
   * @param msg the notification to deliver
   */
  def notifyListener(msg: Any) {
    params.listener match {
      case Some(x) if x.isShutdown => self.stop // no point in continuing if the listener is down
      case Some(x)                 => x ! msg
      case None                    => // no listener configured: the notification is dropped
    }
  }

  override def receive: Receive = {
    case Connect(endpoint)   => socket.connect(endpoint); notifyListener(Connecting)
    case Bind(endpoint)      => socket.bind(endpoint)
    case Close               => self.stop
    case Send(frames)        => sendFrames(frames); receiveFrames()
    case Subscribe(topic)    => socket.subscribe(topic.toArray); receiveFrames()
    case Unsubscribe(topic)  => socket.unsubscribe(topic.toArray); receiveFrames()
    case ZMQMessage(frames)  => sendFrames(frames); receiveFrames()
    case ReceiveTimeout      => receiveFrames()
  }

  /**
   * Writes `frames` to the socket as one multipart message: every frame except
   * the last is sent with `SNDMORE`, the last with flags `0`.
   * An empty sequence sends nothing.
   *
   * NOTE(review): assumes the `frames` carried by `Send` and `ZMQMessage` conform to
   * `Seq[Frame]` - confirm against their declarations.
   */
  private def sendFrames(frames: Seq[Frame]) {
    val lastIndex = frames.length - 1
    for (i <- 0 until frames.length)
      socket.send(frames(i).payload.toArray, if (i < lastIndex) JZMQ.SNDMORE else 0)
  }

  /**
   * Polls the socket and, when input is pending, reads one complete multipart
   * message, hands the deserialized result to the listener and returns the frames.
   *
   * The socket is read only after the poller has reported pending input, so the
   * `recv` calls below are not issued against an idle socket.
   *
   * @return the frames read by this call, or an empty sequence when nothing was pending
   */
  def receiveFrames(): Seq[Frame] = {
    // Reads one frame; `null` and zero-length results are both mapped to the sentinel.
    // NOTE(review): this means a zero-length frame ends the message early - confirm
    // that empty delimiter frames are never expected on this socket.
    def receiveBytes(): Array[Byte] = socket.recv(0) match {
      case null                                   => noBytes
      case bytes: Array[Byte] if bytes.length > 0 => bytes
      case _                                      => noBytes
    }

    // Accumulates frames until the sentinel is seen or the socket reports no more parts.
    @tailrec def drain(bytes: Array[Byte], acc: Vector[Frame]): Vector[Frame] =
      if (bytes eq noBytes) acc
      else {
        val following = if (socket.hasReceiveMore) receiveBytes() else noBytes
        drain(following, acc :+ Frame(bytes))
      }

    // NOTE(review): the timeout is passed in milliseconds - confirm the unit expected by
    // Poller.poll in the bound jzmq version. Index 0 assumes this socket is the poller's
    // first registration (the only one made in this class).
    if (poller.poll(params.pollTimeoutDuration.toMillis) > 0 && poller.pollin(0)) {
      val frames = drain(receiveBytes(), Vector.empty[Frame])
      if (frames.nonEmpty) notifyListener(params.deserializer(frames))
      frames
    } else Vector.empty[Frame]
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment