Skip to content

Instantly share code, notes, and snippets.

@lemonxah
Created August 2, 2016 09:20
Show Gist options
  • Save lemonxah/8e8e75558036d8419fb9458e55d46689 to your computer and use it in GitHub Desktop.
Save lemonxah/8e8e75558036d8419fb9458e55d46689 to your computer and use it in GitHub Desktop.
package com.fullfacing.queue
import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
import java.util.Properties
import java.util.concurrent.ConcurrentHashMap
import java.util.zip._
import akka.actor.ActorRef
import com.fullfacing.framework.Security
import com.fullfacing.message.{Response, Request, Retry}
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client._
import org.slf4j.LoggerFactory
import scala.Console._
import scala.Predef
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language._
import scala.util.Try
import scala.util.{Success ⇒ S}
import scala.util.{Failure ⇒ F}
import scalaz._, Scalaz._
/**
* Project: common
* Package: com.fullfacing.queue
* Created on 20/8/15.
* lemonxah aka lemonxah -
* https://github.com/lemonxah
* http://stackoverflow.com/users/2919672/lemon-xah
*/
/**
 * Compression scheme applied to a message body.
 *
 * The three members are [[NoCompression]], [[Gzip]] and [[Deflate]].
 */
sealed trait Compression
object Compression {
  /**
   * Parses a content-encoding name into a [[Compression]].
   *
   * Matching ignores surrounding whitespace and letter case. Anything other than
   * "gzip" or "deflate" maps to [[NoCompression]].
   *
   * @param s the encoding name, e.g. the AMQP content-encoding property
   * @return the matching compression scheme
   */
  def apply(s: String): Compression =
    // Locale.ROOT keeps the case mapping independent of the JVM default locale
    // (under a Turkish locale "GZIP".toLowerCase is "gzıp", which would not match).
    s.trim.toLowerCase(java.util.Locale.ROOT) match {
      case "gzip" ⇒ Gzip
      case "deflate" ⇒ Deflate
      case _ ⇒ NoCompression
    }
}
/** The body is sent as-is. */
case object NoCompression extends Compression
/** The body is gzip-compressed. */
case object Gzip extends Compression
/** The body is deflate-compressed. */
case object Deflate extends Compression
object RabbitMQ {
// SLF4J logger used by every member of this object.
val logger = LoggerFactory.getLogger("com.fullfacing.queue.RabbitMQ")
// Reader that takes a Consumer[Connection] and performs a subscription purely for its side effects.
type SubscribeReader = Reader[Consumer[Connection], Unit]
// Reader that takes a Producer[Channel, Connection] and performs a publish purely for its side effects.
type PublishReader = Reader[Producer[Channel, Connection], Unit]
/**
 * Returns the n-th Fibonacci number, with fib(0) = 0 and fib(1) = 1.
 *
 * The result is a plain Long, so values silently wrap for n > 92.
 *
 * @param n zero-based index into the sequence; must not be negative
 * @return the n-th Fibonacci number
 * @throws IllegalArgumentException if n is negative
 */
def fib(n: Long): Long = {
  // A negative n would count down away from the base case and never terminate in practice.
  require(n >= 0, s"fib is undefined for negative n: $n")
  @tailrec def step(remaining: Long, current: Long, next: Long): Long =
    if (remaining == 0) current else step(remaining - 1, next, current + next)
  step(n, 0, 1)
}
// Back-off schedule: attempt number -> fib(min(9, attempt)) seconds, so the delay tops out at fib(9) = 34s.
// Presumably picked up implicitly by Retry.backOff — confirm against Retry's definition.
private implicit val backOffstrat: Long ⇒ Duration = attempt ⇒ fib(math.min(9L, attempt)).seconds
// Message handlers keyed by topic: filled by subscr, looked up by the consumer for every delivery
// (first by exact routing key, then under "#").
private val cache = new ConcurrentHashMap[String, (Array[Byte], String) ⇒ Throwable \/ _]()
// One connection slot per broker key (see getKey). A stored None is treated the same as a missing entry.
private val connections = new ConcurrentHashMap[String, Option[Connection]]
// One channel slot per broker key; reset to None whenever confToConnection has to (re)connect.
private val chans = new ConcurrentHashMap[String, Option[Channel]]
/**
 * Opens a new RabbitMQ connection configured from `conf`.
 *
 * Properties read (default in parentheses): host.heartBeat (60), host.virtualHost (/),
 * host.ip (empty), host.port (5672), host.username, host.password, host.useSSL (false),
 * host.sslProtocol (TLSv1.2). Automatic recovery is enabled with a 10s recovery interval and
 * the connection timeout is 60s.
 *
 * @param conf broker settings
 * @return the open connection on the right, or any non-fatal error (bad number format,
 *         connection failure, ...) on the left
 */
private def connect(conf: Properties): Throwable \/ Connection = \/.fromTryCatchNonFatal {
  logger.debug(s"${Console.GREEN}Connecting to :${Console.BLUE} ${getKey(conf)}${Console.RESET}")
  val factory = new ConnectionFactory
  factory.setRequestedHeartbeat(conf.getProperty("host.heartBeat", "60").toInt)
  factory.setVirtualHost(conf.getProperty("host.virtualHost", "/"))
  factory.setHost(conf.getProperty("host.ip", ""))
  factory.setPort(conf.getProperty("host.port", "5672").toInt)
  factory.setAutomaticRecoveryEnabled(true)
  factory.setNetworkRecoveryInterval(10000)
  factory.setConnectionTimeout(60000)
  // getProperty, unlike containsKey, also consults the Properties defaults chain and returns
  // null when the key is absent or not a String, so credentials supplied via defaults are
  // honoured and a null is never handed to the factory.
  Option(conf.getProperty("host.username")).foreach(user ⇒ factory.setUsername(user))
  Option(conf.getProperty("host.password")).foreach(pass ⇒ factory.setPassword(pass))
  val ssl = conf.getProperty("host.useSSL", "false").toBoolean
  logger.info(s"${Console.GREEN}Using RabbitMQ SSL${Console.WHITE}: ${Console.BLUE}$ssl${Console.RESET}")
  if (ssl) {
    factory.useSslProtocol(conf.getProperty("host.sslProtocol", "TLSv1.2"))
  }
  factory.newConnection()
}
/**
 * Builds the "host:port" key under which the connection and channel for `conf` are cached.
 *
 * Uses the same fallbacks as connect (empty host, port 5672), so the key names the endpoint
 * that is actually dialled instead of containing the text "null" for unset properties.
 */
private def getKey(conf: Properties): String =
  s"${conf.getProperty("host.ip", "")}:${conf.getProperty("host.port", "5672")}"
/**
 * Looks up the cached connection for the broker described by `conf`.
 *
 * @return the connection, or None when the key is unknown or its slot currently holds None
 */
private def getCon(conf: Properties): Option[Connection] =
  // A single get replaces the containsKey-then-get pair: one lookup, and a missing key
  // (null from the Java map) can never leak out as a null Option.
  Option(connections.get(getKey(conf))).flatMap(cached ⇒ cached)
/**
 * Looks up the cached channel for the broker described by `conf`.
 *
 * @return the channel, or None when the key is unknown or its slot currently holds None
 */
private def getChan(conf: Properties): Option[Channel] =
  // A single get replaces the containsKey-then-get pair: one lookup, and a missing key
  // (null from the Java map) can never leak out as a null Option.
  Option(chans.get(getKey(conf))).flatMap(cached ⇒ cached)
/**
 * Returns the cached connection for `conf`, connecting (and retrying indefinitely) when none
 * is cached.
 *
 * Before every connection attempt the channel and connection slots for the key are reset to
 * None. On failure the error is logged, `retries.backOff` is invoked (presumably a delay driven
 * by the implicit back-off strategy — confirm against Retry) and the attempt is repeated with
 * an incremented retry counter. On success the connection is cached and returned.
 *
 * The retry is a tail call, so a long outage no longer deepens the stack with every
 * failed attempt.
 *
 * @param conf    broker settings
 * @param retries retry counter for the current attempt; pass Retry(0) for a fresh call
 * @return an established connection
 */
@tailrec
implicit def confToConnection(conf: Properties, retries: Retry): Connection = {
  if (retries.count > 0) logger.trace(s"${Console.GREEN}Trying to connect(${getKey(conf)}), tries: ${retries.count}${Console.RESET}")
  getCon(conf) match {
    case Some(cached) ⇒ cached
    case None ⇒
      chans.put(getKey(conf), None)
      connections.put(getKey(conf), None)
      connect(conf) match {
        case -\/(ex) ⇒
          logger.error(s"${Console.RED}Connection Error: (${getKey(conf)}), Retry no: ${retries.count}, message: ${ex.getMessage}${Console.RESET}")
          retries.backOff
          confToConnection(conf, retries.incr)
        case \/-(connection) ⇒
          connections.put(getKey(conf), Some(connection))
          connection
      }
  }
}
/**
 * Implicit view from broker settings to the cached channel for that broker.
 *
 * When no channel is cached, one is created on the cached connection and stored; when there
 * is no connection either, confToConnection is run first. In both cases the lookup is then
 * repeated from the top.
 *
 * @param conf broker settings
 * @return the channel cached for the broker described by `conf`
 */
implicit def confToChannel(conf: Properties): Channel = getChan(conf) match {
  case Some(channel) ⇒ channel
  case None ⇒
    getCon(conf) match {
      case Some(connection) ⇒
        val opened = connection.createChannel()
        chans.put(getKey(conf), Some(opened))
      case None ⇒
        confToConnection(conf, Retry(0))
    }
    // Start over: the caches have been filled by one of the branches above.
    confToChannel(conf)
}
/**
 * Publisher instance that sends a message to the exchange named by the "exchange.name" property.
 *
 * The message is tagged with content type "application/json; charset=utf-8" and the "app.name"
 * property as app id. When "topics.compressed" is true and the body is longer than 300 bytes,
 * the body is gzip-compressed and the content encoding is set to "gzip".
 */
implicit val publisher: Publisher[Channel, Connection] = new Publisher[Channel, Connection] {
  override def publish(a: Publish): Reader[Producer[Channel, Connection], Unit] = new PublishReader({ p ⇒
    val useCompressed = p.props.getProperty("topics.compressed", "false").toBoolean
    // Decide once so the header and the body can never disagree.
    val compressBody = useCompressed && a.message.length > 300
    val props = new BasicProperties.Builder()
    if (compressBody) props.contentEncoding("gzip")
    props.contentType("application/json; charset=utf-8")
    props.appId(p.props.getProperty("app.name", "no app name in config!"))
    val payload = if (compressBody) gzip(a.message) else a.message
    p.producer.basicPublish(p.props.getProperty("exchange.name"), a.topic, props.build(), payload)
    // The body is declared as UTF-8, so decode it as UTF-8 rather than with the platform
    // charset, and only pay for the decoding when trace logging is on.
    if (logger.isTraceEnabled)
      logger.trace(s"${BLUE}Published message on topic($YELLOW${a.topic}$BLUE)$WHITE:$GREEN${new String(a.message, java.nio.charset.StandardCharsets.UTF_8)}$RESET")
  })
}
/**
 * Builds the consumer that dispatches deliveries on `chan` to the handlers registered in `cache`.
 *
 * Per delivery: the handler is looked up by exact routing key, falling back to the "#" entry.
 * The body is decompressed according to the content-encoding property and passed to the
 * handler. A successful handler result is acked; a handler failure is nacked with requeue.
 * Deliveries without a handler, deliveries that cannot be decompressed and deliveries that
 * raise an exception are acked, i.e. dropped.
 *
 * @param chan the channel the consumer acknowledges on
 */
private def consumer(chan: Channel) = new DefaultConsumer(chan) {
  override def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException): Unit = {
    logger.error(s"${Console.RED}Rabbit consumer shutdown tag($consumerTag)${Console.WHITE}: ${Console.RED} ${Console.RESET}", sig)
  }
  /** Acknowledges a single delivery. */
  def ack(deliveryTag: Long) = {
    this.getChannel.basicAck(deliveryTag, false)
  }
  /** Rejects a single delivery and asks the broker to requeue it. */
  def nack(deliveryTag: Long) = {
    this.getChannel.basicNack(deliveryTag, false, true)
  }
  /** Runs the handler on an already decompressed body: ack on success, nack (requeue) on failure. */
  def handleMessage(b: Array[Byte], envelope: Envelope)(implicit handle: (Array[Byte], String) ⇒ Throwable \/ _) = {
    handle(b, envelope.getRoutingKey).fold({ t ⇒
      logger.error(s"${RED}Queue data not in expected format$RESET", t)
      nack(envelope.getDeliveryTag)
    }, { d ⇒
      logger.trace(s"${BLUE}Got message on topic($YELLOW${envelope.getRoutingKey}$BLUE)$WHITE:$GREEN $d$RESET")
      ack(envelope.getDeliveryTag)
    })
  }
  override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]): Unit = {
    try {
      val contentEncoding = Option(properties.getContentEncoding)
      val contentType = Option(properties.getContentType)
      val compression: Compression = contentEncoding.map(Compression.apply).getOrElse(NoCompression)
      logger.trace(s"${BLUE}Content encoding for message($YELLOW${envelope.getDeliveryTag}$BLUE)$WHITE:$GREEN $contentEncoding${Console.RESET}")
      logger.trace(s"${BLUE}Content type for message($YELLOW${envelope.getDeliveryTag}$BLUE)$WHITE:$GREEN $contentType${Console.RESET}")
      logger.trace(s"${BLUE}Compression for message($YELLOW${envelope.getDeliveryTag}$BLUE)$WHITE:$GREEN $compression${Console.RESET}")
      def failedDecompress(t: Throwable) =
        logger.error(s"${Console.RED}Decompression failed: $compression($YELLOW${envelope.getDeliveryTag}$RED)${Console.RESET}", t)
      // One lookup per key; the Java map answers null for a missing key.
      val handler = Option(cache.get(envelope.getRoutingKey)).orElse(Option(cache.get("#")))
      handler match {
        case Some(handle) ⇒
          uzip(body, compression).fold({ t ⇒
            failedDecompress(t)
            // Without an ack the delivery would stay unacknowledged for the life of the channel.
            // Requeueing a body that cannot be decompressed would only redeliver it, so drop it.
            ack(envelope.getDeliveryTag)
          }, { bytes ⇒
            handleMessage(bytes, envelope)(handle)
          })
        case None ⇒
          logger.trace(s"${Console.RED}Dropping message on topic not listening directry for: ${Console.BLUE}${envelope.getRoutingKey}${Console.RESET}")
          ack(envelope.getDeliveryTag)
      }
    } catch {
      case e: Exception ⇒
        logger.error(s"${Console.RED}MessageDelivery error: ${e.getMessage}${Console.RESET}", e)
        ack(envelope.getDeliveryTag)
        // deadletter publishing to exchange FF topic Dead Letter
        // drop dead letters for now else we would duplicate the dead letter messages by the number of
        // queues that are subscribed to that routing key.
    }
  }
}
/**
 * Registers the handlers of `a`, binds `qName` to the exchange for their topics and starts
 * consuming from the queue with manual acknowledgement.
 *
 * Topics that start with "fullfacing" and have exactly four dot-separated segments are grouped
 * by their second segment and collapsed into fewer bindings:
 *  - a group with a single topic is bound as-is;
 *  - a group whose topics all end in "request" (or all in "response") is bound once with "*"
 *    as third segment;
 *  - otherwise the group is split by third segment, binding "....#" when a third segment occurs
 *    more than once and the exact topic when it occurs once.
 * All other topics are bound unchanged.
 *
 * @param a     subscriptions to register
 * @param s     consumer settings; supplies the "exchange.name" property
 * @param chan  channel used for binding and consuming
 * @param qName name of the queue to bind and consume from
 */
private def subscr[M[_]](a: List[Subscribe[_, _, M]], s: Consumer[Connection], chan: Channel, qName: String): Unit = {
  val exchange = s.props.getProperty("exchange.name")
  a.foreach { subscription ⇒
    cache.putIfAbsent(subscription.topic, subscription.handle)
  }
  val (fullfacing, other) = a.map(_.topic).partition { topic ⇒
    topic.split('.').length == 4 && topic.startsWith("fullfacing")
  }
  val collapsed = fullfacing
    .map(_.split('.').toList)
    .groupBy(parts ⇒ parts(1))
    .flatMap { case (group, members) ⇒
      val org = members.head.head
      members match {
        case only :: Nil ⇒
          List(s"$org.$group.${only(2)}.${only(3)}")
        case _ if members.forall(_(3) == "request") ⇒
          List(s"$org.$group.*.request")
        case _ if members.forall(_(3) == "response") ⇒
          List(s"$org.$group.*.response")
        case _ ⇒
          members.groupBy(_(2)).map { case (entity, sameEntity) ⇒
            if (sameEntity.size > 1) s"$org.$group.$entity.#"
            else s"$org.$group.$entity.${sameEntity.head(3)}"
          }
      }
    }
  (collapsed ++ other).foreach { topic ⇒
    logger.info(s"${Console.GREEN}Binding queue('${Console.RED}$qName${Console.GREEN}') to ${Console.WHITE}: ${Console.BLUE}$topic${Console.RESET}")
    chan.queueBind(qName, exchange, topic)
  }
  // autoAck = false: the consumer acknowledges every delivery itself.
  chan.basicConsume(qName, false, consumer(chan))
}
/**
 * Decompresses `body` according to `compression`.
 *
 * @param body        the raw message body
 * @param compression how the body was compressed; NoCompression returns the body unchanged
 * @return the decompressed bytes on the right, or any non-fatal error on the left
 */
def uzip(body: Array[Byte], compression: Compression): Throwable \/ Array[Byte] =
  \/.fromTryCatchNonFatal {
    val decode: Array[Byte] ⇒ Array[Byte] = compression match {
      case NoCompression ⇒ raw ⇒ raw
      case Deflate ⇒ raw ⇒ decompress(raw)
      case Gzip ⇒ raw ⇒ ungzip(raw)
    }
    decode(body)
  }
/**
 * Compresses `inData` with the default java.util.zip.Deflater settings.
 *
 * The output is collected until the deflater reports completion, so it is complete for any
 * input size — including empty or very small inputs whose compressed form is larger than
 * twice the input.
 *
 * @param inData bytes to compress
 * @return the complete compressed stream
 */
private[RabbitMQ] def compress(inData: Array[Byte]): Array[Byte] = {
  val deflater = new Deflater()
  try {
    deflater.setInput(inData)
    deflater.finish()
    val out = new ByteArrayOutputStream(math.max(32, inData.length))
    val chunk = new Array[Byte](1024 * 4)
    while (!deflater.finished()) {
      out.write(chunk, 0, deflater.deflate(chunk))
    }
    out.toByteArray
  } finally {
    // Release the native zlib state right away instead of waiting for finalization.
    deflater.end()
  }
}
/**
 * Inflates data produced by java.util.zip.Deflater with default settings.
 *
 * @param inData the compressed stream; an empty array yields an empty result
 * @return the decompressed bytes
 * @throws DataFormatException if the stream is corrupt, ends before it is complete, or
 *                             requires a preset dictionary
 */
private[RabbitMQ] def decompress(inData: Array[Byte]): Array[Byte] =
  if (inData.isEmpty) Array.empty[Byte]
  else {
    val inflater = new Inflater()
    try {
      inflater.setInput(inData)
      val out = new ByteArrayOutputStream(inData.length * 2)
      val chunk = new Array[Byte](1024 * 4)
      @tailrec def drain(): Array[Byte] =
        if (inflater.finished()) out.toByteArray
        else {
          val n = inflater.inflate(chunk)
          // No output while the stream is unfinished means the inflater is stuck waiting for
          // more input or a dictionary: report it instead of returning partial data.
          if (n == 0 && (inflater.needsInput() || inflater.needsDictionary()))
            throw new DataFormatException("Deflate stream is truncated or needs a preset dictionary")
          out.write(chunk, 0, n)
          drain()
        }
      drain()
    } finally {
      // Release the native zlib state right away instead of waiting for finalization.
      inflater.end()
    }
  }
/**
 * Gzip-compresses `bytes` in memory.
 *
 * @param bytes the data to compress
 * @return a complete gzip stream (header, compressed data and trailer)
 */
private[RabbitMQ] def gzip(bytes: Array[Byte]): Array[Byte] = {
  val byteOut = new ByteArrayOutputStream()
  val gzipOut = new GZIPOutputStream(byteOut)
  // The input is already an in-memory array, so it is written in one call. close() must run
  // even if writing fails: it writes the gzip trailer and releases the deflater.
  try gzipOut.write(bytes)
  finally gzipOut.close()
  byteOut.toByteArray
}
/**
 * Decompresses a gzip stream held in memory.
 *
 * @param bytes a complete gzip stream
 * @return the decompressed bytes
 * @throws java.io.IOException if `bytes` is not a valid gzip stream
 */
private[RabbitMQ] def ungzip(bytes: Array[Byte]): Array[Byte] = {
  val gzipIn = new GZIPInputStream(new ByteArrayInputStream(bytes))
  try {
    val buffer = new Array[Byte](1024 * 4)
    val byteOut = new ByteArrayOutputStream()
    @tailrec def loop(n: Int): Unit =
      if (n != -1) {
        byteOut.write(buffer, 0, n)
        loop(gzipIn.read(buffer))
      }
    loop(gzipIn.read(buffer))
    byteOut.toByteArray
  } finally {
    // Closing releases the stream's inflater, also when reading fails half-way.
    gzipIn.close()
  }
}
/**
 * Builds the reader that subscribes `a` on a queue named after the application.
 *
 * When run, the reader opens a new channel on the consumer's connection, declares the queue
 * named by the "app.name" property ("NoAppName" when unset) and hands over to subscr.
 *
 * @param a subscriptions to register
 * @return a reader performing the subscription when given a Consumer[Connection]
 */
def flatReader[M[_]](a: List[Subscribe[_, _, M]]): SubscribeReader = new SubscribeReader({ s ⇒
  val channel = s.connection.createChannel
  val queueName = s.props.getProperty("app.name", "NoAppName")
  // Arguments after the name: durable = true, exclusive = false, autoDelete = false, no extra arguments.
  channel.queueDeclare(queueName, true, false, false, null)
  subscr(a, s, channel, queueName)
})
/** Subscriber for request subscriptions; subscribes on the application-named queue via flatReader. */
implicit val subscirberRequests: Subscriber[SubscribeRequest[_, _], Connection] =
  new Subscriber[SubscribeRequest[_, _], Connection] {
    override def subscribe(a: List[SubscribeRequest[_, _]]): SubscribeReader =
      flatReader(a)
  }
/** Subscriber for flat subscriptions; subscribes on the application-named queue via flatReader. */
implicit val subscirberFlat: Subscriber[SubscribeFlat[_, _], Connection] =
  new Subscriber[SubscribeFlat[_, _], Connection] {
    override def subscribe(a: List[SubscribeFlat[_, _]]): SubscribeReader =
      flatReader(a)
  }
/**
 * Subscriber for broadcast subscriptions.
 *
 * Each subscribe call opens a new channel and declares a fresh queue through the no-argument
 * queueDeclare, using whatever name the declaration reports, before handing over to subscr.
 */
implicit val subscirberBroadcasts: Subscriber[SubscribeBroadcast[_, _], Connection] =
  new Subscriber[SubscribeBroadcast[_, _], Connection] {
    override def subscribe(a: List[SubscribeBroadcast[_, _]]): SubscribeReader =
      new SubscribeReader({ s ⇒
        val channel = s.connection.createChannel
        val queueName = channel.queueDeclare().getQueue
        subscr(a, s, channel, queueName)
      })
  }
/**
 * Subscriber for response subscriptions.
 *
 * Each subscribe call opens a new channel and declares a fresh queue through the no-argument
 * queueDeclare, using whatever name the declaration reports, before handing over to subscr.
 */
implicit val subscirberResponses: Subscriber[SubscribeResponse[_, _], Connection] =
  new Subscriber[SubscribeResponse[_, _], Connection] {
    override def subscribe(a: List[SubscribeResponse[_, _]]): SubscribeReader =
      new SubscribeReader({ s ⇒
        val channel = s.connection.createChannel
        val queueName = channel.queueDeclare().getQueue
        subscr(a, s, channel, queueName)
      })
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment