Created
August 29, 2014 11:11
-
-
Save sstone/1a5a9b404983a31fc18d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.github.sstone.amqp | |
import java.util.concurrent.TimeUnit | |
import akka.actor._ | |
import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router} | |
import akka.util.Timeout | |
import com.github.sstone.amqp.Amqp._ | |
import com.rabbitmq.client.ConnectionFactory | |
import scala.concurrent.duration._ | |
object Test1 extends App { | |
val system = ActorSystem("mySystem") | |
implicit val timeout = Timeout(5 seconds) | |
val connFactory = new ConnectionFactory() | |
val uri = system.settings.config.getString("amqp-client-test.rabbitmq.uri") | |
connFactory.setUri(uri) | |
val conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second)) | |
val producer = ConnectionOwner.createChildActor(conn, ChannelOwner.props()) | |
waitForConnection(system, conn, producer).await(5, TimeUnit.SECONDS) | |
val body = "test".getBytes("UTF-8") | |
val start = System.nanoTime() | |
val count = 10000 | |
for (i <- 0 to count) { | |
producer ! Publish("", "my_queue", body) | |
} | |
val end = System.nanoTime() | |
println(s"sending $count message in ${(end - start) / 1000000} ms") | |
} | |
object Test2 extends App { | |
val system = ActorSystem("mySystem") | |
implicit val timeout = Timeout(5 seconds) | |
val connFactory = new ConnectionFactory() | |
val uri = system.settings.config.getString("amqp-client-test.rabbitmq.uri") | |
connFactory.setUri(uri) | |
val conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second)) | |
val producer = ConnectionOwner.createChildActor(conn, ChannelOwner.props()) | |
waitForConnection(system, conn, producer).await(5, TimeUnit.SECONDS) | |
val body = "test".getBytes("UTF-8") | |
val count = 10000 | |
class MyActor extends Actor { | |
var sent = 0 | |
var start = 0L | |
self ! 'start | |
def receive = { | |
case 'start => | |
start = System.nanoTime() | |
for (i <- 1 to count) { | |
producer ! Publish("", "my_queue", body) | |
} | |
println(s"sending $count message in ${(System.nanoTime() - start) / 1000000} ms") | |
case Amqp.Ok(_: Publish, _) => | |
sent = sent + 1 | |
if (sent == count) { | |
println(s"publishing $count message in ${(System.nanoTime() - start) / 1000000} ms") | |
context.stop(self) | |
} | |
} | |
} | |
val myActor = system.actorOf(Props[MyActor]) | |
} | |
object Test3 extends App { | |
val system = ActorSystem("mySystem") | |
implicit val timeout = Timeout(5 seconds) | |
val connFactory = new ConnectionFactory() | |
val uri = system.settings.config.getString("amqp-client-test.rabbitmq.uri") | |
connFactory.setUri(uri) | |
val conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second)) | |
val producers = for (i <- 1 to 100) yield ConnectionOwner.createChildActor(conn, ChannelOwner.props()) | |
val producer = Router(RoundRobinRoutingLogic(), producers.map(ActorRefRoutee)) | |
waitForConnection(system, conn).await(5, TimeUnit.SECONDS) | |
val body = "test".getBytes("UTF-8") | |
val count = 10000 | |
class MyActor extends Actor { | |
var sent = 0 | |
var start = 0L | |
self ! 'start | |
def receive = { | |
case 'start => | |
start = System.nanoTime() | |
for (i <- 1 to count) { | |
producer.route(Publish("", "my_queue", body), self) | |
} | |
println(s"sending $count message in ${(System.nanoTime() - start) / 1000000} ms") | |
case Amqp.Ok(_: Publish, _) => | |
sent = sent + 1 | |
if (sent == count) { | |
println(s"publishing $count message in ${(System.nanoTime() - start) / 1000000} ms") | |
context.stop(self) | |
} | |
} | |
} | |
val myActor = system.actorOf(Props[MyActor]) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment