@florianverhein
Last active August 29, 2015 14:15
Running a scalaz-stream Process inside Spark - example
import org.apache.spark._
import scalaz.stream._

/**
 * Simple proof of concept - fill an RDD from files that have been
 * processed by a scalaz-stream Process (in parallel).
 */
object SparkScalazStream {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark scalaz-stream test")
    val spark = new SparkContext(conf)

    val files = spark.parallelize(args.toSeq, args.length)

    val contents = files.flatMap { f =>
      // assuming f exists on every node. would really read from HDFS...
      val in = scalaz.stream.io.linesR(f)
      val p = in // actually, some really complicated stream
                 // processing of in that relies on order, etc
      p.runLog   // TODO MUST AVOID THIS!!!!
        .run
    }

    val lines = contents.map(_ => 1).reduce(_ + _)
    println("lines = " + lines)

    spark.stop()
  }
}
/*
 * TODO Solve this problem:
 * turn p (a Process[Task,String]) into a TraversableOnce[String]
 * and let Spark drive the state machine, rather than the Task
 */
@florianverhein (Author)

The above works, but it is not efficient:

  • runLog collects the entire result in memory.
  • run runs the Task (which drives the Process, so it is effectively another thread).

There should be a better way...
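
For this toy example, which only counts lines, one workaround would be to fold inside the Process and hand Spark only a per-file count. A rough, untested sketch (it assumes runFoldMap is available on Process and imports scalaz's Int monoid), and of course no help when Spark needs the individual elements:

import scalaz.std.anyVal._   // Monoid[Int]

val counts = files.map { f =>
  val in = scalaz.stream.io.linesR(f)
  in.runFoldMap(_ => 1).run  // fold inside the Process; nothing is collected in memory
}
val lines = counts.reduce(_ + _)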

@pchlupacek

Hm, I think you can achieve whatever conversion you want with Process. You can either use Process.toTask or Process.step to lazily convert a Process to whatever primitive you want, but I would not recommend it. I think we have to hook into Spark differently. Instead of files.flatMap, don't we have a different combinator? Additionally, you can easily parallelize running Processes by using njoin or merge.mergeN.
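
Something along these lines (an untested sketch; the file names are placeholders and the default scalaz Strategy is assumed to be implicitly in scope):

import scalaz.concurrent.Task
import scalaz.stream._

val paths = Seq("a.txt", "b.txt", "c.txt")          // placeholder file names
// one Process per file, wrapped in an outer Process of Processes
val sources: Process[Task, Process[Task, String]] =
  Process.emitAll(paths).map(p => io.linesR(p))
// run the inner Processes concurrently (within a single JVM only),
// with at most 2 of them open at a time
val merged: Process[Task, String] = merge.mergeN(2)(sources)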

@florianverhein (Author)

Thanks @pchlupacek. Would you mind elaborating?
I thought about implementing an iterator that steps through the Process on each next() and returns the emitted value somehow... but I'm unsure of the details... I think this is what you meant by Process.step?
Why would you not recommend this?

I need to parallelise beyond a single host due to data size, so running Processes within Spark seems a natural solution (I have a library of these and would like to lift them into Spark - and later, I would also like to process data in RDDs with scalaz-stream Processes via mapPartitions). I don't know much about njoin or mergeN beyond reading the API just now, but I think these would be limited to a single host.
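
Roughly, the sketch I have in mind looks like the following (untested; it assumes Process.toTask hands back one element per run of the returned Task and fails that Task once the Process halts, which is worth double-checking, and it ignores resource safety if the iterator is abandoned):

import scalaz.{-\/, \/-}
import scalaz.concurrent.Task
import scalaz.stream._

def processIterator[A](p: Process[Task, A]): Iterator[A] = new Iterator[A] {
  private val pull = Process.toTask(p)   // each run of this Task pulls the next element
  private var buffered: Option[A] = None
  private var finished = false

  def hasNext: Boolean = {
    if (buffered.isEmpty && !finished) {
      pull.attemptRun match {
        case \/-(a) => buffered = Some(a)
        case -\/(_) => finished = true   // simplification: treat any failure as end of stream
      }
    }
    buffered.isDefined
  }

  def next(): A = {
    if (!hasNext) throw new NoSuchElementException("process is exhausted")
    val a = buffered.get
    buffered = None
    a
  }
}

Then files.flatMap(f => processIterator(io.linesR(f))) would let Spark pull lines one at a time instead of materialising them with runLog.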

@florianverhein (Author)

Since Iterator is a TraversableOnce, I attempted this:
https://gist.github.com/florianverhein/2ed965bde7324cb73325
