public
Created — forked from momania/gist:858476

Akka AMQP Loadbalance

  • Download Gist
gistfile1.scala
Scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
import akka.amqp.AMQP._
import akka.amqp._
import akka.actor._
import java.util.concurrent.{TimeUnit, CountDownLatch}
import util.Random
 
/**
 * Demonstrates distributing AMQP messages over several Akka consumers.
 *
 * A number of consumers are attached to the same named queue
 * ("my-job-queue"), each on a channel configured with a prefetch size of 1.
 * A single producer then publishes a batch of messages to a direct exchange,
 * and the program waits until every message has been handled (or a timeout
 * elapses) before shutting down the AMQP module and all actors.
 */
object LoadBalancingDemo {

  /**
   * Runs the demo and terminates the JVM when done.
   *
   * The process exits with status 0 when all messages were processed and
   * with status 1 when the wait timed out first.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val workers = 15
    val messages = 100
    val maxRandomWaitMs = 5000

    // Explicit charset so encoding and decoding never depend on the
    // platform default.
    val payloadCharset = "UTF-8"

    val localConnection = AMQP.newConnection()
    val directExchangeParameters = ExchangeParameters("my_direct_exchange", Direct)
    // specifies how many messages the amqp channel should
    // prefetch as unacknowledged messages before processing
    // 0 = unlimited
    val smallPrefetchChannelParameters = Some(ChannelParameters(prefetchSize = 1))
    val someRoutingKey = "some.routing.key"

    // Counted down once per handled delivery; main blocks on it below.
    val countDownLatch = new CountDownLatch(messages)

    /**
     * Consumer actor that prints each delivery, sleeps for a random time
     * below `maxRandomWaitMs` to simulate work, then counts down the latch.
     *
     * @param id number used to build the actor id ("jobconsumer-" + id)
     */
    class JobConsumer(id: Int) extends Actor {
      self.id = "jobconsumer-" + id

      def receive = {
        case Delivery(payload, _, _, _, _, _) =>
          println(self.id + " received message: " + new String(payload, payloadCharset))
          TimeUnit.MILLISECONDS.sleep(Random.nextInt(maxRandomWaitMs))
          countDownLatch.countDown()
      }
    }

    // Everything after the connection is created runs inside try/finally so
    // that the AMQP module and the actors are shut down even when setup,
    // publishing or waiting fails.
    val allProcessed =
      try {
        // consumers: all of them are bound to the same queue
        for (i <- 1 to workers) {
          AMQP.newConsumer(
            connection = localConnection,
            consumerParameters = ConsumerParameters(
              routingKey = someRoutingKey,
              deliveryHandler = Actor.actorOf(new JobConsumer(i)),
              queueName = Some("my-job-queue"),
              exchangeParameters = Some(directExchangeParameters),
              channelParameters = smallPrefetchChannelParameters))
        }

        // producer
        val producer = AMQP.newProducer(
          connection = localConnection,
          producerParameters = ProducerParameters(
            exchangeParameters = Some(directExchangeParameters)))

        // publish the whole batch up front
        for (i <- 1 to messages) {
          producer ! Message(("data (" + i + ")").getBytes(payloadCharset), someRoutingKey)
        }
        println("Sent all " + messages + " messages - awaiting processing...")

        // Upper bound: every message taking the maximum wait, plus one second.
        // Computed as Long so larger constants cannot overflow Int.
        val timeoutMs = maxRandomWaitMs.toLong * messages + 1000L
        countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
      } finally {
        AMQP.shutdownAll
        Actor.registry.shutdownAll
      }

    if (!allProcessed) {
      println("Timed out with " + countDownLatch.getCount + " of " + messages +
        " messages still unprocessed")
    }

    // System.exit rather than Predef.exit, which is deprecated in newer
    // Scala versions.
    System.exit(if (allProcessed) 0 else 1)
  }
}

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.