
@florianverhein
Last active August 29, 2015 14:15
Example of running a scalaz-stream Process inside Spark
import org.apache.spark._
import scalaz.stream._

/**
 * Simple proof of concept - fill an RDD from files that have been
 * processed by a scalaz-stream Process (in parallel).
 */
object SparkScalazStream {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark scalaz-stream test")
    val spark = new SparkContext(conf)

    val files = spark.parallelize(args.toSeq, args.length)

    val contents = files.flatMap { f =>
      // Assumes f exists on every node; a real job would read from HDFS.
      val in = scalaz.stream.io.linesR(f)
      val p = in // stands in for some more complicated stream
                 // processing of `in` that relies on ordering, etc.
      p.runLog   // TODO: must avoid this! runLog materializes the
        .run     // entire stream in memory before Spark sees any of it.
    }

    val lines = contents.map(_ => 1).reduce(_ + _)
    println("lines = " + lines)
    spark.stop()
  }
}
/*
 * TODO Solve this problem:
 * turn p (a Process[Task, String]) into a TraversableOnce[String]
 * and let Spark drive the state machine, rather than the Task.
 */
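The gist's TODO is, in essence, to invert control: instead of the Task running the whole stream to completion (runLog), expose the stream as a lazy Iterator (which is a TraversableOnce) so that Spark pulls elements one at a time. The sketch below shows that general pattern in plain Scala, with a toy step function standing in for the scalaz-stream Process; `LazyDriver`, `Step`, and `toIterator` are hypothetical names, not part of scalaz-stream or Spark.

```scala
// Hypothetical sketch: wrap a pull-based state machine in an Iterator,
// so the consumer (e.g. Spark's flatMap) drives evaluation lazily
// instead of materializing the whole output up front as runLog does.
object LazyDriver {

  // A toy "process": given a state, either emit a value and the next
  // state, or return None when the stream is exhausted.
  type Step[S, A] = S => Option[(A, S)]

  def toIterator[S, A](init: S, step: Step[S, A]): Iterator[A] =
    new Iterator[A] {
      private var buffered: Option[(A, S)] = step(init)
      def hasNext: Boolean = buffered.isDefined
      def next(): A = buffered match {
        case Some((a, s)) =>
          buffered = step(s) // advance only when the consumer asks
          a
        case None => throw new NoSuchElementException("empty iterator")
      }
    }

  def main(args: Array[String]): Unit = {
    // Emits "0".."4", one element per pull.
    val it = toIterator[Int, String](0,
      n => if (n < 5) Some((n.toString, n + 1)) else None)
    println(it.toList) // List(0, 1, 2, 3, 4)
  }
}
```

Under this pattern, `files.flatMap { f => toIterator(...) }` would let Spark consume each file's stream incrementally, since flatMap only needs a TraversableOnce.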
@florianverhein (Author) commented:
Since Iterator is a TraversableOnce, I attempted this:
https://gist.github.com/florianverhein/2ed965bde7324cb73325
