Created
October 21, 2015 08:52
-
-
Save durre/358457c140d6e6de62c5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorLogging | |
import akka.stream.actor.ActorPublisher | |
import akka.util.ByteString | |
import com.rabbitmq.client._ | |
/**
 * Stream publisher that feeds RabbitMQ deliveries into an Akka Stream.
 *
 * On construction the actor opens a channel on `connection` and subscribes a consumer to
 * `queue` with automatic acknowledgement disabled. Each delivery is wrapped in a
 * [[RabbitMessage]] (delivery tag, payload, channel) and sent to this actor, which emits it
 * downstream when there is demand and calls `nack()` on it otherwise.
 *
 * The channel is owned by this actor: it is closed in `postStop`, and the actor stops itself
 * when the downstream subscriber cancels.
 *
 * @param connection open RabbitMQ connection used to create this actor's channel
 * @param queue      queue to consume from; only `queue.name` is read here
 */
class RabbitConsumerActor(connection: Connection, queue: Queue) extends ActorPublisher[RabbitMessage] with ActorLogging {
  import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
  import scala.util.control.NonFatal

  val channel = connection.createChannel()

  val consumer = new DefaultConsumer(channel) {
    // Invoked by the RabbitMQ client, not by the actor: hand the delivery over
    // as a message instead of touching publisher state directly.
    override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit =
      self ! new RabbitMessage(envelope.getDeliveryTag, ByteString(body), channel)
  }

  /**
   * Subscribes `consumer` to `queue` on `channel` with auto-ack turned off, so every
   * delivery has to be acknowledged (or rejected) explicitly.
   */
  def register(channel: Channel, queue: Queue, consumer: Consumer): Unit = {
    val autoAck = false
    channel.basicConsume(queue.name, autoAck, consumer)
  }

  register(channel, queue, consumer)

  override def receive = {
    case msg: RabbitMessage =>
      if (isActive && totalDemand > 0) {
        onNext(msg)
      } else {
        // No subscriber or no outstanding demand: give the delivery back via nack()
        // (presumably requeues it; confirm against RabbitMessage).
        msg.nack()
      }

    case _: Request =>
      // Demand bookkeeping (totalDemand) is done by ActorPublisher before this handler
      // runs; matching here only keeps Request out of the unhandled-message path.
      ()

    case Cancel =>
      // Downstream is gone for good. Stop so postStop releases the channel instead of
      // rejecting every future delivery forever.
      context.stop(self)
  }

  /** Releases the channel opened by this actor; close failures are logged, not rethrown. */
  override def postStop(): Unit = {
    try {
      if (channel.isOpen) channel.close()
    } catch {
      case NonFatal(e) => log.warning("Failed to close RabbitMQ channel: {}", e.getMessage)
    } finally {
      super.postStop()
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Entry point of the feed worker.
 *
 * Starts the "feed" actor system, loads configuration, and creates a RabbitMQ connection
 * from `rabbitmq.url`. Once a connection is available it declares the exchange, queue and
 * binding, then runs a stream that reads messages from the activity queue, passes them
 * through `processFeedActivity`, and discards the results.
 */
object WorkerApp extends App {

  implicit val actorSystem = ActorSystem("feed")

  val conf = ConfigFactory.load()
  val connectionHolder = new RabbitConnectionHolder(conf.getString("rabbitmq.url"))

  connectionHolder.createConnection().map { connection =>
    // The broker topology must exist before a consumer is attached to the queue.
    setupRabbitMQInfrastructure(connectionHolder)

    // Flow definition wired to the runtime fan-out service.
    object LiveFeedFlow extends FeedFlow {
      override def fanOutService: FanOutService = RuntimeServices.FanOutService
    }

    implicit val materializer = ActorFlowMaterializer()

    val consumerRef = actorSystem.actorOf(
      Props(new RabbitConsumerActor(connection, RabbitMQConfig.activityQueue)))
    val publisher = ActorPublisher[RabbitMessage](consumerRef)

    // Queue messages -> feed processing -> results dropped.
    Source(publisher)
      .via(LiveFeedFlow.processFeedActivity)
      .runWith(Sink.ignore)
  }

  /** Declares the activity exchange and queue, then binds them, in that order. */
  private def setupRabbitMQInfrastructure(connection: RabbitConnectionHolder): Unit = {
    connection.declareExchange(RabbitMQConfig.activityExchange)
    connection.declareQueue(RabbitMQConfig.activityQueue)
    connection.bindQueue(RabbitMQConfig.activityBinding)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment