Skip to content

Instantly share code, notes, and snippets.

@momania
Created March 7, 2011 13:05
Show Gist options
  • Save momania/858476 to your computer and use it in GitHub Desktop.
Save momania/858476 to your computer and use it in GitHub Desktop.
Akka AMQP Loadbalance
import akka.amqp.AMQP._
import akka.amqp._
import akka.actor._
import java.util.concurrent.{TimeUnit, CountDownLatch}
import util.Random
/**
 * Demo of spreading AMQP messages over several consumers.
 *
 * Registers `workers` consumers that all use the same queue name
 * ("my-job-queue") on one direct exchange, publishes `messages` messages with a
 * single routing key, then blocks until every message has been counted down by a
 * consumer or the timeout elapses. AMQP and the actor registry are shut down in
 * all cases before the JVM exits.
 */
object LoadBalancingDemo {

  /**
   * Runs the demo end to end.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val workers = 15
    val messages = 100
    val maxRandomWaitMs = 5000

    val localConnection = AMQP.newConnection()

    try {
      val directExchangeParameters = ExchangeParameters("my_direct_exchange", Direct)

      // specifies how many messages the amqp channel should
      // prefetch as unacknowledged messages before processing
      // 0 = unlimited
      val smallPrefetchChannelParameters = Some(ChannelParameters(prefetchSize = 1))

      val someRoutingKey = "some.routing.key"

      // One count per published message; each consumer counts down after handling one.
      val countDownLatch = new CountDownLatch(messages)

      // Consumer actor: prints the payload, simulates work with a random sleep
      // (below maxRandomWaitMs), then marks the message as processed.
      class JobConsumer(id: Int) extends Actor {
        self.id = "jobconsumer-" + id

        def receive = {
          case Delivery(payload, _, _, _, _, _) =>
            println(self.id + " received message: " + new String(payload))
            TimeUnit.MILLISECONDS.sleep(Random.nextInt(maxRandomWaitMs))
            countDownLatch.countDown()
        }
      }

      // consumers: all of them name the same queue, so they share its messages
      for (i <- 1 to workers) {
        AMQP.newConsumer(
          connection = localConnection,
          consumerParameters = ConsumerParameters(
            routingKey = someRoutingKey,
            deliveryHandler = Actor.actorOf(new JobConsumer(i)),
            queueName = Some("my-job-queue"),
            exchangeParameters = Some(directExchangeParameters),
            channelParameters = smallPrefetchChannelParameters))
      }

      // producer
      val producer = AMQP.newProducer(
        connection = localConnection,
        producerParameters = ProducerParameters(
          exchangeParameters = Some(directExchangeParameters)))

      for (i <- 1 to messages) {
        producer ! Message(("data (" + i + ")").getBytes, someRoutingKey)
      }
      println("Sent all " + messages + " messages - awaiting processing...")

      // Upper bound: as if a single worker handled every message at the maximum
      // sleep, plus one second of slack.
      val timeoutMs = (maxRandomWaitMs * messages) + 1000

      // await returns false when the timeout elapsed before the latch reached zero;
      // report that instead of silently treating it as success.
      val allProcessed = countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
      if (!allProcessed) {
        println("Timed out after " + timeoutMs + " ms with " +
          countDownLatch.getCount + " messages still unprocessed")
      }
    } finally {
      // Always release AMQP resources and stop actors, even if setup or publishing failed.
      AMQP.shutdownAll
      Actor.registry.shutdownAll
    }

    // Exit status stays 0 as before; System.exit replaces the deprecated Predef.exit.
    System.exit(0)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment