Skip to content

@tjweir /gist:864882 forked from momania/gist:858476
Created

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
import akka.amqp.AMQP._
import akka.amqp._
import akka.actor._
import akka.util.Logging
/**
 * Demo: several consumers attached to the same AMQP queue, fed by one producer.
 *
 * `main` registers three [[JobConsumer]] actors on the queue "my-job-queue"
 * (bound to the direct exchange "my_direct_exchange" with routing key
 * "some.routing.key") and then publishes 30 text messages, one every 200 ms.
 * Each consumer prints the messages it receives, prefixed with its own id.
 */
object LoadBalancingTest extends Logging {

  /**
   * Charset used on BOTH sides of the wire. Encoding and decoding must agree;
   * relying on the JVM default charset makes the result depend on the host's
   * locale / `file.encoding` setting.
   */
  private val PayloadCharset = "UTF-8"

  /** Connection shared by the producer and every consumer created in `main`. */
  val connection = AMQP.newConnection()

  /** Direct exchange that both the producer and the consumers are attached to. */
  val exchangeParameters = ExchangeParameters("my_direct_exchange", Direct)

  /**
   * Actor that prints every delivery it receives.
   *
   * @param id number shown in the output so the individual consumers can be told apart
   */
  class JobConsumer(id: Int) extends Actor {
    def receive = {
      // Only the payload (first field of Delivery) is used; the rest is ignored.
      case Delivery(payload, _, _, _, _, _) =>
        println("(" + id + ") received message: " + new String(payload, PayloadCharset))
    }
  }

  /**
   * Starts three consumers and then publishes 30 messages.
   *
   * NOTE(review): nothing here stops the connection, producer or consumers
   * once the last message has been sent — confirm whether the demo is meant
   * to keep running or should shut the AMQP actors down explicitly.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // == Consumer
    // NOTE(review): prefetchSize = 1 presumably limits how much the broker hands
    // to one consumer at a time so work is spread across all three — confirm
    // against the akka-amqp ChannelParameters documentation.
    val channelParameters = Some(ChannelParameters(prefetchSize = 1))
    for (i <- 1 to 3) {
      val actor = Actor.actorOf(new JobConsumer(i))
      // All three consumers name the same queue, so they share its messages.
      AMQP.newConsumer(connection, ConsumerParameters("some.routing.key",
        actor, Some("my-job-queue"), Some(exchangeParameters), channelParameters = channelParameters))
    }

    // == Producer
    val producer = AMQP.newProducer(connection,
      ProducerParameters(Some(exchangeParameters), Some("my-producer")))
    for (i <- 1 to 30) {
      // Encode with the same explicit charset that JobConsumer decodes with.
      producer ! Message(("data (" + i + ")").getBytes(PayloadCharset), "some.routing.key")
      // Pace the sends so the distribution over consumers is visible in the output.
      Thread.sleep(200)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.