package botkop.sparti.receiver
import com.rabbitmq.client._
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import scala.reflect.ClassTag
/**
 * Receiver-based input stream whose receiver is a [[RabbitReceiver]].
 *
 * All constructor arguments except `ssc_` are forwarded unchanged to the
 * receiver created by [[getReceiver]].
 *
 * @param ssc_         streaming context handed to the parent `ReceiverInputDStream`
 * @param uri          broker URI forwarded to the receiver
 * @param exchangeName exchange name forwarded to the receiver
 * @param queueName    queue name forwarded to the receiver
 * @param routingKey   routing key forwarded to the receiver
 * @param converter    turns a raw message body into a `T`
 * @param storageLevel storage level forwarded to the receiver
 * @tparam T type of the elements produced by this stream
 */
class RabbitInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    uri: String,
    exchangeName: String,
    queueName: String,
    routingKey: String,
    converter: Array[Byte] => T,
    storageLevel: StorageLevel
) extends ReceiverInputDStream[T](ssc_) {

  /** Creates a new [[RabbitReceiver]] configured with this stream's arguments. */
  def getReceiver(): Receiver[T] =
    new RabbitReceiver(uri, exchangeName, queueName, routingKey, converter, storageLevel)
}
/**
 * Spark Streaming receiver that consumes messages from a RabbitMQ queue,
 * converts each body with `bytesToObject` and stores the result.
 *
 * @param uri           broker URI applied to the connection factory
 * @param exchangeName  exchange the queue is bound to (not declared here)
 * @param queueName     queue that is declared, bound and consumed
 * @param routingKey    routing key used for the queue binding
 * @param bytesToObject turns a raw message body into a `T`
 * @param storageLevel  storage level passed to the parent `Receiver`
 * @tparam T type of the stored elements
 */
class RabbitReceiver[T: ClassTag](uri: String,
                                  exchangeName: String,
                                  queueName: String,
                                  routingKey: String,
                                  bytesToObject: Array[Byte] => T,
                                  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2)
  extends Receiver[T](storageLevel) with Logging {

  /** Starts a dedicated thread that runs `receive()`, so this method returns without blocking. */
  def onStart(): Unit = {
    val worker = new Thread("Rabbit Receiver") {
      override def run(): Unit = receive()
    }
    // Without start() the thread object is created but receive() never runs.
    worker.start()
  }

  /** Called when the receiver is stopped; performs no cleanup. */
  def onStop(): Unit = {
    // Nothing is released here: the thread started by onStart() ends as soon
    // as receive() has registered its consumer.
    // NOTE(review): the connection opened by receive() is not tracked, so it is
    // not closed on stop -- confirm whether that is intended.
  }

  /**
   * Opens a connection, declares and binds the queue, and registers a
   * [[MessageConsumer]] that forwards each body to `convertAndStore`.
   *
   * On any failure the connection (if one was opened) is closed and the
   * receiver is asked to restart.
   */
  private def receive(): Unit = {
    // Remembered outside the try so the error handler can close it.
    var opened: Option[Connection] = None
    try {
      val connection = getConnection
      opened = Some(connection)
      val channel = connection.createChannel
      // channel.exchangeDeclare(exchangeName, "direct", true) // assuming exchange has been declared
      // Arguments after the name: durable = true, exclusive = false, autoDelete = false, no extra arguments.
      channel.queueDeclare(queueName, true, false, false, null)
      channel.queueBind(queueName, exchangeName, routingKey)
      // Deliveries are acknowledged explicitly by the consumer.
      val autoAck = false
      val consumer = new MessageConsumer(channel, convertAndStore)
      channel.basicConsume(queueName, autoAck, getClass.getCanonicalName, consumer)
    } catch {
      case t: Throwable =>
        opened.foreach { connection =>
          try {
            connection.close()
          } catch {
            case u: Throwable => logError("error trying to close connection to rabbit", u)
          }
        }
        restart("error receiving data from rabbit", t)
    }
  }

  /**
   * Converts a raw message body with `bytesToObject` and stores the result.
   *
   * @param body raw message payload
   */
  def convertAndStore(body: Array[Byte]): Unit = {
    val o: T = bytesToObject(body)
    store(o)
  }

  /** Creates a new connection to the broker at `uri` with automatic recovery enabled. */
  private def getConnection: Connection = {
    val factory = new ConnectionFactory
    factory.setUri(uri)
    factory.setAutomaticRecoveryEnabled(true) //enable automatic connection recovery
    factory.newConnection()
  }
}
/**
 * Consumer that passes every delivered message body to `store` and then
 * acknowledges the delivery on `channel`.
 *
 * @param channel channel the consumer is attached to; also used for the ack
 * @param store   callback invoked with the raw body of each delivery
 */
class MessageConsumer(channel: Channel, store: (Array[Byte]) => Unit) extends DefaultConsumer(channel) {

  /**
   * Processes one delivery: hands the body to `store`, then acks it.
   *
   * The ack is sent only after `store` returns, so a delivery whose
   * processing throws is left unacknowledged.
   */
  override def handleDelivery(
      consumerTag: String,
      envelope: Envelope,
      properties: AMQP.BasicProperties,
      body: Array[Byte]): Unit = {
    val deliveryTag = envelope.getDeliveryTag
    // process the message
    store(body)
    // Second argument is `multiple`; false acknowledges only this delivery.
    channel.basicAck(deliveryTag, false)
  }
}
/** Factory helpers for RabbitMQ-backed input streams. */
object RabbitReceiver {

  /**
   * Creates a [[RabbitInputDStream]] of strings, decoding each message body as UTF-8.
   *
   * @param ssc          streaming context the stream is attached to
   * @param exchangeName exchange name forwarded to the stream
   * @param queueName    queue name forwarded to the stream
   * @param routingKey   routing key forwarded to the stream; defaults to "#"
   * @param uri          broker URI forwarded to the stream
   * @param storageLevel storage level forwarded to the stream
   * @return the newly created input stream
   */
  def rabbitTextStream(
      ssc : StreamingContext, exchangeName: String, queueName: String,
      routingKey: String = "#",
      uri: String = "amqp://guest:guest@localhost:5672/%2f",
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)
  : RabbitInputDStream[String] = {
    // Decode with an explicit charset: the one-argument String constructor uses
    // the JVM default, which can differ between the machines running receivers.
    val decode: Array[Byte] => String =
      bytes => new String(bytes, java.nio.charset.StandardCharsets.UTF_8)
    new RabbitInputDStream[String](ssc, uri, exchangeName, queueName, routingKey, decode, storageLevel)
  }
}
