Skip to content

Instantly share code, notes, and snippets.

@sstone
Created August 29, 2014 11:11
Show Gist options
  • Save sstone/1a5a9b404983a31fc18d to your computer and use it in GitHub Desktop.
Save sstone/1a5a9b404983a31fc18d to your computer and use it in GitHub Desktop.
package com.github.sstone.amqp
import java.util.concurrent.TimeUnit
import akka.actor._
import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router}
import akka.util.Timeout
import com.github.sstone.amqp.Amqp._
import com.rabbitmq.client.ConnectionFactory
import scala.concurrent.duration._
object Test1 extends App {
val system = ActorSystem("mySystem")
implicit val timeout = Timeout(5 seconds)
val connFactory = new ConnectionFactory()
val uri = system.settings.config.getString("amqp-client-test.rabbitmq.uri")
connFactory.setUri(uri)
val conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second))
val producer = ConnectionOwner.createChildActor(conn, ChannelOwner.props())
waitForConnection(system, conn, producer).await(5, TimeUnit.SECONDS)
val body = "test".getBytes("UTF-8")
val start = System.nanoTime()
val count = 10000
for (i <- 0 to count) {
producer ! Publish("", "my_queue", body)
}
val end = System.nanoTime()
println(s"sending $count message in ${(end - start) / 1000000} ms")
}
object Test2 extends App {
val system = ActorSystem("mySystem")
implicit val timeout = Timeout(5 seconds)
val connFactory = new ConnectionFactory()
val uri = system.settings.config.getString("amqp-client-test.rabbitmq.uri")
connFactory.setUri(uri)
val conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second))
val producer = ConnectionOwner.createChildActor(conn, ChannelOwner.props())
waitForConnection(system, conn, producer).await(5, TimeUnit.SECONDS)
val body = "test".getBytes("UTF-8")
val count = 10000
class MyActor extends Actor {
var sent = 0
var start = 0L
self ! 'start
def receive = {
case 'start =>
start = System.nanoTime()
for (i <- 1 to count) {
producer ! Publish("", "my_queue", body)
}
println(s"sending $count message in ${(System.nanoTime() - start) / 1000000} ms")
case Amqp.Ok(_: Publish, _) =>
sent = sent + 1
if (sent == count) {
println(s"publishing $count message in ${(System.nanoTime() - start) / 1000000} ms")
context.stop(self)
}
}
}
val myActor = system.actorOf(Props[MyActor])
}
object Test3 extends App {
val system = ActorSystem("mySystem")
implicit val timeout = Timeout(5 seconds)
val connFactory = new ConnectionFactory()
val uri = system.settings.config.getString("amqp-client-test.rabbitmq.uri")
connFactory.setUri(uri)
val conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second))
val producers = for (i <- 1 to 100) yield ConnectionOwner.createChildActor(conn, ChannelOwner.props())
val producer = Router(RoundRobinRoutingLogic(), producers.map(ActorRefRoutee))
waitForConnection(system, conn).await(5, TimeUnit.SECONDS)
val body = "test".getBytes("UTF-8")
val count = 10000
class MyActor extends Actor {
var sent = 0
var start = 0L
self ! 'start
def receive = {
case 'start =>
start = System.nanoTime()
for (i <- 1 to count) {
producer.route(Publish("", "my_queue", body), self)
}
println(s"sending $count message in ${(System.nanoTime() - start) / 1000000} ms")
case Amqp.Ok(_: Publish, _) =>
sent = sent + 1
if (sent == count) {
println(s"publishing $count message in ${(System.nanoTime() - start) / 1000000} ms")
context.stop(self)
}
}
}
val myActor = system.actorOf(Props[MyActor])
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment