Skip to content

Instantly share code, notes, and snippets.

@koen-dejonghe
Last active March 1, 2016 17:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save koen-dejonghe/940e0d5f3fc6c2b2d39c to your computer and use it in GitHub Desktop.
Save koen-dejonghe/940e0d5f3fc6c2b2d39c to your computer and use it in GitHub Desktop.
package botkop.sparti.receiver
import com.rabbitmq.client._
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import scala.reflect.ClassTag
/**
 * Input DStream whose elements are produced by a [[RabbitReceiver]].
 *
 * @param ssc_         streaming context this stream is attached to
 * @param uri          AMQP URI handed to the receiver
 * @param exchangeName exchange the receiver binds its queue to
 * @param queueName    queue the receiver consumes from
 * @param routingKey   routing key used for the queue binding
 * @param converter    turns a raw message body into an element of type `T`
 * @param storageLevel storage level handed to the receiver
 * @tparam T element type of the stream
 */
class RabbitInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    uri: String,
    exchangeName: String,
    queueName: String,
    routingKey: String,
    converter: Array[Byte] => T,
    storageLevel: StorageLevel
) extends ReceiverInputDStream[T](ssc_) {

  /** Builds the receiver that feeds this stream, forwarding every constructor argument. */
  override def getReceiver(): Receiver[T] =
    new RabbitReceiver[T](
      uri,
      exchangeName,
      queueName,
      routingKey,
      converter,
      storageLevel
    )
}
/**
 * Spark Streaming receiver that consumes messages from a RabbitMQ queue,
 * converts each message body with `bytesToObject` and stores the result.
 *
 * The queue is declared durable, non-exclusive and non-auto-delete, then
 * bound to `exchangeName` with `routingKey`. The exchange itself is assumed to
 * exist already. Messages are consumed with manual acknowledgement (see
 * [[MessageConsumer]]).
 *
 * @param uri           AMQP URI of the broker
 * @param exchangeName  exchange the queue is bound to
 * @param queueName     queue to declare and consume from
 * @param routingKey    routing key used for the binding
 * @param bytesToObject converts a raw message body into an element of type `T`
 * @param storageLevel  storage level for received elements
 * @tparam T type of the elements stored by this receiver
 */
class RabbitReceiver[T: ClassTag](uri: String,
                                  exchangeName: String,
                                  queueName: String,
                                  routingKey: String,
                                  bytesToObject: Array[Byte] => T,
                                  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2)
  extends Receiver[T](storageLevel) with Logging {

  import scala.util.control.NonFatal

  // Connection currently owned by this receiver, or null when there is none.
  // Kept as a field (not a local of receive()) so that onStop() can close it.
  // @transient because the receiver object is serializable while a connection
  // is not; @volatile because it is written by the receiving thread and read
  // by whichever thread calls onStop().
  @transient @volatile private var connection: Connection = _

  /** Starts a daemon thread that connects to RabbitMQ and registers the consumer. */
  def onStart(): Unit = {
    val worker = new Thread("Rabbit Receiver") {
      override def run(): Unit = receive()
    }
    worker.setDaemon(true)
    worker.start()
  }

  /**
   * Closes the RabbitMQ connection, which also ends message delivery.
   *
   * receive() only registers a consumer and returns; nothing polls
   * isStopped(), so without this close the consumer would keep delivering
   * messages (and the connection would leak) after a stop or restart.
   */
  def onStop(): Unit = closeConnection()

  /**
   * Connects, declares and binds the queue, and registers a consumer.
   * On any failure the connection is closed and a receiver restart is requested.
   */
  private def receive(): Unit = {
    try {
      val conn = getConnection
      connection = conn
      if (isStopped()) {
        // The receiver was stopped while we were still connecting; onStop()
        // may have run before the field was set, so clean up here.
        closeConnection()
      } else {
        val channel = conn.createChannel
        // channel.exchangeDeclare(exchangeName, "direct", true) // assuming exchange has been declared
        channel.queueDeclare(queueName, true, false, false, null)
        channel.queueBind(queueName, exchangeName, routingKey)
        val autoAck = false
        val consumer = new MessageConsumer(channel, convertAndStore)
        channel.basicConsume(queueName, autoAck, getClass.getCanonicalName, consumer)
      }
    } catch {
      // Deliberately broad: any failure while setting up must trigger a restart.
      case t: Throwable =>
        closeConnection()
        restart("error receiving data from rabbit", t)
    }
  }

  /** Converts a raw message body with `bytesToObject` and stores the result. */
  def convertAndStore(body: Array[Byte]): Unit = {
    val o: T = bytesToObject(body)
    store(o)
  }

  /** Opens a new connection to `uri` with automatic connection recovery enabled. */
  private def getConnection: Connection = {
    val factory = new ConnectionFactory
    factory.setUri(uri)
    factory.setAutomaticRecoveryEnabled(true) // enable automatic connection recovery
    factory.newConnection
  }

  /**
   * Closes the current connection, if any. Safe to call repeatedly and from
   * several threads: the field is cleared under the lock, so each connection
   * is closed at most once. The (possibly blocking) close runs outside the lock.
   */
  private def closeConnection(): Unit = {
    val open = synchronized {
      val current = Option(connection)
      connection = null
      current
    }
    open.foreach { conn =>
      try conn.close()
      catch {
        case NonFatal(e) => logError("error trying to close connection to rabbit", e)
      }
    }
  }
}
/**
 * RabbitMQ consumer that passes every delivered message body to `store` and
 * then acknowledges that single delivery on `channel`.
 *
 * If `store` throws, the acknowledgement is not sent.
 *
 * @param channel channel the consumer is registered on; also used for the ack
 * @param store   callback invoked with the raw body of each delivery
 */
class MessageConsumer(channel: Channel, store: (Array[Byte]) => Unit)
  extends DefaultConsumer(channel) {

  override def handleDelivery(consumerTag: String,
                              envelope: Envelope,
                              properties: AMQP.BasicProperties,
                              body: Array[Byte]): Unit = {
    // Hand the payload over first; acknowledge only once that has returned.
    store(body)

    // Acknowledge just this delivery, not every earlier unacknowledged one.
    val ackMultiple = false
    channel.basicAck(envelope.getDeliveryTag, ackMultiple)
  }
}
object RabbitReceiver {

  /**
   * Creates an input stream of text messages read from a RabbitMQ queue.
   *
   * Message bodies are decoded as UTF-8. Decoding is explicit rather than
   * relying on the JVM's platform-default charset, which may differ between
   * the machines running the receiver and from the encoding used by producers.
   *
   * @param ssc          streaming context the stream is attached to
   * @param exchangeName exchange the queue is bound to
   * @param queueName    queue to consume from
   * @param routingKey   routing key for the binding; defaults to "#"
   * @param uri          AMQP URI of the broker; defaults to guest@localhost,
   *                     virtual host "/" (URL-encoded as %2f)
   * @param storageLevel storage level for received messages
   * @return a stream with one `String` per received message
   */
  def rabbitTextStream(
      ssc : StreamingContext,
      exchangeName: String,
      queueName: String,
      routingKey: String = "#",
      uri: String = "amqp://guest:guest@localhost:5672/%2f",
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)
    : RabbitInputDStream[String] = {
    // References only a static constant, so the function captures no state.
    val decodeUtf8: Array[Byte] => String =
      bytes => new String(bytes, java.nio.charset.StandardCharsets.UTF_8)
    new RabbitInputDStream[String](
      ssc, uri, exchangeName, queueName, routingKey, decodeUtf8, storageLevel)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment