Skip to content

Instantly share code, notes, and snippets.

@juanrh
Last active August 29, 2015 14:24
Show Gist options
  • Save juanrh/139af20fd2060cb1a9d1 to your computer and use it in GitHub Desktop.
Save juanrh/139af20fd2060cb1a9d1 to your computer and use it in GitHub Desktop.
import org.apache.spark._
import org.apache.spark.streaming.{StreamingContext, Duration}
import scala.reflect._
import org.apache.spark.streaming.receiver.ActorHelper
import akka.actor.{Actor, Props}
/**
 * Akka actor used as a Spark Streaming receiver: every incoming message whose
 * runtime class matches `A` is passed to `store`, inherited from `ActorHelper`.
 *
 * Messages of any other type are not matched by `receive`.
 *
 * @tparam A type of the elements this receiver accepts; the `ClassTag`
 *           context bound is what allows the type pattern in `receive` to be
 *           checked at runtime despite erasure.
 */
class ReceiverActorFoo[A: ClassTag] extends Actor with ActorHelper {

  /** Stores each message of type `A`; defined only for such messages. */
  override def receive: Receive = {
    case element: A =>
      store[A](element)
  }
}
/**
 * Demo that feeds a Spark Streaming job through an actor-based receiver.
 *
 * It starts a local streaming context, registers a `ReceiverActorFoo[String]`
 * as the input stream, looks that actor up by its Akka URL and sends it the
 * words of a fixed sentence, one by one. Each batch of the stream is printed.
 *
 * Uses an explicit `main` rather than `scala.App`: the Spark documentation
 * warns that subclasses of `scala.App` may not work correctly.
 */
object ReceiverActorDemo {

  /**
   * Runs the demo end to end.
   *
   * @param args command-line arguments; not used.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[5]").setAppName("ReceiverActorDemo")
    val sc = new SparkContext(conf)
    val batchDuration = Duration(100)
    val ssc = new StreamingContext(sc, batchDuration)

    try {
      // get reference to receiver actor so we can send messages to it
      val driverHost = sc.getConf.get("spark.driver.host")
      val driverPort = sc.getConf.get("spark.driver.port")
      val receiverActorName = "ReceiverActorFoo"
      val actorSystem = SparkEnv.get.actorSystem
      // NOTE(review): the "Supervisor0" path segment is presumably the supervisor
      // Spark creates for the first receiver stream -- confirm against the Spark
      // version in use.
      val actorUrl =
        s"akka.tcp://sparkDriver@$driverHost:$driverPort/user/Supervisor0/$receiverActorName"
      val receiverActor = actorSystem.actorSelection(actorUrl)

      val inputDStream =
        ssc.actorStream(Props(new ReceiverActorFoo[String]), receiverActorName)
      inputDStream.print()

      ssc.start()
      // Crude wait before sending anything; presumably gives the receiver actor
      // time to come up after start().
      Thread.sleep(500)

      val words =
        "hola caracola que tal lo llevas yo aqui intentando usar actores con Spark Streaming q esta crudo"
          .split("""\s+""")
      for (msg <- words) {
        println(s"sending message [${msg}] to receiver actor $receiverActor")
        receiverActor ! msg
        Thread.sleep(50)
      }
    } finally {
      // Always shut down, even if wiring or sending failed, so neither the
      // streaming context nor the underlying SparkContext is left running.
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment