Skip to content

Instantly share code, notes, and snippets.

@chrisdinn
Forked from ANorwell/pushpull.scala
Created July 27, 2012 15:21
Show Gist options
  • Save chrisdinn/3188627 to your computer and use it in GitHub Desktop.
Save chrisdinn/3188627 to your computer and use it in GitHub Desktop.
ZMQ Push/Pull needs sleep
.DS_Store
project/target
target
.idea
.idea_modules
*.log
name := "push-pull-test"
organization := "com.mdialog"
version := "0.0.1"
scalaVersion := "2.9.1"
scalacOptions ++= Seq("-unchecked", "-deprecation")
libraryDependencies ++= Seq(
"com.typesafe.akka" % "akka-actor" % "2.0.1",
"com.typesafe.akka" % "akka-zeromq" % "2.0.1",
"com.typesafe.akka" % "akka-testkit" % "2.0.1" % "test"
)
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
package com.test
import util.Random
import akka.actor._
import akka.zeromq._
class PullTestActor(val address: String) extends Actor {
val pullSocket = context.system.newSocket(SocketType.Pull,
Bind(address), Listener(self))
def receive = {
case m: ZMQMessage ⇒ println("Pulled: " + m.firstFrameAsString)
case x ⇒ println("Got default case: " + x)
}
}
object TestPushPull extends App {
implicit val system = ActorSystem("zmqtest")
val address = "tcp://0.0.0.0:%s" format {
val s = new java.net.ServerSocket(0)
try s.getLocalPort finally s.close()
}
println("Address " + address)
val pull = system.actorOf(Props(new PullTestActor(address)), "PullTestActor")
val pushSocket = system.newSocket(SocketType.Push, Connect(address))
//Try adding/removing this line
Thread.sleep(25)
1 until 1000 foreach { n ⇒
println("Pushing msg " + n)
pushSocket ! ZMQMessage(Seq(Frame("msg " + n)))
Thread.sleep(1000)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment