Skip to content

Instantly share code, notes, and snippets.

@stephanos
Created March 4, 2011 14:42
Show Gist options
  • Save stephanos/854698 to your computer and use it in GitHub Desktop.
Save stephanos/854698 to your computer and use it in GitHub Desktop.
import akka.amqp.AMQP._
import akka.amqp._
import akka.actor._
import akka.dispatch._
import akka.util.Logging
/**
 * Small demo that attaches three consumer actors and one producer to the
 * direct exchange "my_direct_exchange" over a single AMQP connection, then
 * publishes three messages with the routing key "some.routing.key".
 */
object AMQPTest extends Logging {

  // Created once when the object is initialised; used by all consumers and the producer.
  val connection = AMQP.newConnection()

  // Exchange definition handed to both the consumers and the producer.
  val exchangeParameters = ExchangeParameters("my_direct_exchange", Direct)

  /** Prints the payload of every `Delivery` it receives, then sleeps for five seconds. */
  class JobConsumer extends Actor {
    def receive = {
      case Delivery(payload, _, _, _, _, _) =>
        // new String(bytes) decodes with the JVM's default charset.
        println("received message: " + new String(payload))
        // Blocks the current thread for 5 seconds; presumably a stand-in for slow work.
        Thread.sleep(5000)
    }
  }

  /**
   * Builds a work-stealing dispatcher backed by a new thread pool with an
   * unbounded linked blocking queue.
   *
   * @param name     name given to the dispatcher
   * @param poolSize used as both the core and the maximum pool size
   * @return the built dispatcher
   */
  def newDispatcher(name: String, poolSize: Int) =
    Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name)
      .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
      .setCorePoolSize(poolSize)
      .setMaxPoolSize(poolSize)
      .build

  /**
   * Registers three `JobConsumer` actors on the queue "my-job-queue" and then
   * sends three messages through a producer on the same exchange.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // == Consumer
    // Must be a val: a def would call newDispatcher again on every reference,
    // handing each actor its own dispatcher instead of one shared instance.
    val disp = newDispatcher("my-dispatcher", 3)
    for (_ <- 1 to 3) {
      val actor = Actor.actorOf(classOf[JobConsumer])
      actor.dispatcher = disp
      AMQP.newConsumer(connection, ConsumerParameters("some.routing.key",
        actor, Some("my-job-queue"), Some(exchangeParameters)))
    }

    // == Producer
    val producer = AMQP.newProducer(connection,
      ProducerParameters(Some(exchangeParameters), Some("my-producer")))
    for (_ <- 1 to 3) {
      producer ! Message("data".getBytes, "some.routing.key")
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment