Skip to content

Instantly share code, notes, and snippets.

@ith
Last active February 4, 2016 20:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ith/aa3e90a2337ce3cdef90 to your computer and use it in GitHub Desktop.
Save ith/aa3e90a2337ce3cdef90 to your computer and use it in GitHub Desktop.
import java.io.IOException
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.concurrent.duration._
import scala.util.control.Exception._
import akka.actor.{ActorLogging, Cancellable, PoisonPill}
import com.google.inject.Inject
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client.{Channel, Envelope => AMPQEnvelope, ShutdownListener, ShutdownSignalException}
abstract class Consumer @Inject()(channel: Channel, queueName: String, exchangeName: String)
extends DefaultActorConsumer(channel) with ActorLogging with ActorExecutionContext {
implicit object DeliveryTagOrdering extends Ordering[DeliveryTag] {
override def compare(x: DeliveryTag, y: DeliveryTag) = x.wrapped.compare(y.wrapped)
}
private case class MessageContext(tag: DeliveryTag, expiration: Cancellable, message: Delivery)
val toxicMessageExchangeName = queueName + "-toxic"
val toxicMessageQueueName = queueName + "-toxic"
val toxicMessageRetryAfter = 1 minute
val maxRetriesForToxicMessage = 10
private[this] val pendingConfirmation = mutable.Map[MessageId, MessageContext]()
private[this] def startConsuming() = {
log.debug("Consumer starts consuming")
channel.exchangeDeclare(exchangeName, "topic", true)
channel.queueDeclare(queueName, true, false, false, Map.empty[String, Object])
channel.queueBind(queueName, exchangeName, "#")
channel.exchangeDeclare(toxicMessageExchangeName, "topic", false)
channel.queueDeclare(toxicMessageQueueName, true, false, false,
Map[String, Object](
"x-message-ttl" -> toxicMessageRetryAfter.toMillis.asInstanceOf[Object],
"x-dead-letter-exchange" -> exchangeName))
channel.queueBind(toxicMessageQueueName, toxicMessageExchangeName, "#")
channel.basicConsume(queueName, false, this)
()
}
private[this] def pullDeliveryTag(forId: MessageId) =
pendingConfirmation.remove(forId).map { msgContext =>
msgContext.expiration.cancel()
log.debug(s"PullDeliveryTag resolved id=$forId to tag=${msgContext.tag}")
msgContext.tag
}
private[this] def waitForConfirmation(messageId: MessageId, deliveryTag: DeliveryTag, message: Delivery) = {
import scala.language.postfixOps
pendingConfirmation.put(messageId,
MessageContext(
deliveryTag,
context.system.scheduler.scheduleOnce(2 minutes, self, ExpiredMessage(messageId)),
message))
}
def retryLater(id: MessageId) =
pendingConfirmation.get(id).foreach { msgContext =>
msgContext.expiration.cancel()
if (msgContext.message.properties.retryNo().value < maxRetriesForToxicMessage) {
channel.basicPublish(
toxicMessageExchangeName,
"#",
msgContext.message.properties.withBumpedRetryNo(),
msgContext.message.body)
}
nack(id, requeue = false)
}
def nack(forId: MessageId, requeue: Boolean) = {
log.warning(s"Nack forId=$forId and requeue=$requeue")
catching(classOf[IOException]).either(
pullDeliveryTag(forId).foreach(channel.basicNack(_, false, requeue)))
.fold(e => log.error("Failed to nack", e), _ => ())
()
}
def ack(forId: MessageId) = {
log.info(s"Ack forId=$forId")
catching(classOf[IOException])
.either(pullDeliveryTag(forId).foreach(channel.basicAck(_, false)))
.fold(e => log.error("Failed to ack", e), _ => ())
}
def failAll() =
Some(pendingConfirmation.values.map(_.tag).toList)
.filter(_.nonEmpty)
.map(_.max)
.foreach(dt => catching(classOf[IOException])
.apply(channel.basicNack(dt, true, true)))
def terminate() = {
log.warning("Consumer terminating, sorry")
failAll()
pendingConfirmation.clear()
self ! PoisonPill
}
channel.addShutdownListener(new ShutdownListener {
override def shutdownCompleted(cause: ShutdownSignalException) = terminate()
})
def handleMessage(
consumerTag: String,
envelope: AMPQEnvelope,
properties: BasicProperties,
body: Array[Byte])
override def receive = {
case Start() =>
startConsuming()
case ProcessedMessage(id) =>
log.debug(s"The message id=$id processed and will be acknowledged")
ack(id)
case FailedMessage(id) =>
log.warning(s"The message id=$id failed and will be queued to retry later")
retryLater(id)
case RejectedMessage(id) =>
log.warning(s"The message id=$id was rejected and will be NOT queued to retry later")
nack(id, requeue = false)
case ExpiredMessage(id) =>
log.warning(s"The message id=$id expired and will be queued to retry later")
retryLater(id)
case delivery @ Delivery(consumerTag, envelope, properties, body) if properties.getMessageId != null =>
waitForConfirmation(MessageId(properties.getMessageId), envelope.getDeliveryTag, delivery)
handleMessage(consumerTag, envelope, properties, body)
case delivery @ Delivery(_, envelope, _, _) =>
log.error(s"Reject message without MessageId, deliveryTag=${envelope.getDeliveryTag}")
channel.basicNack(envelope.getDeliveryTag, false, false)
case msg =>
log.info(s"Received unsupported message, ignoring $msg")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment