Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Examples of how (and how not) to call Scalding's ExecutionApp
import com.twitter.scalding.typed.TypedPipe
import com.twitter.scalding._
import scala.util.{Failure, Success}
// If you want to understand what's going on, read the code of Execution and ExecutionApp (it's not long)
// Here's the highlights: The main of ExecutionApp - executes job for you, as long as you return Execution[Unit]
// Execution: Consider flatMap, zip, unit
// unit: since you have to return Execution[Unit] at some point, this is handy. Nice with .zip, for example
// zip: combine Executions to execute in parallel for fun and profit
// flatMap: start w/ Execution[T], get inside Execution to do some stuff w/ T and return Execution[Whatever] (unit is nice!)
object TestExecutionApp extends ExecutionApp{
override def job = Execution.getArgs.flatMap { case args =>
val outFile = args("output")
val pipe = TypedPipe.from(List("A", "B", "C"))
pipe.writeExecution(TypedTsv(outFile)) // That's all we need! Returns Execution[Unit]
}
}
object TestExecutionApp2 extends ExecutionApp{
override def job = Execution.getArgs.flatMap { case args =>
val outFile = args("output")
val pipe = TypedPipe.from(List("A", "B", "C"))
val wrappedPipe = Execution.from(pipe)
wrappedPipe.flatMap { p=> p.writeExecution(TypedTsv(outFile)) }
}
}
object TestExecutionApp2TheWrongWay extends ExecutionApp{
override def job = Execution.getArgs.flatMap { case args =>
val outFile = args("output")
val pipe = TypedPipe.from(List("A", "B", "C"))
val wrappedPipe = Execution.from(pipe) // Uh oh! Our pipe is already wrapped, and we want to write it
val badCode = wrappedPipe.onComplete {
case Success(s) =>
println("This will get called when your execution gets called")
s.writeExecution(TypedTsv("doesntMatterWontGetCalled.tsv")) // This returns a Execution[Unit] that won't get called
case Failure(e) => println("oops")
}
badCode.unit // This Execution[Unit] will successfully get called, but the writeExecution Execution[Unit] didn't get used anywhere
}
}
object TestExecutionApp3 extends ExecutionApp{
override def job = Execution.getArgs.flatMap { case args =>
val outFile = args("output")
val outFile2 = args("output2")
val pipe = TypedPipe.from(List("A", "B", "C"))
val pipe2 = TypedPipe.from(List(1, 2, 3))
val e1 = pipe.writeExecution(TypedTsv(outFile))
val e2 = pipe2.writeExecution(TypedTsv(outFile2))
e1.zip(e2).unit
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.