// Source: GitHub gist ith/aa3e90a2337ce3cdef90 (last active February 4, 2016, 20:26).
import java.io.IOException | |
import scala.collection.JavaConversions._ | |
import scala.collection.mutable | |
import scala.concurrent.duration._ | |
import scala.util.control.Exception._ | |
import akka.actor.{ActorLogging, Cancellable, PoisonPill} | |
import com.google.inject.Inject | |
import com.rabbitmq.client.AMQP.BasicProperties | |
import com.rabbitmq.client.{Channel, Envelope => AMPQEnvelope, ShutdownListener, ShutdownSignalException} | |
/**
 * Actor-based RabbitMQ consumer that acknowledges deliveries by message id.
 *
 * On `Start()` it declares the main topic exchange/queue plus a "toxic" retry
 * exchange/queue and begins consuming with manual acknowledgement. Every delivery
 * that carries a message id is remembered in `pendingConfirmation` and handed to
 * [[handleMessage]]. The delivery is then settled when the actor receives one of:
 *
 *  - `ProcessedMessage(id)` - the delivery is acked;
 *  - `FailedMessage(id)`    - the delivery is re-published to the toxic exchange
 *                             (bounded by `maxRetriesForToxicMessage`) and nacked;
 *  - `RejectedMessage(id)`  - the delivery is nacked without requeue;
 *  - `ExpiredMessage(id)`   - scheduled by this actor two minutes after the delivery,
 *                             handled like `FailedMessage`.
 *
 * Deliveries without a message id are nacked immediately without requeue.
 *
 * @param channel      RabbitMQ channel used for all declarations, consuming and acks
 * @param queueName    name of the main queue; also the base of the toxic exchange/queue names
 * @param exchangeName name of the main topic exchange the queue is bound to
 */
abstract class Consumer @Inject()(channel: Channel, queueName: String, exchangeName: String)
  extends DefaultActorConsumer(channel) with ActorLogging with ActorExecutionContext {

  /** Orders delivery tags by their wrapped value; used to find the highest pending tag. */
  implicit object DeliveryTagOrdering extends Ordering[DeliveryTag] {
    override def compare(x: DeliveryTag, y: DeliveryTag): Int = x.wrapped.compare(y.wrapped)
  }

  /** Bookkeeping for a delivery that has been handed out but not settled yet. */
  private case class MessageContext(tag: DeliveryTag, expiration: Cancellable, message: Delivery)

  /** Internal signal: the channel shut down; sent by the shutdown listener to this actor. */
  private case object ChannelShutdown

  val toxicMessageExchangeName: String = queueName + "-toxic"
  val toxicMessageQueueName: String = queueName + "-toxic"
  // Dot notation instead of `1 minute`: postfix operators need `scala.language.postfixOps`.
  val toxicMessageRetryAfter: FiniteDuration = 1.minute
  val maxRetriesForToxicMessage: Int = 10

  // Only ever touched from within the actor (receive and the methods it calls).
  private[this] val pendingConfirmation = mutable.Map[MessageId, MessageContext]()

  // Failures a channel operation is expected to raise: I/O errors, and
  // ShutdownSignalException (incl. AlreadyClosedException) once the channel is closed.
  // A bare `catching(...)` rethrows, so it must always be finished with `.either`/`.opt`.
  private[this] val channelErrors: Catch[Nothing] =
    catching(classOf[IOException], classOf[ShutdownSignalException])

  // The listener is invoked on a RabbitMQ thread (or immediately, if the channel is
  // already closed). It must not touch actor state directly, so it only sends a message
  // and lets `receive` run `terminate()` on the actor's own thread.
  private[this] val consumerRef = self
  channel.addShutdownListener(new ShutdownListener {
    override def shutdownCompleted(cause: ShutdownSignalException): Unit =
      consumerRef ! ChannelShutdown
  })

  /**
   * Declares the main and toxic exchanges/queues, binds them and starts consuming
   * from the main queue with auto-ack disabled.
   */
  private[this] def startConsuming(): Unit = {
    log.debug("Consumer starts consuming")

    channel.exchangeDeclare(exchangeName, "topic", true)
    channel.queueDeclare(queueName, true, false, false, Map.empty[String, Object])
    channel.queueBind(queueName, exchangeName, "#")

    // NOTE(review): the toxic exchange is declared non-durable while its queue is durable.
    // Left unchanged because re-declaring an existing exchange with different
    // attributes is rejected by the broker - confirm whether this is intended.
    channel.exchangeDeclare(toxicMessageExchangeName, "topic", false)
    // Messages parked here expire after `toxicMessageRetryAfter` and are then
    // dead-lettered back to the main exchange, which yields the delayed retry.
    channel.queueDeclare(toxicMessageQueueName, true, false, false,
      Map[String, Object](
        "x-message-ttl" -> Long.box(toxicMessageRetryAfter.toMillis),
        "x-dead-letter-exchange" -> exchangeName))
    channel.queueBind(toxicMessageQueueName, toxicMessageExchangeName, "#")

    channel.basicConsume(queueName, false, this)
    ()
  }

  /**
   * Removes the pending entry for `forId`, cancelling its expiry timer.
   *
   * @return the delivery tag of the removed entry, or `None` if the id is not pending
   */
  private[this] def pullDeliveryTag(forId: MessageId): Option[DeliveryTag] =
    pendingConfirmation.remove(forId).map { msgContext =>
      msgContext.expiration.cancel()
      log.debug(s"PullDeliveryTag resolved id=$forId to tag=${msgContext.tag}")
      msgContext.tag
    }

  /**
   * Registers a delivery as pending and schedules an `ExpiredMessage` for it in two minutes.
   *
   * If the same id is already pending, the older entry is replaced and its timer is
   * cancelled; otherwise the stale timer would later expire the newer delivery.
   */
  private[this] def waitForConfirmation(
      messageId: MessageId,
      deliveryTag: DeliveryTag,
      message: Delivery): Unit = {
    val expiration =
      context.system.scheduler.scheduleOnce(2.minutes, self, ExpiredMessage(messageId))
    pendingConfirmation
      .put(messageId, MessageContext(deliveryTag, expiration, message))
      .foreach { previous =>
        previous.expiration.cancel()
        log.warning(
          s"The message id=$messageId was already pending with tag=${previous.tag}, replaced it")
      }
  }

  /**
   * Schedules a pending delivery for a delayed retry and settles the original delivery.
   *
   * While the retry counter is below `maxRetriesForToxicMessage`, the message is
   * re-published (with a bumped counter) to the toxic exchange and the original is nacked
   * without requeue. Once the limit is reached the original is only nacked without requeue.
   * If publishing to the toxic exchange fails, the original is nacked with requeue so the
   * message is not lost. Unknown ids are ignored.
   */
  def retryLater(id: MessageId): Unit =
    pendingConfirmation.get(id).foreach { msgContext =>
      msgContext.expiration.cancel()
      val props = msgContext.message.properties
      val requeue =
        if (props.retryNo().value < maxRetriesForToxicMessage) {
          // NOTE(review): the routing key is the literal "#"; both bindings use the
          // "#" wildcard, so it still routes, but the original routing key is not preserved.
          val published = channelErrors.either(
            channel.basicPublish(
              toxicMessageExchangeName,
              "#",
              props.withBumpedRetryNo(),
              msgContext.message.body))
          published match {
            case Left(e) =>
              log.error(e, s"Failed to publish the message id=$id for retry, requeueing it")
              true
            case Right(_) =>
              false
          }
        } else {
          log.warning(
            s"The message id=$id reached the retry limit of $maxRetriesForToxicMessage, giving up")
          false
        }
      nack(id, requeue)
    }

  /**
   * Negatively acknowledges the pending delivery for `forId`, if any.
   * Channel failures are logged, not propagated.
   *
   * @param requeue whether the broker should requeue the message
   */
  def nack(forId: MessageId, requeue: Boolean): Unit = {
    log.warning(s"Nack forId=$forId and requeue=$requeue")
    channelErrors
      .either(pullDeliveryTag(forId).foreach(channel.basicNack(_, false, requeue)))
      // `error(cause, message)`: with the arguments swapped Akka treats the
      // exception as a template argument and drops its stack trace.
      .fold(e => log.error(e, "Failed to nack"), _ => ())
  }

  /**
   * Acknowledges the pending delivery for `forId`, if any.
   * Channel failures are logged, not propagated.
   */
  def ack(forId: MessageId): Unit = {
    log.info(s"Ack forId=$forId")
    channelErrors
      .either(pullDeliveryTag(forId).foreach(channel.basicAck(_, false)))
      .fold(e => log.error(e, "Failed to ack"), _ => ())
  }

  /**
   * Nacks (multiple, with requeue) everything up to the highest pending delivery tag.
   * Does nothing when no delivery is pending; channel failures are logged, not propagated,
   * so that callers such as [[terminate]] can always finish their cleanup.
   */
  def failAll(): Unit =
    if (pendingConfirmation.nonEmpty) {
      val highestTag = pendingConfirmation.values.map(_.tag).max
      channelErrors
        .either(channel.basicNack(highestTag, true, true))
        .fold(e => log.error(e, "Failed to nack pending messages"), _ => ())
    }

  /**
   * Fails all pending deliveries, cancels their expiry timers, forgets them and stops
   * this actor. Must be called on the actor's own thread.
   */
  def terminate(): Unit = {
    log.warning("Consumer terminating, sorry")
    failAll()
    // Without this the timers would still fire ExpiredMessage at the stopped actor.
    pendingConfirmation.values.foreach(_.expiration.cancel())
    pendingConfirmation.clear()
    self ! PoisonPill
  }

  /**
   * Processes one delivery. Called for every delivery that carries a message id, after
   * the delivery has been registered as pending under that id.
   */
  def handleMessage(
      consumerTag: String,
      envelope: AMPQEnvelope,
      properties: BasicProperties,
      body: Array[Byte]): Unit

  override def receive: Receive = {
    case Start() =>
      startConsuming()

    case ChannelShutdown =>
      terminate()

    case ProcessedMessage(id) =>
      log.debug(s"The message id=$id processed and will be acknowledged")
      ack(id)

    case FailedMessage(id) =>
      log.warning(s"The message id=$id failed and will be queued to retry later")
      retryLater(id)

    case RejectedMessage(id) =>
      log.warning(s"The message id=$id was rejected and will be NOT queued to retry later")
      nack(id, requeue = false)

    case ExpiredMessage(id) =>
      log.warning(s"The message id=$id expired and will be queued to retry later")
      retryLater(id)

    case delivery @ Delivery(consumerTag, envelope, properties, body)
        if properties.getMessageId != null =>
      waitForConfirmation(MessageId(properties.getMessageId), envelope.getDeliveryTag, delivery)
      handleMessage(consumerTag, envelope, properties, body)

    case Delivery(_, envelope, _, _) =>
      // Without a message id the delivery cannot be tracked, so it is refused right away.
      log.error(s"Reject message without MessageId, deliveryTag=${envelope.getDeliveryTag}")
      channelErrors
        .either(channel.basicNack(envelope.getDeliveryTag, false, false))
        .fold(e => log.error(e, "Failed to nack"), _ => ())

    case msg =>
      log.info(s"Received unsupported message, ignoring $msg")
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment