Skip to content

Instantly share code, notes, and snippets.

@krishnanraman
Last active August 29, 2015 14:04
Show Gist options
  • Save krishnanraman/5c8b182264dae6051ea1 to your computer and use it in GitHub Desktop.
Save krishnanraman/5c8b182264dae6051ea1 to your computer and use it in GitHub Desktop.
ExecutionContext
import com.twitter.scalding.typed.MemorySink
import com.twitter.scalding._
import com.twitter.scalding.ExecutionContext._
import com.twitter.algebird.monad._
/**
 * Example job: pairs every integer in `0 to 10000` with its remainder modulo 3,
 * sums the values per remainder, writes the totals to an in-memory sink, and
 * prints both the job statistics and the records collected by the sink.
 */
class A(args: Args) extends ExecutionContextJob(args) {

  /**
   * Runs the flow through `Execution.waitFor` with the default config in local
   * mode, prints the resulting statistics and the sink contents, and returns a
   * reader that prints "exiting" and terminates the JVM when it is run.
   */
  override def job: Reader[ExecutionContext, Nothing] = {
    val (readResults, stats) =
      Execution.waitFor(Config.default, Local(false)) { implicit ec: ExecutionContext =>
        val sink = new MemorySink[(Int, Int)]
        TypedPipe.from(0 to 10000)
          .map { k => (k % 3, k) }
          .sumByKey
          .write(sink)(flowDefFromContext, modeFromContext)
        // Bind the thunk to a name: a bare `{ ... }` block on the line right after
        // `.write(...)(...)` can be parsed as one more argument list to `write`.
        // Reading is deferred, presumably because the sink is only filled once the
        // flow has actually run.
        val readSink = () => sink.readResults
        readSink
      }
    println(stats)
    // Invoke the thunk; printing the function value itself only shows `<function0>`.
    println(readResults())
    // NOTE(review): exits with status 1 regardless of `stats` - confirm a non-zero
    // exit code is really intended when the job succeeded.
    ReaderFn(ec => { println("exiting"); sys.exit(1) })
  }
}
$ scald.rb --hdfs-local A.scala
compiling A.scala
scalac -classpath /Users/kraman/.sbt/boot/scala-2.9.3/lib/scala-library.jar:/Users/kraman/.sbt/boot/scala-2.9.3/lib/scala-compiler.jar:/Users/kraman/workspace/scalding/scalding-core/target/scala-2.9.3/scalding-core-assembly-0.11.1.jar:/var/folders/b_/17q0nsss269_2kf855mtg4_c0000gn/T/maven/hadoop-core-1.1.2.jar:/var/folders/b_/17q0nsss269_2kf855mtg4_c0000gn/T/maven/commons-codec-1.8.jar:/var/folders/b_/17q0nsss269_2kf855mtg4_c0000gn/T/maven/commons-configuration-1.9.jar:/var/folders/b_/17q0nsss269_2kf855mtg4_c0000gn/T/maven/jackson-asl-0.9.5.jar:/var/folders/b_/17q0nsss269_2kf855mtg4_c0000gn/T/maven/jackson-mapper-asl-1.9.13.jar:/var/folders/b_/17q0nsss269_2kf855mtg4_c0000gn/T/maven/commons-lang-2.6.jar:/var/folders/b_/17q0nsss269_2kf855mtg4_c0000gn/T/maven/slf4j-log4j12-1.6.6.jar:/var/folders/b_/17q0nsss269_2kf855mtg4_c0000gn/T/maven/log4j-1.2.15.jar:/var/folders/b_/17q0nsss269_2kf855mtg4_c0000gn/T/maven/commons-httpclient-3.1.jar:/var/folders/b_/17q0nsss269_2kf855mtg4_c0000gn/T/maven/commons-cli-1.2.jar:/var/folders/b_/17q0nsss269_2kf855mtg4_c0000gn/T/maven/commons-logging-1.1.1.jar:/var/folders/b_/17q0nsss269_2kf855mtg4_c0000gn/T/maven/zookeeper-3.3.4.jar -d /var/folders/b_/17q0nsss269_2kf855mtg4_c0000gn/T/script-build A.scala
sh: ./sbt: No such file or directory
14/07/31 16:18:49 INFO property.AppProps: using app.id: 90D0729745544DE98AFBE3B40A5715EA
14/07/31 16:18:49 INFO util.Version: Concurrent, Inc - Cascading 2.5.5
14/07/31 16:18:49 INFO flow.Flow: [] starting
14/07/31 16:18:49 INFO flow.Flow: [] source: MemoryTap["NullScheme"]["0.24429457602845317"]
14/07/31 16:18:49 INFO flow.Flow: [] sink: MemoryTap["NullScheme"]["0.7135861445713797"]
14/07/31 16:18:49 INFO flow.Flow: [] parallel execution is enabled: true
14/07/31 16:18:49 INFO flow.Flow: [] starting jobs: 1
14/07/31 16:18:49 INFO flow.Flow: [] allocating threads: 1
14/07/31 16:18:49 INFO flow.FlowStep: [] starting step: local
Success(JobStats(Map(duration -> 608, name -> null, finished_time -> 1406848730044, flow_step_stats -> ArrayBuffer(Map(duration -> 603, name -> local, finished_time -> 1406848730044, start_time -> 1406848729441, counters -> Map(cascading.flow.SliceCounters -> Map(Tuples_Written -> 3, Tuples_Read -> 10001), cascading.flow.StepCounters -> Map(Tuples_Written -> 3, Tuples_Read -> 10001)), failed -> false, skipped -> false, id -> 62662B0A3FF94D47A3009448D2E1B163, successful -> true, stopped -> false, run_time -> 1406848729441, submit_time -> 1406848729441)), start_time -> 1406848729436, counters -> Map(cascading.flow.SliceCounters -> Map(Tuples_Read -> 10001, Tuples_Written -> 3), cascading.flow.StepCounters -> Map(Tuples_Read -> 10001, Tuples_Written -> 3)), failed -> false, skipped -> false, id -> 8D44E9E618244BF6AEF74338702DD4A8, successful -> true, stopped -> false, run_time -> 1406848729441, submit_time -> 1406848729441)))
<function0>
exiting
[tw-mbp13-kraman test]$
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment