Skip to content

Instantly share code, notes, and snippets.

@kingster
Created February 25, 2015 09:01
Show Gist options
  • Save kingster/cb37eff1ed986933f214 to your computer and use it in GitHub Desktop.
Save kingster/cb37eff1ed986933f214 to your computer and use it in GitHub Desktop.
HttpPipeline Spark
package com.flipkart.marketing.bro.mouth.pipeline.http
import akka.actor.{ActorRef, Actor, Props}
import akka.serialization.Serialization
import com.flipkart.marketing.bro.commons.data.models.RawEvent
import com.flipkart.marketing.bro.mouth.akka.AkkaEngine
import com.flipkart.marketing.bro.mouth.pipeline.StreamSource
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.receiver.ActorHelper
import scala.collection.mutable.LinkedList
import scala.reflect.ClassTag
/**
* Created by kinshuk.bairagi on 24/02/15.
*/
class HttpPipeline(ssc: StreamingContext) extends java.io.Serializable with StreamSource {
override def getStream(): InputDStream[RawEvent] = {
val httpStreamFeederRef = AkkaEngine.getActor(classOf[HTTPStreamFeeder].getName)
val actorPath: String = Serialization.serializedActorPath(httpStreamFeederRef)
println("actor Addrss = " + actorPath)
val lines = ssc.actorStream[RawEvent](Props(new HTTPStreamActor[RawEvent](actorPath)), "HTTPStreamActor")
lines
}
}
sealed case class SubscribeReceiver(receiverActor: ActorRef)
sealed case class UnsubscribeReceiver(receiverActor: ActorRef)
class HTTPStreamActor[T: ClassTag](urlOfPublisher: String) extends Actor with ActorHelper {
lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
override def preStart() = remotePublisher ! SubscribeReceiver(context.self)
def receive = {
case msg => store(msg.asInstanceOf[T])
}
override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
}
class HTTPStreamFeeder extends Actor {
var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]()
def receive: Receive = {
case SubscribeReceiver(receiverActor: ActorRef) =>
println("received subscribe from %s".format(receiverActor.toString))
receivers = LinkedList(receiverActor) ++ receivers
case UnsubscribeReceiver(receiverActor: ActorRef) =>
println("received unsubscribe from %s".format(receiverActor.toString))
receivers = receivers.dropWhile(x => x eq receiverActor)
case msg: RawEvent =>
println("Sending rawEvent to SparkActor")
receivers.foreach(_ ! msg)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment