Created
February 25, 2015 09:01
-
-
Save kingster/cb37eff1ed986933f214 to your computer and use it in GitHub Desktop.
HttpPipeline Spark
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.flipkart.marketing.bro.mouth.pipeline.http | |
import akka.actor.{ActorRef, Actor, Props} | |
import akka.serialization.Serialization | |
import com.flipkart.marketing.bro.commons.data.models.RawEvent | |
import com.flipkart.marketing.bro.mouth.akka.AkkaEngine | |
import com.flipkart.marketing.bro.mouth.pipeline.StreamSource | |
import org.apache.spark.streaming.StreamingContext | |
import org.apache.spark.streaming.dstream.InputDStream | |
import org.apache.spark.streaming.receiver.ActorHelper | |
import scala.collection.mutable.LinkedList | |
import scala.reflect.ClassTag | |
/** | |
* Created by kinshuk.bairagi on 24/02/15. | |
*/ | |
class HttpPipeline(ssc: StreamingContext) extends java.io.Serializable with StreamSource { | |
override def getStream(): InputDStream[RawEvent] = { | |
val httpStreamFeederRef = AkkaEngine.getActor(classOf[HTTPStreamFeeder].getName) | |
val actorPath: String = Serialization.serializedActorPath(httpStreamFeederRef) | |
println("actor Addrss = " + actorPath) | |
val lines = ssc.actorStream[RawEvent](Props(new HTTPStreamActor[RawEvent](actorPath)), "HTTPStreamActor") | |
lines | |
} | |
} | |
sealed case class SubscribeReceiver(receiverActor: ActorRef) | |
sealed case class UnsubscribeReceiver(receiverActor: ActorRef) | |
class HTTPStreamActor[T: ClassTag](urlOfPublisher: String) extends Actor with ActorHelper { | |
lazy private val remotePublisher = context.actorSelection(urlOfPublisher) | |
override def preStart() = remotePublisher ! SubscribeReceiver(context.self) | |
def receive = { | |
case msg => store(msg.asInstanceOf[T]) | |
} | |
override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self) | |
} | |
class HTTPStreamFeeder extends Actor { | |
var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]() | |
def receive: Receive = { | |
case SubscribeReceiver(receiverActor: ActorRef) => | |
println("received subscribe from %s".format(receiverActor.toString)) | |
receivers = LinkedList(receiverActor) ++ receivers | |
case UnsubscribeReceiver(receiverActor: ActorRef) => | |
println("received unsubscribe from %s".format(receiverActor.toString)) | |
receivers = receivers.dropWhile(x => x eq receiverActor) | |
case msg: RawEvent => | |
println("Sending rawEvent to SparkActor") | |
receivers.foreach(_ ! msg) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment