Skip to content

Instantly share code, notes, and snippets.

@durre
Created October 21, 2015 08:52
Show Gist options
  • Save durre/358457c140d6e6de62c5 to your computer and use it in GitHub Desktop.
Save durre/358457c140d6e6de62c5 to your computer and use it in GitHub Desktop.
import akka.actor.ActorLogging
import akka.stream.actor.ActorPublisher
import akka.util.ByteString
import com.rabbitmq.client._
/**
 * Stream publisher that feeds deliveries from a RabbitMQ queue into an Akka Stream.
 *
 * On construction the actor opens its own channel on `connection` and registers a
 * consumer on `queue` with manual acknowledgement (`autoAck = false`). Every delivery
 * is wrapped in a [[RabbitMessage]] and sent to this actor's own mailbox, so all
 * publisher state is only touched from the actor's message handling.
 *
 * A delivery is emitted downstream only while the publisher is active and there is
 * outstanding demand; otherwise it is nacked straight away.
 *
 * When downstream cancels the subscription the actor stops itself, and the channel
 * is closed when the actor stops (which also covers restarts, since a new instance
 * opens a fresh channel).
 *
 * @param connection open RabbitMQ connection used to create this actor's channel
 * @param queue      queue to consume from; only its `name` is used here
 */
class RabbitConsumerActor(connection: Connection, queue: Queue) extends ActorPublisher[RabbitMessage] with ActorLogging {

  import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
  import scala.util.control.NonFatal

  // `createChannel()` is documented to return null when no channel is available;
  // fail with a descriptive error instead of a later NullPointerException.
  val channel: Channel = Option(connection.createChannel()).getOrElse(
    throw new IllegalStateException("RabbitMQ connection could not provide a new channel"))

  // Invoked by the RabbitMQ client outside of the actor's message handling, so the
  // callback does nothing except forward the delivery to the actor's mailbox.
  val consumer: DefaultConsumer = new DefaultConsumer(channel) {
    override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit =
      self ! new RabbitMessage(envelope.getDeliveryTag, ByteString(body), channel)
  }

  /**
   * Starts consuming `queue` on `channel`, delivering to `consumer`.
   * Automatic acknowledgement is disabled.
   */
  def register(channel: Channel, queue: Queue, consumer: Consumer): Unit = {
    val autoAck = false
    channel.basicConsume(queue.name, autoAck, consumer)
  }

  register(channel, queue, consumer)

  override def receive: Receive = {
    case msg: RabbitMessage =>
      if (isActive && totalDemand > 0) {
        onNext(msg)
      } else {
        // No subscriber or no demand: hand the delivery back rather than buffering it.
        msg.nack()
      }

    case Request(_) =>
      // Demand is tracked by ActorPublisher (exposed as `totalDemand`); matching the
      // signal here only keeps it from being reported as an unhandled message.
      ()

    case Cancel =>
      // Downstream is gone: stop so the channel is released in postStop instead of
      // staying subscribed and nacking every further delivery.
      context.stop(self)
  }

  /**
   * Closes the channel opened by this actor.
   *
   * NOTE(review): [[RabbitMessage]] instances emitted earlier were given this same
   * channel; presumably ack/nack on them fails once it is closed — confirm that this
   * is acceptable for messages still in flight downstream.
   */
  override def postStop(): Unit = {
    try {
      if (channel.isOpen) channel.close()
    } catch {
      case NonFatal(e) =>
        log.warning("Failed to close RabbitMQ channel for queue {}: {}", queue.name, e.getMessage)
    }
    super.postStop()
  }
}
/**
 * Entry point of the feed worker.
 *
 * Starts the "feed" actor system, reads the RabbitMQ URL from the `rabbitmq.url`
 * configuration key and, once a connection has been created, declares the activity
 * exchange/queue/binding and runs a stream that passes every consumed
 * [[RabbitMessage]] through `FeedFlow.processFeedActivity` into an ignoring sink.
 */
object WorkerApp extends App {

  implicit val actorSystem: ActorSystem = ActorSystem("feed")

  val conf = ConfigFactory.load()
  val connectionHolder = new RabbitConnectionHolder(conf.getString("rabbitmq.url"))

  // NOTE(review): only the success case of `createConnection()` is mapped here; what
  // happens when the connection cannot be created is not visible — confirm.
  connectionHolder.createConnection().map { connection =>
    // Exchange, queue and binding are declared before the consumer is started.
    setupRabbitMQInfrastructure(connectionHolder)

    // Concrete flow wired to the runtime fan-out service.
    object FeedFlow extends FeedFlow {
      override def fanOutService: FanOutService = RuntimeServices.FanOutService
    }

    implicit val flowMaterializer = ActorFlowMaterializer()

    val consumerProps = Props(new RabbitConsumerActor(connection, RabbitMQConfig.activityQueue))
    val publisher = ActorPublisher[RabbitMessage](actorSystem.actorOf(consumerProps))

    // Consume -> process -> discard the processed elements.
    Source(publisher)
      .via(FeedFlow.processFeedActivity)
      .runWith(Sink.ignore)
  }

  /**
   * Declares the activity exchange, then the activity queue, then the binding
   * between them, using the given connection holder.
   */
  private def setupRabbitMQInfrastructure(connection: RabbitConnectionHolder): Unit = {
    connection.declareExchange(RabbitMQConfig.activityExchange)
    connection.declareQueue(RabbitMQConfig.activityQueue)
    connection.bindQueue(RabbitMQConfig.activityBinding)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment