import org.apache.spark._
import scalaz.stream._

/**
 * Simple proof of concept - fill an RDD from files that have been
 * processed by a scalaz-stream Process (in parallel).
 */
object SparkScalazStream {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("Spark scalaz-stream test")
    val spark = new SparkContext(conf)

    val files = spark.parallelize(args.toSeq, args.length)

    val contents = files.flatMap { f =>
      // assuming f exists on every node. would really read from HDFS...
      val in = scalaz.stream.io.linesR(f)
      val p = in // actually, some really complicated stream
                 // processing of in that relies on order, etc.
      p.runLog   // TODO MUST AVOID THIS!!!!
        .run
    }

    val lines = contents.map(_ => 1).reduce(_ + _)
    println("lines = " + lines)

    spark.stop()
  }
}

/*
 * TODO Solve this problem:
 * turn p (a Process[Task,String]) into a TraversableOnce[String]
 * and let Spark drive the state machine, rather than the Task.
 */
hm, I think you can achieve whatever conversion you want with Process. You can either use Process.toTask or Process.step to lazily convert the process to whatever primitive you want. But I would not recommend it. I think we have to hook into Spark differently. Instead of files.flatMap, don't we have a different combinator? Additionally, you can easily parallelize running processes by using njoin or merge.mergeN.

Thanks @pchlupacek. Would you mind elaborating?

I thought about implementing an iterator that steps through the Process on each next() and returns the emitted value somehow, but I'm unsure of the details. I think this is what you meant with Process.step? Why would you not recommend this?

I need to parallelise beyond a single host due to data size, so running Processes within Spark seems a natural solution (I have a library of these and would like to lift them into Spark; later, I would also like to process data in RDDs with scalaz Processes via mapPartitions, as sketched below). I don't know much about njoin or mergeN beyond reading the API just now, but I think these would be limited to a single host.

Since Iterator is a TraversableOnce, I attempted this:
https://gist.github.com/florianverhein/2ed965bde7324cb73325
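
For the mapPartitions idea mentioned in the reply above, here is a similarly hedged sketch. pipePartitions is a hypothetical helper; it assumes scalaz-stream's Process.unfold, the toSource conversion from a pure process to a Process[Task, _], and pipe, plus the toIterator helper sketched earlier. It feeds each partition's iterator into a pure source, runs it through a library Process1 transducer, and hands the result back to Spark lazily.

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
import scalaz.stream.{Process, Process1}

// Hypothetical: apply a Process1[A, B] transducer to every RDD partition,
// preserving the within-partition order the transducer relies on.
def pipePartitions[A, B: ClassTag](rdd: RDD[A])(p1: Process1[A, B]): RDD[B] =
  rdd.mapPartitions { it =>
    // A pure source that pulls elements from the partition's iterator on demand.
    val src = Process.unfold(it)(i => if (i.hasNext) Some((i.next(), i)) else None)
    // Lift to an effectful source, pipe it through the transducer, and let
    // Spark consume the output one element at a time.
    toIterator(src.toSource.pipe(p1))
  }

Note that p1 (and anything it closes over) must be serialisable for Spark to ship the closure to executors, and this still parallelises across partitions only, not within one.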