Skip to content

Instantly share code, notes, and snippets.

@elmer-garduno
Created September 6, 2013 03:01
Show Gist options
  • Save elmer-garduno/6459000 to your computer and use it in GitHub Desktop.
Save elmer-garduno/6459000 to your computer and use it in GitHub Desktop.
RabbitMQ Actor with Receiver
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spark.streaming.receivers
import spark.streaming.receivers._
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.Channel
import com.rabbitmq.client.DefaultConsumer
import com.rabbitmq.client.Envelope
import com.rabbitmq.client.AMQP.BasicProperties
import com.thenewmotion.akka.rabbitmq._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import spark.Logging
/**
 * An actor-based receiver that subscribes to RabbitMQ and hands every delivered
 * message body, decoded as a UTF-8 string, to `pushBlock`.
 *
 * The broker connection and channel are driven by a `ConnectionActor` and a
 * `ChannelActor`, both started from `preStart`. The connection actor is created
 * as a child of this actor, so it is stopped whenever this receiver is stopped
 * or restarted instead of lingering in the actor system.
 *
 * @param uname  user name set on the connection factory
 * @param passwd password set on the connection factory
 * @param vhost  virtual host set on the connection factory
 * @param host   broker host set on the connection factory
 * @param qname  passed as the second argument of `Channel.queueBind`, i.e. the
 *               name of the exchange the queue is bound to (despite the
 *               parameter name, it is not used as a queue name)
 * @param topic  passed as the third argument of `Channel.queueBind`, i.e. the
 *               routing key of the binding
 * @tparam T     not referenced by the class body; kept so existing callers compile
 */
class RMQReceiver[T: ClassManifest](uname: String, passwd: String, vhost: String, host: String,
    qname: String, topic: String)
  extends Actor with Receiver with Logging {

  // Reuse the actor system this actor already runs in. Starting a fresh
  // ActorSystem per receiver would spawn a dispatcher that is never shut down.
  implicit val system: ActorSystem = context.system

  val factory = new ConnectionFactory()
  factory.setUsername(uname)
  factory.setPassword(passwd)
  factory.setVirtualHost(vhost)
  factory.setHost(host)

  /**
   * Starts the connection actor and asks it for a channel actor whose setup
   * callback declares a queue, binds it and begins consuming.
   */
  override def preStart(): Unit = {
    // Child actor: stopped together with this receiver, so restarts do not
    // accumulate open broker connections.
    val connectionActor = context.actorOf(Props(new ConnectionActor(factory)))

    // Channel setup callback handed to the ChannelActor.
    def setupChannel(channel: Channel): Unit = {
      // No-argument queueDeclare lets the broker pick the queue name.
      val queue = channel.queueDeclare().getQueue()
      // queueBind(queue, exchange, routingKey): `qname` is the exchange here.
      // NOTE(review): the exchange is not declared by this class; presumably
      // it must already exist on the broker - confirm.
      channel.queueBind(queue, qname, topic)

      val consumer = new DefaultConsumer(channel) {
        // NOTE(review): invoked by the RabbitMQ client rather than through
        // this actor's mailbox; verify `pushBlock` is safe to call from there.
        override def handleDelivery(consumerTag: String, envelope: Envelope,
            properties: BasicProperties, body: Array[Byte]): Unit = {
          pushBlock(fromBytes(body))
        }
      }

      // Second argument is autoAck = true: deliveries are acknowledged
      // automatically, before `pushBlock` has run.
      channel.basicConsume(queue, true, consumer)
    }

    connectionActor.createChannel(Props(new ChannelActor(setupChannel)))
  }

  /** Decodes a message body as a UTF-8 string. */
  def fromBytes(x: Array[Byte]): String = new String(x, "UTF-8")

  /** This actor expects no mailbox messages; anything received is only logged. */
  def receive: Receive = {
    case _ => logInfo("unknown message")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment