public
Last active — forked from momania/gist:858476

  • Download Gist
gistfile1.scala
Scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
import akka.amqp.AMQP._
import akka.amqp._
import akka.actor._
import akka.util.Logging
 
/**
 * Small demo of several AMQP consumers sharing one named queue.
 *
 * `main` registers three [[JobConsumer]] actors on the queue "my-job-queue"
 * (all bound with the routing key "some.routing.key" on a direct exchange)
 * and then publishes thirty messages to that routing key, one every 200 ms.
 * Each consumer prints the messages it receives, prefixed with its id, so the
 * distribution of messages across consumers can be observed on stdout.
 */
object LoadBalancingTest extends Logging {

  // Single charset shared by producer and consumer so that payload bytes are
  // decoded exactly as they were encoded, independent of the JVM's platform
  // default charset.
  private val PayloadCharset = "UTF-8"

  val connection = AMQP.newConnection()
  val exchangeParameters = ExchangeParameters("my_direct_exchange", Direct)

  /**
   * Actor that prints every delivered payload together with its own id.
   *
   * @param id number used to tell the consumers apart in the printed output
   */
  class JobConsumer(id: Int) extends Actor {

    def receive = {
      // Only the payload is of interest; the other delivery fields are ignored.
      case Delivery(payload, _, _, _, _, _) =>
        println("(" + id + ") received message: " + new String(payload, PayloadCharset))
    }
  }

  /**
   * Starts three consumers and one producer, then publishes thirty messages.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // == Consumer
    // NOTE(review): prefetchSize = 1 presumably caps each consumer at one
    // in-flight message so deliveries spread across consumers; confirm
    // against the Akka AMQP module documentation.
    val channelParameters = Some(ChannelParameters(prefetchSize = 1))
    for (i <- 1 to 3) {
      val actor = Actor.actorOf(new JobConsumer(i))
      // Every consumer uses the same queue name and routing key.
      AMQP.newConsumer(
        connection,
        ConsumerParameters(
          "some.routing.key",
          actor,
          Some("my-job-queue"),
          Some(exchangeParameters),
          channelParameters = channelParameters))
    }

    // == Producer
    val producer = AMQP.newProducer(
      connection,
      ProducerParameters(Some(exchangeParameters), Some("my-producer")))

    for (i <- 1 to 30) {
      producer ! Message(("data (" + i + ")").getBytes(PayloadCharset), "some.routing.key")
      // Pause between publishes so the output of the consumers is readable.
      Thread.sleep(200)
    }
  }
}

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.