Skip to content

Instantly share code, notes, and snippets.

@windymelt
Created June 6, 2015 10:10
Show Gist options
  • Save windymelt/596d46ce0db27f8a2b44 to your computer and use it in GitHub Desktop.
Save windymelt/596d46ce0db27f8a2b44 to your computer and use it in GitHub Desktop.
Akka ZeroMQ Extension Example
name := "Exercise Akka ZeroMQ"
version := "0.1-SNAPSHOT"
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.2.5",
"com.typesafe.akka" %% "akka-zeromq" % "2.2.5"
)
package momijikawa.exercisezmq
import scala.concurrent.ExecutionContext
object Main extends App {
import akka.actor._
import akka.zeromq._
import akka.util.ByteString
import concurrent.duration._
import collection.immutable.Seq
override def main(args: Array[String]): Unit = {
val system = ActorSystem("Exercise-ZMQ-Server")
implicit def execContext: ExecutionContext = system.dispatcher
val schedule_initialDuration = 1 second
val schedule_interval = 2 seconds
val topic: String = "ping"
val message = "PINGING MESSAGE"
val pubSocket = ZeroMQExtension(system).newSocket(SocketType.Pub,
Bind("tcp://127.0.0.1:21231"))
println("test start on\ntcp://127.0.0.1:21231")
system.scheduler.schedule(schedule_initialDuration, schedule_interval, pubSocket,
ZMQMessage(
Seq(
ByteString(topic), // 第一フレーム目にtopicを指定する
ByteString(message))))
}
}
package momijikawa.exercisezmq
object Main extends App {
import akka.actor._
import akka.zeromq._
override def main(args: Array[String]): Unit = {
val system = ActorSystem("Exercise-ZMQ")
val subscribingTopic = "ping"
class Listener extends Actor {
def receive: Receive = {
case Connecting =>
println("Connecting..")
case Closed =>
println("Closed")
case m: ZMQMessage =>
println(s"ZMQMessage on topic [${m.frame(0).decodeString("UTF-8")}]: ${m.frame(1).decodeString("UTF-8")}")
case unknown =>
println("Unknown message: " + unknown)
}
}
val listener = system.actorOf(Props(classOf[Listener]))
val reqSocket = ZeroMQExtension(system).newSubSocket(
Array(Listener(listener), Connect("tcp://127.0.0.1:21231")))
reqSocket ! Subscribe(subscribingTopic)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment