diff -ur twitter-scalding/README.md etsy-scalding/README.md
--- twitter-scalding/README.md 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/README.md 2013-05-29 23:08:00.000000000 -0700
@@ -4,7 +4,7 @@
Scalding is built on top of [Cascading](http://www.cascading.org/), a Java library that abstracts away much of the complexity of Hadoop.
-Current version: 0.8.0
+Current version: 0.8.1
## Word Count
@@ -51,7 +51,7 @@
We use [Travis CI](http://travis-ci.org/) to verify the build:
[![Build Status](https://secure.travis-ci.org/twitter/scalding.png)](http://travis-ci.org/twitter/scalding)
-The current version is 0.8.0 and is available from maven central: org="com.twitter", artifact="scalding_2.9.2".
+The current version is 0.8.1 and is available from maven central: org="com.twitter", artifact="scalding_2.9.2".
## Contact
Only in etsy-scalding: TRACING.md
diff -ur twitter-scalding/build.sbt etsy-scalding/build.sbt
--- twitter-scalding/build.sbt 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/build.sbt 2013-05-29 23:08:00.000000000 -0700
@@ -2,7 +2,7 @@
name := "scalding"
-version := "0.8.1"
+version := "0.8.2-SNAPSHOT"
organization := "com.twitter"
@@ -12,6 +12,8 @@
resolvers += "Concurrent Maven Repo" at "http://conjars.org/repo"
+
+
// Use ScalaCheck
resolvers ++= Seq(
"snapshots" at "http://oss.sonatype.org/content/repositories/snapshots",
@@ -20,18 +22,33 @@
//resolvers += "Twitter Artifactory" at "http://artifactory.local.twitter.com/libs-releases-local"
+// Uncomment and adjust this to use the local Etsy filesystem repository
+//etsyFSRepoPath := Some("/Users/mwalker/development/Ivy/repository")
+
+// This optionally adds the Etsy filesystem resolver
+resolvers <<= (resolvers, etsyFSRepoPath) { (rs, p) =>
+ optionalEtsyResolver("etsy-fs-repo-resolver", p)
+ .map(rs :+ _)
+ .getOrElse(rs)
+}
+
+// This optionally points the publish target at the Etsy filesystem resolver
+publishTo <<= (etsyFSRepoPath) { p => optionalEtsyResolver("etsy-fs-repo-publish", p) }
+
libraryDependencies += "cascading" % "cascading-core" % "2.0.2"
libraryDependencies += "cascading" % "cascading-local" % "2.0.2"
libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.2"
-libraryDependencies += "cascading.kryo" % "cascading.kryo" % "0.4.5"
-
-libraryDependencies += "com.twitter" % "meat-locker" % "0.3.1"
+libraryDependencies += "cascading.kryo" % "cascading.kryo" % "0.4.6"
libraryDependencies += "com.twitter" % "maple" % "0.2.4"
+libraryDependencies += "com.twitter" % "algebird-core_2.9.2" % "0.1.12"
+
+libraryDependencies += "com.twitter" % "chill_2.9.2" % "0.2.0"
+
libraryDependencies += "commons-lang" % "commons-lang" % "2.4"
libraryDependencies += "org.scala-tools.testing" % "specs_2.8.1" % "1.6.6" % "test"
@@ -40,6 +57,8 @@
libraryDependencies += "io.backchat.jerkson" % "jerkson_2.9.2" % "0.7.0"
+libraryDependencies += "com.googlecode.javaewah" % "JavaEWAH" % "0.6.6"
+
libraryDependencies ++= Seq(
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
"org.scala-tools.testing" % "specs_2.9.0-1" % "1.6.8" % "test"
diff -ur twitter-scalding/project/Build.scala etsy-scalding/project/Build.scala
--- twitter-scalding/project/Build.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/project/Build.scala 2013-05-29 23:08:00.000000000 -0700
@@ -1,7 +1,49 @@
import sbt._
object ScaldingBuild extends Build {
- lazy val root = Project("root", file("."))
- .dependsOn(RootProject(uri("git://github.com/twitter/algebird.git#master")))
- .dependsOn(RootProject(uri("git://github.com/twitter/chill.git#master")))
+ /**
+ * An optional path to a local filesystem repository to add to the list of
+ * resolvers. This defaults to None, in which case it is not used. To set a
+ * path, place:
+ *
+ * etsyFSRepoPath := Some("your/path/here")
+ *
+ * in build.sbt or use set in the sbt console:
+ *
+ * > set etsyFSRepoPath := Some("/Users/mwalker/development/Ivy/repository")
+ * [info] Defining *:etsy-fs-repo-path
+ * [info] The new value will be used by *:publish-to, *:resolvers
+ * [info] Reapplying settings...
+ * [info] Set current project to scalding (in build file:/Users/mwalker/development/mrwalker/scalding/)
+ */
+ val etsyFSRepoPath = SettingKey[Option[String]](
+ "etsy-fs-repo-path",
+ "Path to the local Etsy filesystem repository"
+ )
+
+ lazy val root = Project(
+ "root",
+ file("."),
+ settings = Project.defaultSettings ++ Seq(etsyFSRepoPath := None)
+ )
+
+ /**
+ * Optionally create a resolver configured for the Etsy repository.
+ */
+ def optionalEtsyResolver(name: String, path: Option[String]): Option[sbt.Resolver] = path.map(p =>
+ Resolver.file(name, file(p))(
+ Patterns(
+ Seq(
+ "[organisation]/[module]/[revision]/ivy-[revision].xml",
+ "[organisation]/[module]/[revision]/ivys/ivy.xml"
+ ),
+ Seq(
+ "[organisation]/[module]/[revision]/[type]s/[artifact]-[revision].[ext]",
+ "[organisation]/[module]/[revision]/[type]s/[artifact].[ext]",
+ "[organisation]/[module]/[revision]/[type]s/[artifact]-[classifier].[ext]"
+ ),
+ false
+ )
+ )
+ )
}
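
For reference, the Option plumbing above composes as plain Scala: when etsyFSRepoPath is None the resolver list is left alone, and when it is Some(path) one file resolver is appended. A minimal standalone sketch of that wiring (illustrative names only; a string stands in for Resolver.file):

object ResolverWiringSketch extends App {
  // Stand-in for optionalEtsyResolver: Some(resolver) iff a path is configured.
  def optionalResolver(name: String, path: Option[String]): Option[String] =
    path.map(p => name + " at " + p)

  def wire(existing: Seq[String], path: Option[String]): Seq[String] =
    optionalResolver("etsy-fs-repo-resolver", path)
      .map(existing :+ _)    // append the resolver when configured
      .getOrElse(existing)   // otherwise leave the list unchanged

  println(wire(Seq("maven-central"), None))                  // List(maven-central)
  println(wire(Seq("maven-central"), Some("/tmp/ivy-repo"))) // appends the file resolver
}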
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/Args.scala etsy-scalding/src/main/scala/com/twitter/scalding/Args.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/Args.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/Args.scala 2013-05-29 23:08:00.000000000 -0700
@@ -27,6 +27,7 @@
def apply(argString : String) : Args = Args(argString.split("\\s+"))
/**
* parses keys as starting with a dash, except single dashed digits.
+ * Also parses key value pairs that are separated with an equal sign.
* All following non-dashed args are a list of values.
* If the list starts with non-dashed args, these are associated with the
* empty string: ""
@@ -39,7 +40,10 @@
.filter{ a => !a.matches("\\s*") }
.foldLeft(List("" -> List[String]())) { (acc, arg) =>
val noDashes = arg.dropWhile{ _ == '-'}
- if(arg == noDashes || isNumber(arg))
+ if (arg.contains("=")) {
+ val splitArg = arg.split("=")
+ (splitArg(0) -> List(splitArg(1))) :: acc
+ } else if(arg == noDashes || isNumber(arg))
(acc.head._1 -> (arg :: acc.head._2)) :: acc.tail
else
(noDashes -> List()) :: acc
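
A minimal standalone sketch of the equals-sign branch added above (hypothetical argument strings; the helper mirrors the fold's new case):

object ArgsEqualsSketch extends App {
  // Mirrors: val splitArg = arg.split("="); (splitArg(0) -> List(splitArg(1))) :: acc
  def parsePair(arg: String): (String, List[String]) = {
    val splitArg = arg.split("=")
    splitArg(0) -> List(splitArg(1))
  }

  println(parsePair("input=data/clicks.tsv")) // (input,List(data/clicks.tsv))
  // Note: the key is taken verbatim from the left of '=', so dashes are not
  // stripped in this branch; "--input=x" yields the key "--input".
  println(parsePair("--input=x"))             // (--input,List(x))
}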
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/FileSource.scala etsy-scalding/src/main/scala/com/twitter/scalding/FileSource.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/FileSource.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/FileSource.scala 2013-05-29 23:08:00.000000000 -0700
@@ -68,7 +68,7 @@
override def createTap(readOrWrite : AccessMode)(implicit mode : Mode) : Tap[_,_,_] = {
mode match {
// TODO support strict in Local
- case Local(_) => {
+ case Local(_,_) => {
val sinkmode = readOrWrite match {
case Read => SinkMode.KEEP
case Write => SinkMode.REPLACE
@@ -175,11 +175,13 @@
val separator = "\t"
val skipHeader = false
val writeHeader = false
+ val quote : String = null
//These should not be changed:
- override def localScheme = new CLTextDelimited(fields, skipHeader, writeHeader, separator, types)
+ override def localScheme = new CLTextDelimited(fields, skipHeader, writeHeader, separator, quote, types)
- override def hdfsScheme =
- HadoopSchemeInstance(new CHTextDelimited(fields, skipHeader, writeHeader, separator, types))
+ override def hdfsScheme = {
+ HadoopSchemeInstance(new CHTextDelimited(fields, skipHeader, writeHeader, separator, quote, types))
+ }
}
trait SequenceFileScheme extends Source {
@@ -236,6 +238,17 @@
override val skipHeader : Boolean = false, override val writeHeader: Boolean = false) extends FixedPathSource(p)
with DelimitedScheme
+/**
+* Csv value source
+* separated by commas and quotes wrapping all fields
+*/
+case class Csv(p : String,
+ override val separator : String = ",",
+ override val fields : Fields = Fields.ALL,
+ override val skipHeader : Boolean = false,
+ override val writeHeader : Boolean = false,
+ override val quote : String ="\"") extends FixedPathSource(p) with DelimitedScheme
+
/** Allows you to set the types, prefer this:
* If T is a subclass of Product, we assume it is a tuple. If it is not, wrap T in a Tuple1:
* e.g. TypedTsv[Tuple1[List[Int]]]
@@ -262,13 +275,13 @@
// For Mappable:
override def mapTo[U](out : Fields)(fun : (T) => U)
- (implicit flowDef : FlowDef, mode : Mode, setter : TupleSetter[U]) = {
- RichPipe(read(flowDef, mode)).mapTo[T,U](sourceFields -> out)(fun)(converter, setter)
+ (implicit flowDef : FlowDef, mode : Mode, setter : TupleSetter[U], tracing : Tracing) = {
+ RichPipe(read(flowDef, mode, tracing)).mapTo[T,U](sourceFields -> out)(fun)(converter, setter, tracing)
}
// For Mappable:
override def flatMapTo[U](out : Fields)(fun : (T) => Iterable[U])
- (implicit flowDef : FlowDef, mode : Mode, setter : TupleSetter[U]) = {
- RichPipe(read(flowDef, mode)).flatMapTo[T,U](sourceFields -> out)(fun)(converter, setter)
+ (implicit flowDef : FlowDef, mode : Mode, setter : TupleSetter[U], tracing : Tracing) = {
+ RichPipe(read(flowDef, mode, tracing)).flatMapTo[T,U](sourceFields -> out)(fun)(converter, setter, tracing)
}
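
A hedged usage sketch of the new Csv source (hypothetical paths and field names; assumes the usual Scalding job setup):

import com.twitter.scalding._

class CsvRoundTrip(args: Args) extends Job(args) {
  // Read comma-separated input with quoted fields, keep positive scores,
  // and write the result back out with a header row.
  Csv(args("input"), fields = ('user, 'score))
    .read
    .filter('score) { s: Int => s > 0 }
    .write(Csv(args("output"), writeHeader = true))
}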
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/GroupBuilder.scala etsy-scalding/src/main/scala/com/twitter/scalding/GroupBuilder.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/GroupBuilder.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/GroupBuilder.scala 2013-05-29 23:08:00.000000000 -0700
@@ -317,6 +317,12 @@
* of group operations similar to `RichPipe.then`
*/
def then(fn : (GroupBuilder) => GroupBuilder) = fn(this)
+
+ /**
+ * An identity function that keeps all the tuples. A hack to implement
+ * groupAll and groupRandomly.
+ */
+ def pass : GroupBuilder = takeWhile(0) { (t: TupleEntry) => true }
}
/**
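
pass makes the "keep everything" group explicit. A short hedged sketch of how it reads at a call site (hypothetical input; assumes the usual Scalding job setup):

import com.twitter.scalding._

class ShardExample(args: Args) extends Job(args) {
  // pass keeps every tuple, so the grouping only moves data:
  Tsv(args("input"), ('user, 'score)).read
    .groupAll { _.pass }   // funnel everything through one reducer
    .write(Tsv(args("output")))
  // pipe.shard(10) is the same idea spread over 10 random reducers,
  // via groupRandomly(10) { _.pass }.
}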
Only in etsy-scalding/src/main/scala/com/twitter/scalding: InputTracingJob.scala
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/IterableSource.scala etsy-scalding/src/main/scala/com/twitter/scalding/IterableSource.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/IterableSource.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/IterableSource.scala 2013-05-29 23:08:00.000000000 -0700
@@ -72,7 +72,7 @@
sys.error("IterableSource is a Read-only Source")
}
mode match {
- case Local(_) => new MemoryTap[InputStream,OutputStream](localScheme, asBuffer)
+ case Local(_,_) => new MemoryTap[InputStream,OutputStream](localScheme, asBuffer)
case Test(_) => new MemoryTap[InputStream,OutputStream](localScheme, asBuffer)
case Hdfs(_, _) => hdfsTap
case HadoopTest(_,_) => hdfsTap
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/Job.scala etsy-scalding/src/main/scala/com/twitter/scalding/Job.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/Job.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/Job.scala 2013-05-29 23:08:00.000000000 -0700
@@ -142,7 +142,7 @@
//Largely for the benefit of Java jobs
implicit def read(src : Source) : Pipe = src.read
- def write(pipe : Pipe, src : Source) {src.write(pipe)}
+ def write(pipe : Pipe, src : Source) {src.writeFrom(pipe)}
def validateSources(mode : Mode) {
flowDef.getSources()
@@ -175,8 +175,19 @@
case Some(tzn) => java.util.TimeZone.getTimeZone(tzn)
case None => defaultTimeZone
}
+
+ // Optionally take a --period, which determines how many days each job runs over (rather
+ // than over the whole date range)
+ // --daily and --weekly are aliases for --period 1 and --period 7 respectively
+ val period =
+ if (args.boolean("daily"))
+ 1
+ else if (args.boolean("weekly"))
+ 7
+ else
+ args.getOrElse("period", "0").toInt
- implicit lazy val dateRange = {
+ lazy val (startDate, endDate) = {
val (start, end) = args.list("date") match {
case List(s, e) => (RichDate(s), RichDate.upperBound(e))
case List(o) => (RichDate(o), RichDate.upperBound(o))
@@ -184,8 +195,21 @@
}
//Make sure the end is not before the beginning:
assert(start <= end, "end of date range must occur after the start")
- DateRange(start, end)
+ (start, end)
}
+
+ implicit lazy val dateRange = DateRange(startDate, if (period > 0) startDate + Days(period) - Millisecs(1) else endDate)
+
+ override def next : Option[Job] =
+ if (period > 0) {
+ val nextStartDate = startDate + Days(period)
+ if (nextStartDate + Days(period - 1) > endDate)
+ None // we're done
+ else // return a new job with the new startDate
+ Some(clone(args + ("date" -> List(nextStartDate.toString("yyyy-MM-dd"), endDate.toString("yyyy-MM-dd")))))
+ }
+ else
+ None
}
// DefaultDateRangeJob with default time zone as UTC instead of Pacific.
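
A standalone sketch of how --period chops the overall --date range into successive jobs (java.time stands in for Scalding's RichDate/Days arithmetic; the termination test mirrors next above):

import java.time.LocalDate

object PeriodSplitSketch extends App {
  def splits(start: LocalDate, end: LocalDate, period: Int): List[(LocalDate, LocalDate)] =
    (start, start.plusDays(period - 1)) :: {
      val next = start.plusDays(period)
      // mirrors: if (nextStartDate + Days(period - 1) > endDate) None
      if (next.plusDays(period - 1).isAfter(end)) Nil
      else splits(next, end, period)
    }

  // --date 2013-01-01 2013-01-14 --weekly  =>  two one-week jobs
  splits(LocalDate.of(2013, 1, 1), LocalDate.of(2013, 1, 14), 7).foreach(println)
}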
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala etsy-scalding/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala 2013-05-29 23:08:00.000000000 -0700
@@ -1,4 +1,5 @@
/*
+
Copyright 2012 Twitter, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
@@ -27,6 +28,9 @@
import cascading.tuple._
import cascading.cascade._
+import com.twitter.algebird._
+import com.twitter.algebird.Operators._
+
import scala.util.Random
/*
@@ -116,6 +120,29 @@
}
}
+ def prepareJoin(p : Pipe, left : Boolean)(implicit tracing : Tracing) : Pipe = {
+ tracing.beforeJoin(p, left)
+ }
+
+ def finishJoin(p : Pipe)(implicit tracing : Tracing) : Pipe = {
+ tracing.afterJoin(p)
+ }
+
+ def filterAndJoinWithSmaller(fs : (Fields, Fields), that : Pipe, small_pipe_size : Int = 10000, false_pos_rate : Double = 0.1) : Pipe = {
+ val tracing : Tracing = Tracing.tracing
+ Tracing.tracing = new NullTracing
+ // Make a bloom filter of the smaller pipe contents.
+ implicit val bfm : BloomFilterMonoid = BloomFilter(small_pipe_size, false_pos_rate)
+ val bfp = that.mapTo(fs._2 -> '__bf){ x : TupleEntry => bfm.create(x.getTuple.toString) }
+ .groupAll{ _.plus[BF]('__bf -> '__bf) }
+ val fp = pipe.map(fs._1 -> '__str){ x : TupleEntry => x.getTuple.toString }
+ .crossWithTiny(bfp)
+ .filter('__str, '__bf) { x : (String, BF) => x._2.contains(x._1) != ApproximateBoolean.exactFalse }
+ .discard('__str, '__bf)
+ Tracing.tracing = tracing
+ fp.joinWithSmaller(fs, that)
+ }
+
/**
* Joins the first set of keys in the first pipe to the second set of keys in the second pipe.
* All keys must be unique UNLESS it is an inner join, then duplicated join keys are allowed, but
@@ -136,10 +163,10 @@
val intersection = asSet(fs._1).intersect(asSet(fs._2))
if (intersection.size == 0) {
// Common case: no intersection in names: just CoGroup, which duplicates the grouping fields:
- pipe.coGroupBy(fs._1, joiners._1) {
- _.coGroup(fs._2, that, joiners._2)
+ finishJoin(prepareJoin(pipe, false).coGroupBy(fs._1, joiners._1) {
+ _.coGroup(fs._2, prepareJoin(that, true), joiners._2)
.reducers(reducers)
- }
+ })
}
else if (joiners._1 == InnerJoinMode && joiners._2 == InnerJoinMode) {
/*
@@ -148,10 +175,10 @@
* So, we rename the right hand side to temporary names, then discard them after the operation
*/
val (renamedThat, newJoinFields, temp) = renameCollidingFields(that, fs._2, intersection)
- pipe.coGroupBy(fs._1, joiners._1) {
- _.coGroup(newJoinFields, renamedThat, joiners._2)
+ finishJoin(prepareJoin(pipe,false).coGroupBy(fs._1, joiners._1) {
+ _.coGroup(newJoinFields, prepareJoin(renamedThat, true), joiners._2)
.reducers(reducers)
- }.discard(temp)
+ }.discard(temp))
}
else {
throw new IllegalArgumentException("join keys must be disjoint unless you are doing an InnerJoin. Found: " +
@@ -186,19 +213,20 @@
*/
def joinWithTiny(fs :(Fields,Fields), that : Pipe) = {
val intersection = asSet(fs._1).intersect(asSet(fs._2))
+ val pt = prepareJoin(that, true)
if (intersection.size == 0) {
- new HashJoin(assignName(pipe), fs._1, assignName(that), fs._2, new InnerJoin)
+ finishJoin(new HashJoin(assignName(prepareJoin(pipe, false)), fs._1, assignName(pt), fs._2, new InnerJoin))
}
else {
- val (renamedThat, newJoinFields, temp) = renameCollidingFields(that, fs._2, intersection)
- (new HashJoin(assignName(pipe), fs._1, assignName(renamedThat), newJoinFields, new InnerJoin))
- .discard(temp)
+ val (renamedThat, newJoinFields, temp) = renameCollidingFields(pt, fs._2, intersection)
+ finishJoin((new HashJoin(assignName(prepareJoin(pipe, false)), fs._1, assignName(renamedThat), newJoinFields, new InnerJoin))
+ .discard(temp))
}
}
def leftJoinWithTiny(fs :(Fields,Fields), that : Pipe) = {
//Rename these pipes to avoid cascading name conflicts
- new HashJoin(assignName(pipe), fs._1, assignName(that), fs._2, new LeftJoin)
+ finishJoin(new HashJoin(assignName(prepareJoin(pipe, false)), fs._1, assignName(prepareJoin(that, true)), fs._2, new LeftJoin))
}
/**
@@ -240,31 +268,60 @@
val leftFields = new Fields("__LEFT_I__", "__LEFT_J__")
val rightFields = new Fields("__RIGHT_I__", "__RIGHT_J__")
- // Add the new dummy fields
- val newLeft = addDummyFields(pipe, leftFields, rightReplication, leftReplication)
- val newRight = addDummyFields(otherPipe, rightFields, leftReplication, rightReplication, swap = true)
+ // Add the new dummy replication fields
+ val newLeft = addReplicationFields(prepareJoin(pipe, false), leftFields, leftReplication, rightReplication)
+ val newRight = addReplicationFields(prepareJoin(otherPipe, true), rightFields, rightReplication, leftReplication, swap = true)
val leftJoinFields = Fields.join(fs._1, leftFields)
val rightJoinFields = Fields.join(fs._2, rightFields)
- newLeft
+ finishJoin(newLeft
.joinWithSmaller((leftJoinFields, rightJoinFields), newRight, joiner, reducers)
.discard(leftFields)
- .discard(rightFields)
+ .discard(rightFields))
}
/**
* Adds one random field and one replica field.
*/
- private def addDummyFields(p : Pipe, f : Fields, k1 : Int, k2 : Int, swap : Boolean = false) : Pipe = {
- p.flatMap(() -> f) { u : Unit =>
- val i = if(k1 == 1 ) 0 else (new Random()).nextInt(k1 - 1)
- (0 until k2).map{ j =>
- if(swap) (j, i) else (i, j)
- }
+ private def addReplicationFields(p : Pipe, f : Fields,
+ replication : Int, otherReplication : Int, swap : Boolean = false) : Pipe = {
+
+ p.using(new Random with Stateful).flatMap(() -> f) { (rand : Random, _ : Unit) =>
+ val rfs = getReplicationFields(rand, replication, otherReplication)
+ if (swap) rfs.map { case(i, j) => (j, i) } else rfs
}
}
+ /**
+ * Returns a list of the dummy replication fields used to replicate groups in skewed joins.
+ *
+ * For example, suppose you have two pipes P1 and P2. While performing a skewed join for a particular
+ * key K, you want to replicate every row in P1 with this key 3 times, and every row in P2 with this
+ * key 5 times.
+ *
+ * Then:
+ *
+ * - For the P1 replication, the first element of each tuple is the same random integer between 0 and 4,
+ * and the second element of each tuple is the index of the replication (between 0 and 2). This first
+ * random element guarantees that we will match exactly one random row in P2 with the same key.
+ * - Similarly for the P2 replication.
+ *
+ * Examples:
+ *
+ * getReplicationFields(3, 5)
+ * => List( (1, 0), (1, 1), (1, 2) )
+ *
+ * getReplicationFields(5, 3)
+ * => List( (2, 0), (2, 1), (2, 2), (2, 3), (2, 4) )
+ */
+ private def getReplicationFields(r : Random, replication : Int, otherReplication : Int) : IndexedSeq[(Int, Int)] = {
+ assert(replication >= 1 && otherReplication >= 1, "Replication counts must be >= 1")
+
+ val rand = r.nextInt(otherReplication)
+ (0 until replication).map { rep => (rand, rep) }
+ }
+
private def assertValidJoinMode(joiner : Joiner, left : Int, right : Int) {
(joiner, left, right) match {
case (i : InnerJoin, _, _) => true
@@ -276,6 +333,108 @@
)
}
}
+
+ /**
+ * Performs a skewed join, which is useful when the data has extreme skew.
+ *
+ * For example, imagine joining a pipe of Twitter's follow graph against a pipe of user genders,
+ * in order to find the gender distribution of the accounts every Twitter user follows. Since celebrities
+ * (e.g., Justin Bieber and Lady Gaga) have a much larger follower base than other users, and (under
+ * a standard join algorithm) all their followers get sent to the same reducer, the job will likely be
+ * stuck on a few reducers for a long time. A skewed join attempts to alleviate this problem.
+ *
+ * This works as follows:
+ *
+ * 1. First, we sample from the left and right pipes with some small probability, in order to determine
+ * approximately how often each join key appears in each pipe.
+ * 2. We use these estimated counts to replicate the join keys, according to the given replication strategy.
+ * 3. Finally, we join the replicated pipes together.
+ *
+ * @param sampleRate This controls how often we sample from the left and right pipes when estimating key counts.
+ * @param replicator Algorithm for determining how much to replicate a join key in the left and right pipes.
+ *
+ * Note: since we do not set the replication counts, only inner joins are allowed. (Otherwise, replicated
+ * rows would stay replicated when there is no counterpart in the other pipe.)
+ */
+ def skewJoinWithSmaller(fs : (Fields, Fields), otherPipe : Pipe,
+ sampleRate : Double = 0.001, reducers : Int = -1,
+ replicator : SkewReplication = SkewReplicationA()) : Pipe = {
+
+ assert(sampleRate > 0 && sampleRate < 1, "Sampling rate for skew joins must lie strictly between 0 and 1")
+ // This assertion could be avoided, but since this function calls outer joins and left joins,
+ // we assume it to avoid renaming pain.
+ assert(fs._1.iterator.toList.intersect(fs._2.iterator.toList).isEmpty, "Join keys in a skew join must be disjoint")
+
+ // 1. First, get an approximate count of the left join keys and the right join keys, so that we
+ // know how much to replicate.
+ // TODO: try replacing this with a Count-Min sketch.
+ val leftSampledCountField = "__LEFT_SAMPLED_COUNT__"
+ val rightSampledCountField = "__RIGHT_SAMPLED_COUNT__"
+ val sampledCountFields = new Fields(leftSampledCountField, rightSampledCountField)
+
+ val sampledLeft = pipe.filter() { u : Unit => scala.math.random < sampleRate }
+ .groupBy(fs._1) { _.size(leftSampledCountField) }
+ val sampledRight = otherPipe.filter() { u : Unit => scala.math.random < sampleRate }
+ .groupBy(fs._2) { _.size(rightSampledCountField) }
+
+ val sampledCounts = sampledLeft.joinWithSmaller(fs._1 -> fs._2, sampledRight, joiner = new OuterJoin)
+ .project(Fields.join(fs._1, fs._2, sampledCountFields))
+
+ // 2. Now replicate each group of join keys in the left and right pipes, according to the sampled counts
+ // from the previous step.
+ val leftReplicationFields = new Fields("__LEFT_RAND__", "__LEFT_REP__")
+ val rightReplicationFields = new Fields("__RIGHT_REP__", "__RIGHT_RAND__")
+
+ val replicatedLeft = skewReplicate(pipe, sampledCounts, fs._1, sampledCountFields, leftReplicationFields,
+ replicator, reducers)
+ val replicatedRight = skewReplicate(otherPipe, sampledCounts, fs._2, sampledCountFields, rightReplicationFields,
+ replicator, reducers, true)
+
+ // 3. Finally, join the replicated pipes together.
+ val leftJoinFields = Fields.join(fs._1, leftReplicationFields)
+ val rightJoinFields = Fields.join(fs._2, rightReplicationFields)
+
+ replicatedLeft
+ .joinWithSmaller(leftJoinFields -> rightJoinFields, replicatedRight, joiner = new InnerJoin, reducers)
+ .discard(leftReplicationFields)
+ .discard(rightReplicationFields)
+ }
+
+ /**
+ * Helper method for performing skewed joins. This replicates the rows in {pipe} according
+ * to the estimated counts in {sampledCounts}.
+ *
+ * @param pipe The pipe to be replicated.
+ * @param sampledCounts A pipe containing, for each key, the estimated counts of how often
+ * this key appeared in the samples of the original left and right pipes.
+ * @param replicator Strategy for how the pipe is replicated.
+ * @param isPipeOnRight Set to true when replicating the right pipe.
+ */
+ private def skewReplicate(pipe : Pipe, sampledCounts : Pipe, joinFields : Fields,
+ countFields : Fields, replicationFields : Fields,
+ replicator : SkewReplication,
+ numReducers : Int = -1, isPipeOnRight : Boolean = false) = {
+
+ // Rename the fields to prepare for the leftJoin below.
+ val renamedFields = joinFields.iterator.toList.map { field => "__RENAMED_" + field + "__" }
+ val renamedSampledCounts = sampledCounts.rename(joinFields -> renamedFields)
+ .project(Fields.join(renamedFields, countFields))
+
+ pipe
+ // Join the pipe against the sampled counts, so that we know approximately how often each
+ // join key appears.
+ .leftJoinWithTiny(joinFields -> renamedFields, renamedSampledCounts)
+ .using(new Random with Stateful)
+ .flatMap(countFields -> replicationFields) { (rand : Random, counts : (Int, Int)) =>
+ val (leftRep, rightRep) = replicator.getReplications(counts._1, counts._2, numReducers)
+
+ val (rep, otherRep) = if (isPipeOnRight) (rightRep, leftRep) else (leftRep, rightRep)
+ val rfs = getReplicationFields(rand, rep, otherRep)
+ if (isPipeOnRight) rfs.map { case (i, j) => (j, i) } else rfs
+ }
+ .discard(renamedFields)
+ .discard(countFields)
+ }
}
class InvalidJoinModeException(args : String) extends Exception(args)
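
A standalone sketch of the replication pairing used by skewJoinWithSmaller: each side carries a (random, replica-index) pair, and the join condition LEFT_RAND == RIGHT_REP && LEFT_REP == RIGHT_RAND makes every (left row, right row) pair with the same key meet on exactly one replica (plain Scala, no Cascading):

import scala.util.Random

object SkewPairingSketch extends App {
  val rng = new Random(42)

  // Mirrors getReplicationFields: one fixed random draw, paired with each index.
  def replicas(rep: Int, otherRep: Int): Seq[(Int, Int)] = {
    val r = rng.nextInt(otherRep)
    (0 until rep).map(i => (r, i))
  }

  val left  = replicas(3, 5)                               // (LEFT_RAND, LEFT_REP)
  val right = replicas(5, 3).map { case (r, i) => (i, r) } // (RIGHT_REP, RIGHT_RAND)

  val matches = for ((lr, li) <- left; (ri, rr) <- right; if lr == ri && li == rr) yield (lr, li)
  println(matches.size) // 1: the row pair is joined exactly once, so no duplicates
}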
Only in etsy-scalding/src/main/scala/com/twitter/scalding: Matrix.scala
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/Mode.scala etsy-scalding/src/main/scala/com/twitter/scalding/Mode.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/Mode.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/Mode.scala 2013-05-29 23:08:00.000000000 -0700
@@ -43,7 +43,9 @@
/**
* This mode is used by default by sources in read and write
*/
- implicit var mode : Mode = Local(false)
+ def emptyConf = { val c = new Configuration; c.clear; c }
+
+ implicit var mode : Mode = Local(false, emptyConf)
// This should be passed ALL the args supplied after the job name
def apply(args : Args, config : Configuration) : Mode = {
@@ -54,7 +56,7 @@
}
if (args.boolean("local"))
- Local(strictSources)
+ Local(strictSources, config)
else if (args.boolean("hdfs"))
Hdfs(strictSources, config)
else
@@ -65,14 +67,20 @@
* There are three ways to run jobs
* sourceStrictness is set to true
*/
-abstract class Mode(val sourceStrictness : Boolean) {
+abstract class Mode(val sourceStrictness : Boolean, val jobConf : Configuration) {
+
// We can't name two different pipes with the same name.
// NOTE: there is a subtle bug in scala regarding case classes
// with multiple sets of arguments, and their equality.
// For this reason, we use Source.toString as the key in this map
protected val sourceMap = MMap[String, (Source, Pipe)]()
- def config = Map[AnyRef,AnyRef]()
+ def config : Map[AnyRef,AnyRef] = {
+ jobConf.foldLeft(Map[AnyRef, AnyRef]()) {
+ (acc, kv) => acc + ((kv.getKey, kv.getValue))
+ }
+ }
+
def newFlowConnector(props : Map[AnyRef,AnyRef]) : FlowConnector
/*
@@ -107,14 +115,6 @@
trait HadoopMode extends Mode {
- def jobConf : Configuration
-
- override def config = {
- jobConf.foldLeft(Map[AnyRef, AnyRef]()) {
- (acc, kv) => acc + ((kv.getKey, kv.getValue))
- }
- }
-
override def newFlowConnector(props : Map[AnyRef,AnyRef]) = {
new HadoopFlowConnector(props)
}
@@ -144,21 +144,18 @@
override def fileExists(filename : String) : Boolean = fileSet.contains(filename)
}
-case class Hdfs(strict : Boolean, conf : Configuration) extends Mode(strict) with HadoopMode {
- override def jobConf = conf
+case class Hdfs(strict : Boolean, conf : Configuration) extends Mode(strict, conf) with HadoopMode {
override def fileExists(filename : String) : Boolean =
FileSystem.get(jobConf).exists(new Path(filename))
}
case class HadoopTest(conf : Configuration, buffers : Map[Source,Buffer[Tuple]])
- extends Mode(false) with HadoopMode with TestMode {
+ extends Mode(false, conf) with HadoopMode with TestMode {
// This is a map from source.toString to disk path
private val writePaths = MMap[Source, String]()
private val allPaths = MSet[String]()
- override def jobConf = conf
-
@tailrec
private def allocateNewPath(prefix : String, idx : Int) : String = {
val candidate = prefix + idx.toString
@@ -196,12 +193,12 @@
}
}
-case class Local(strict : Boolean) extends Mode(strict) with CascadingLocal {
+case class Local(strict : Boolean, conf : Configuration) extends Mode(strict, conf) with CascadingLocal {
override def fileExists(filename : String) : Boolean = new File(filename).exists
}
/**
* Memory only testing for unit tests
*/
-case class Test(val buffers : Map[Source,Buffer[Tuple]]) extends Mode(false)
+case class Test(val buffers : Map[Source,Buffer[Tuple]]) extends Mode(false, new Configuration)
with TestMode with CascadingLocal
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/ReduceOperations.scala etsy-scalding/src/main/scala/com/twitter/scalding/ReduceOperations.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/ReduceOperations.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/ReduceOperations.scala 2013-05-29 23:08:00.000000000 -0700
@@ -21,7 +21,7 @@
import com.twitter.algebird.{
Monoid,
Ring,
- AveragedValue,
+ Averager,
Moments,
SortedTakeListMonoid,
HyperLogLogMonoid,
@@ -73,7 +73,7 @@
* == Similar To ==
* <a href="http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm">http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm</a>
*/
- def average(f : (Fields, Fields)) = mapPlusMap(f) { (x : Double) => AveragedValue(1L, x) } { _.value }
+ def average(f : (Fields, Fields)) = aggregate(f)(Averager)
def average(f : Symbol) : Self = average(f->f)
/**
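
For context on the Averager change: Algebird's AveragedValue keeps (count, mean) pairs that combine associatively, which is what lets the average be computed in parallel across reducers. A standalone sketch of that algebra (plain Scala, not the Algebird implementation):

object AveragerSketch extends App {
  case class Avg(count: Long, value: Double) {
    def +(that: Avg): Avg = {
      val n = count + that.count
      // numerically stable merge of two partial means
      Avg(n, value + (that.value - value) * that.count / n)
    }
  }
  def prepare(x: Double) = Avg(1L, x) // mirrors AveragedValue(1L, x)

  val xs = List(1.0, 2.0, 6.0)
  println(xs.map(prepare).reduce(_ + _).value) // 3.0
}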
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/RichPipe.scala etsy-scalding/src/main/scala/com/twitter/scalding/RichPipe.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/RichPipe.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/RichPipe.scala 2013-05-29 23:08:00.000000000 -0700
@@ -27,6 +27,10 @@
import cascading.tuple._
import cascading.cascade._
+import scala.util.Random
+
+import java.security.MessageDigest
+
object RichPipe extends java.io.Serializable {
private var nextPipe = -1
@@ -55,19 +59,22 @@
}
}
+object MD5Hash extends java.io.Serializable {
+ private val hash = MessageDigest.getInstance("MD5")
+ def apply(s : String) : Int = {
+ hash.reset
+ val h : Array[Byte] = hash.digest(s.getBytes)
+ (h(0).toInt << 24) + (h(1).toInt << 16) + (h(2).toInt << 8) + h(3).toInt
+ }
+}
+
class RichPipe(val pipe : Pipe) extends java.io.Serializable with JoinAlgorithms {
+
// We need this for the implicits
import Dsl._
import RichPipe.assignName
/**
- * A simple trait for releasable resource. Provides noop implementation.
- */
- trait Stateful {
- def release() {}
- }
-
- /**
* Rename the current pipe
*/
def name(s : String) = new Pipe(s, pipe)
@@ -89,7 +96,7 @@
def apply(c: C) { c.release() }
},
Fields.NONE, conv, set))
- NullSource.write(newPipe)(flowDef, mode)
+ NullSource.writeFrom(newPipe)(flowDef, mode)
newPipe
}
@@ -129,8 +136,16 @@
* takes any number of parameters as long as we can convert
* them to a fields object
*/
- def project(fields : Fields) = {
- new Each(pipe, fields, new Identity(fields))
+ def project(fields : Fields)(implicit tracing : Tracing) = {
+ tracing.tracingFields match {
+ case Some(tracingFields) => {
+ val f = if(tracing.isTraced(pipe) && !fields.contains(tracingFields)) fields.append(tracingFields) else fields
+ new Each(pipe, f, new Identity(f))
+ }
+ case None => {
+ new Each(pipe, fields, new Identity(fields))
+ }
+ }
}
/**
@@ -156,14 +171,18 @@
* _.size.max('f1) etc...
* }}}
*/
- def groupBy(f : Fields)(builder : GroupBuilder => GroupBuilder) : Pipe = {
- builder(new GroupBuilder(f)).schedule(pipe.getName, pipe)
+ def groupBy(f : Fields)(builder : GroupBuilder => GroupBuilder)(implicit tracing : Tracing) : Pipe = {
+ tracing.onGroupBy(builder(new GroupBuilder(f)), pipe).schedule(pipe.getName, pipe)
+ }
+
+ def groupByNoMerge(f : Fields)(builder : GroupBuilder => GroupBuilder)(implicit tracing : Tracing) : Pipe = {
+ tracing.onGroupByNoMerge(builder(new GroupBuilder(f)), pipe).schedule(pipe.getName, pipe)
}
/**
* Returns the set of unique tuples containing the specified fields
*/
- def unique(f : Fields) : Pipe = groupBy(f) { _.size('__uniquecount__) }.project(f)
+ def unique(f : Fields) : Pipe = groupByNoMerge(f) { _.size('__uniquecount__) }.project(f)
/**
* Merge or Concatenate several pipes together with this one:
@@ -176,9 +195,7 @@
* This is probably only useful just before setting a tail such as Database
* tail, so that only one reducer talks to the DB. Kind of a hack.
*/
- def groupAll : Pipe = groupAll { g =>
- g.takeWhile(0)((t : TupleEntry) => true)
- }
+ def groupAll : Pipe = groupAll { _.pass }
/**
* == Warning ==
@@ -196,6 +213,29 @@
.discard('__groupAll__)
}
+ def shard(n : Int) : Pipe = groupRandomly(n) { _.pass }
+
+ /**
+ * Like groupAll, but randomly groups data into n reducers.
+ */
+ def groupRandomly(n : Int)(gs: GroupBuilder => GroupBuilder)(implicit tracing : Tracing) : Pipe = {
+ using(new Random with Stateful)
+ .map(()->'__shard__) { (r:Random, _:Unit) => r.nextInt(n) }
+ .groupBy('__shard__) { g : GroupBuilder => gs(g).reducers(n) } (tracing)
+ .discard('__shard__)
+ }
+
+ /**
+ * Adds a field with a constant value.
+ *
+ * == Usage ==
+ * {{{
+ * insert('a, 1)
+ * }}}
+ */
+ def insert[A](fs : Fields, value : A)(implicit conv : TupleSetter[A]) : Pipe =
+ map(() -> fs) { _:Unit => value }
+
/**
* Rename some set of N fields as another set of N fields
*
@@ -226,6 +266,26 @@
}
/**
+ * Subsample the entries in the pipe.
+ * Hash some fields rather than use random noise, for the sake of reproducibility.
+ */
+ def subsample(p : Double)(implicit tracing : Tracing) : Pipe = {
+ subsample(Fields.ALL, p)
+ }
+
+ def subsample(fields : Fields, p : Double)(implicit tracing : Tracing) : Pipe = {
+ val q : Long = math.round(1.0/p)
+ if(tracing.isTraced(pipe)) {
+ tracing.tracingFields match {
+ case Some(tf) => filter[TupleEntry](fields){ t : TupleEntry => val s = new Tuple(t.getTuple); if(t.getFields.contains(tf)) s.remove(t.getFields, tf); MD5Hash(s.toString) % q == 0 }
+ case _ => filter[TupleEntry](fields, p){ t : TupleEntry => MD5Hash(t.getTuple.toString) %q == 0}
+ }
+ } else {
+ filter[TupleEntry](fields, p){ t : TupleEntry => MD5Hash(t.getTuple.toString) % q == 0}
+ }
+ }
+
+ /**
* If you use a map function that does not accept TupleEntry args,
* which is the common case, an implicit conversion in GeneratedConversions
* will convert your function into a `(TupleEntry => T)`. The result type
@@ -262,10 +322,15 @@
each(fs)(new MapFunction[A,T](fn, _, conv, setter))
}
def mapTo[A,T](fs : (Fields,Fields))(fn : A => T)
- (implicit conv : TupleConverter[A], setter : TupleSetter[T]) : Pipe = {
+ (implicit conv : TupleConverter[A], setter : TupleSetter[T], tracing : Tracing) : Pipe = {
conv.assertArityMatches(fs._1)
setter.assertArityMatches(fs._2)
- eachTo(fs)(new MapFunction[A,T](fn, _, conv, setter))
+ tracing.tracingFields match {
+ case Some(fields) =>
+ each(fs)(new MapFunction[A,T](fn, _, conv, setter)).project(fs._2)
+ case None =>
+ eachTo(fs)(new MapFunction[A,T](fn, _, conv, setter))
+ }
}
def flatMap[A,T](fs : (Fields,Fields))(fn : A => Iterable[T])
(implicit conv : TupleConverter[A], setter : TupleSetter[T]) : Pipe = {
@@ -274,10 +339,15 @@
each(fs)(new FlatMapFunction[A,T](fn, _, conv, setter))
}
def flatMapTo[A,T](fs : (Fields,Fields))(fn : A => Iterable[T])
- (implicit conv : TupleConverter[A], setter : TupleSetter[T]) : Pipe = {
+ (implicit conv : TupleConverter[A], setter : TupleSetter[T], tracing : Tracing) : Pipe = {
conv.assertArityMatches(fs._1)
setter.assertArityMatches(fs._2)
- eachTo(fs)(new FlatMapFunction[A,T](fn, _, conv, setter))
+ tracing.tracingFields match {
+ case Some(fields) =>
+ each(fs)(new FlatMapFunction[A,T](fn, _, conv, setter)).project(fs._2)
+ case None =>
+ eachTo(fs)(new FlatMapFunction[A,T](fn, _, conv, setter))
+ }
}
/**
@@ -354,8 +424,33 @@
def debug = new Each(pipe, new Debug())
- def write(outsource : Source)(implicit flowDef : FlowDef, mode : Mode) = {
- outsource.write(pipe)(flowDef, mode)
+ def write(outsource : Source)(implicit flowDef : FlowDef, mode : Mode, tracing : Tracing) = {
+ Tracing.tracing = new NullTracing()
+ tracing.tracingFields match {
+ case Some(fields) => {
+ if(tracing.isTraced(pipe))
+ outsource.writeFrom(discard(fields))(flowDef, mode)
+ else
+ outsource.writeFrom(pipe)(flowDef, mode)
+ }
+ case None => outsource.writeFrom(pipe)(flowDef, mode)
+ }
+ Tracing.tracing = tracing
+ tracing.onWrite(pipe)
+ }
+ /**
+ * Adds a trap to the current pipe,
+ * which will capture all exceptions that occur in this pipe
+ * and save them to the trapsource given
+ *
+ * Traps do not include the original fields in a tuple,
+ * only the fields seen in an operation.
+ * Traps also do not include any exception information.
+ *
+ * There can only be at most one trap for each pipe.
+ **/
+ def addTrap(trapsource : Source)(implicit flowDef : FlowDef, mode : Mode) = {
+ flowDef.addTrap(pipe, trapsource.createTap(Write)(mode))
pipe
}
@@ -394,11 +489,11 @@
/**
* Same as pack but only the to fields are preserved.
*/
- def packTo[T](fs : (Fields, Fields))(implicit packer : TuplePacker[T], setter : TupleSetter[T]) : Pipe = {
+ def packTo[T](fs : (Fields, Fields))(implicit packer : TuplePacker[T], setter : TupleSetter[T], tracing : Tracing) : Pipe = {
val (fromFields, toFields) = fs
assert(toFields.size == 1, "Can only output 1 field in pack")
val conv = packer.newConverter(fromFields)
- pipe.mapTo(fs) { input : T => input } (conv, setter)
+ pipe.mapTo(fs) { input : T => input } (conv, setter, tracing)
}
/**
@@ -421,10 +516,17 @@
/**
* Same as unpack but only the to fields are preserved.
*/
- def unpackTo[T](fs : (Fields, Fields))(implicit unpacker : TupleUnpacker[T], conv : TupleConverter[T]) : Pipe = {
+ def unpackTo[T](fs : (Fields, Fields))(implicit unpacker : TupleUnpacker[T], conv : TupleConverter[T], tracing : Tracing) : Pipe = {
val (fromFields, toFields) = fs
assert(fromFields.size == 1, "Can only take 1 input field in unpack")
val setter = unpacker.newSetter(toFields)
- pipe.mapTo(fs) { input : T => input } (conv, setter)
+ pipe.mapTo(fs) { input : T => input } (conv, setter, tracing)
}
}
+
+/**
+ * A simple trait for releasable resource. Provides noop implementation.
+ */
+trait Stateful {
+ def release() {}
+}
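
A standalone sketch of the deterministic subsampling above: hashing each tuple instead of drawing random noise means the same rows are selected on every run (plain Scala; md5Hash mirrors the MD5Hash object):

import java.security.MessageDigest

object SubsampleSketch extends App {
  def md5Hash(s: String): Int = {
    val h = MessageDigest.getInstance("MD5").digest(s.getBytes)
    (h(0).toInt << 24) + (h(1).toInt << 16) + (h(2).toInt << 8) + h(3).toInt
  }

  val p = 0.25
  val q = math.round(1.0 / p) // keep tuples whose hash is divisible by q
  val rows = (1 to 20).map(i => "user" + i + "\t" + i)
  println(rows.filter(r => md5Hash(r) % q == 0)) // identical output on every run
}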
Only in etsy-scalding/src/main/scala/com/twitter/scalding: SkewReplication.scala
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/Source.scala etsy-scalding/src/main/scala/com/twitter/scalding/Source.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/Source.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/Source.scala 2013-05-29 23:08:00.000000000 -0700
@@ -88,7 +88,7 @@
sys.error("Cascading Hadoop mode not supported for: " + toString)
}
- def read(implicit flowDef : FlowDef, mode : Mode) = {
+ def read(implicit flowDef : FlowDef, mode : Mode, tracing : Tracing) = {
//insane workaround for scala compiler bug
val sources = flowDef.getSources().asInstanceOf[JMap[String,Any]]
val srcName = this.toString
@@ -102,7 +102,7 @@
* write the pipe and return the input so it can be chained into
* the next operation
*/
- def write(pipe : Pipe)(implicit flowDef : FlowDef, mode : Mode) = {
+ def writeFrom(pipe : Pipe)(implicit flowDef : FlowDef, mode : Mode) = {
//insane workaround for scala compiler bug
val sinks = flowDef.getSinks().asInstanceOf[JMap[String,Any]]
val sinkName = this.toString
@@ -186,16 +186,16 @@
// Due to type erasure, your subclass must supply this
val converter : TupleConverter[T]
def mapTo[U](out : Fields)(mf : (T) => U)
- (implicit flowDef : FlowDef, mode : Mode, setter : TupleSetter[U]) = {
- RichPipe(read(flowDef, mode)).mapTo[T,U](sourceFields -> out)(mf)(converter, setter)
+ (implicit flowDef : FlowDef, mode : Mode, setter : TupleSetter[U], tracing : Tracing) = {
+ RichPipe(read(flowDef, mode, tracing)).mapTo[T,U](sourceFields -> out)(mf)(converter, setter, tracing)
}
/**
* If you want to filter, you should use this and output a 0 or 1 length Iterable.
* Filter does not change column names, and we generally expect to change columns here
*/
def flatMapTo[U](out : Fields)(mf : (T) => Iterable[U])
- (implicit flowDef : FlowDef, mode : Mode, setter : TupleSetter[U]) = {
- RichPipe(read(flowDef, mode)).flatMapTo[T,U](sourceFields -> out)(mf)(converter, setter)
+ (implicit flowDef : FlowDef, mode : Mode, setter : TupleSetter[U], tracing : Tracing) = {
+ RichPipe(read(flowDef, mode, tracing)).flatMapTo[T,U](sourceFields -> out)(mf)(converter, setter, tracing)
}
}
@@ -239,7 +239,7 @@
case Read => throw new Exception("not supported, reading from null")
case Write => mode match {
case Hdfs(_, _) => new NullTap[JobConf, RecordReader[_,_], OutputCollector[_,_], Any, Any]
- case Local(_) => new NullTap[Properties, InputStream, OutputStream, Any, Any]
+ case Local(_, _) => new NullTap[Properties, InputStream, OutputStream, Any, Any]
case Test(_) => new NullTap[Properties, InputStream, OutputStream, Any, Any]
}
}
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/StreamOperations.scala etsy-scalding/src/main/scala/com/twitter/scalding/StreamOperations.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/StreamOperations.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/StreamOperations.scala 2013-05-29 23:08:00.000000000 -0700
@@ -73,7 +73,7 @@
new ScanLeftIterator(s, init, fn)
}(conv,setter)
}
-
+
/**
* Only keep the first cnt elements
*/
@@ -82,10 +82,10 @@
s.take(cnt)
}(CTupleConverter, CascadingTupleSetter)
}
-
+
/**
- * Take while the predicate is true, starting at the
- * first false, output all
+ * Take while the predicate is true, stopping at the
+ * first false. Output all taken elements.
*/
def takeWhile[T](f : Fields)(fn : (T) => Boolean)(implicit conv : TupleConverter[T]) : Self = {
mapStream[TupleEntry,CTuple](f -> Fields.ARGS){ s =>
Only in etsy-scalding/src/main/scala/com/twitter/scalding: Tracing.scala
Only in etsy-scalding/src/main/scala/com/twitter/scalding: TracingFileSource.scala
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/TypedPipe.scala etsy-scalding/src/main/scala/com/twitter/scalding/TypedPipe.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/TypedPipe.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/TypedPipe.scala 2013-05-29 23:08:00.000000000 -0700
@@ -8,7 +8,7 @@
import java.io.Serializable
-import com.twitter.algebird.{Monoid, Ring}
+import com.twitter.algebird.{Monoid, Ring, Aggregator}
import com.twitter.scalding.typed.{Joiner, CoGrouped2, HashCoGrouped2}
/***************
@@ -23,11 +23,6 @@
* to get automatic conversion of Mappable[T] to TypedPipe[T]
*/
object TDsl extends Serializable {
- //This can be used to avoid using groupBy:
- implicit def pipeToGrouped[K,V](tpipe : TypedPipe[(K,V)])(implicit ord : Ordering[K]) : Grouped[K,V] = {
- tpipe.group[K,V]
- }
- implicit def keyedToPipe[K,V](keyed : KeyedList[K,V]) : TypedPipe[(K,V)] = keyed.toTypedPipe
implicit def pipeTExtensions(pipe : Pipe) : PipeTExtensions = new PipeTExtensions(pipe)
implicit def mappableToTypedPipe[T](mappable : Mappable[T])
(implicit flowDef : FlowDef, mode : Mode, conv : TupleConverter[T]) : TypedPipe[T] = {
@@ -50,8 +45,8 @@
* The above sums all the tuples and returns a TypedPipe[Int] which has the total sum.
*/
def typed[T,U](fielddef : (Fields, Fields))(fn : TypedPipe[T] => TypedPipe[U])
- (implicit conv : TupleConverter[T], setter : TupleSetter[U]) : Pipe = {
- fn(TypedPipe.from(pipe, fielddef._1)(conv)).toPipe(fielddef._2)(setter)
+ (implicit conv : TupleConverter[T], setter : TupleSetter[U], tracing : Tracing) : Pipe = {
+ fn(TypedPipe.from(pipe, fielddef._1)(conv)).toPipe(fielddef._2)(setter,tracing)
}
def toTypedPipe[T](fields : Fields)(implicit conv : TupleConverter[T]) : TypedPipe[T] = {
TypedPipe.from[T](pipe, fields)(conv)
}
@@ -87,9 +82,13 @@
* The output pipe has a single item CTuple with an object of type T in position 0
*/
protected lazy val pipe : Pipe = {
- inpipe.flatMapTo(fields -> 0)(flatMapFn)(implicitly[TupleConverter[TupleEntry]], SingleSetter)
+ inpipe.flatMapTo(fields -> 0)(flatMapFn)(implicitly[TupleConverter[TupleEntry]], SingleSetter, Tracing.tracing)
}
+ /** Same as groupAll.aggregate.values
+ */
+ def aggregate[B,C](agg: Aggregator[T,B,C]): TypedPipe[C] = groupAll.aggregate(agg).values
+
// Implements a cross project. The right side should be tiny
def cross[U](tiny : TypedPipe[U]) : TypedPipe[(T,U)] = {
val crossedPipe = pipe.rename(0 -> 't)
@@ -136,29 +135,34 @@
TypedPipe.from(pipe ++ other.pipe, 0)(singleConverter[U])
}
- def toPipe(fieldNames : Fields)(implicit setter : TupleSetter[T]) : Pipe = {
+ /** Reasonably common shortcut for cases of associative/commutative reduction
+ * returns a typed pipe with only one element.
+ */
+ def sum(implicit plus: Monoid[T]): TypedPipe[T] = groupAll.sum.values
+
+ def toPipe(fieldNames : Fields)(implicit setter : TupleSetter[T], tracing : Tracing) : Pipe = {
val conv = implicitly[TupleConverter[TupleEntry]]
- inpipe.flatMapTo(fields -> fieldNames)(flatMapFn)(conv, setter)
+ inpipe.flatMapTo(fields -> fieldNames)(flatMapFn)(conv, setter, tracing)
}
- def unpackToPipe(fieldNames : Fields)(implicit up : TupleUnpacker[T]) : Pipe = {
+ def unpackToPipe(fieldNames : Fields)(implicit up : TupleUnpacker[T], tracing : Tracing) : Pipe = {
val setter = up.newSetter(fieldNames)
- toPipe(fieldNames)(setter)
+ toPipe(fieldNames)(setter, tracing)
}
/** A convenience method equivalent to toPipe(fieldNames).write(dest)
* @return a pipe equivalent to the current pipe.
*/
def write(fieldNames : Fields, dest : Source)
- (implicit conv : TupleConverter[T], setter : TupleSetter[T], flowDef : FlowDef, mode : Mode) : TypedPipe[T] = {
- val pipe = toPipe(fieldNames)(setter)
+ (implicit conv : TupleConverter[T], setter : TupleSetter[T], flowDef : FlowDef, mode : Mode, tracing : Tracing) : TypedPipe[T] = {
+ val pipe = toPipe(fieldNames)(setter, tracing)
pipe.write(dest)
// Now, we have written out, so let's start from here with the new pipe:
// If we don't do this, Cascading's flow planner can't see what's happening
TypedPipe.from(pipe, fieldNames)(conv)
}
def write(dest: Source)
- (implicit conv : TupleConverter[T], setter : TupleSetter[T], flowDef : FlowDef, mode : Mode) : TypedPipe[T] = {
- write(Dsl.intFields(0 until setter.arity), dest)(conv,setter,flowDef,mode)
+ (implicit conv : TupleConverter[T], setter : TupleSetter[T], flowDef : FlowDef, mode : Mode, tracing : Tracing) : TypedPipe[T] = {
+ write(Dsl.intFields(0 until setter.arity), dest)(conv,setter,flowDef,mode,tracing)
}
def keys[K](implicit ev : <:<[T,(K,_)]) : TypedPipe[K] = map { _._1 }
@@ -194,6 +198,18 @@
* Avoid accumulating the whole list in memory if you can. Prefer reduce.
*/
def mapValueStream[V](smfn : Iterator[T] => Iterator[V]) : KeyedList[K,V]
+
+ ///////////
+ /// The below are all implemented in terms of the above:
+ ///////////
+
+ /** Use Algebird Aggregator to do the reduction
+ */
+ def aggregate[B,C](agg: Aggregator[T,B,C]): TypedPipe[(K,C)] =
+ mapValues(agg.prepare _)
+ .reduce(agg.reduce _)
+ .map { kv => (kv._1, agg.present(kv._2)) }
+
/** This is a special case of mapValueStream, but can be optimized because it doesn't need
* all the values for a given key at once. An unoptimized implementation is:
* mapValueStream { _.map { fn } }
@@ -262,7 +278,7 @@
// Make a new Grouped from a pipe with two fields: 'key, 'value
def fromKVPipe[K,V](pipe : Pipe, ordering : Ordering[K])
(implicit conv : TupleConverter[V]) : Grouped[K,V] = {
- new Grouped[K,V](pipe, ordering, None, None, -1)
+ new Grouped[K,V](pipe, ordering, None, None, -1, false)
}
def valueSorting[T](implicit ord : Ordering[T]) : Fields = sorting("value", ord)
@@ -279,7 +295,8 @@
val ordering : Ordering[K],
streamMapFn : Option[(Iterator[Tuple]) => Iterator[T]],
private[scalding] val valueSort : Option[(Fields,Boolean)],
- val reducers : Int = -1)
+ val reducers : Int = -1,
+ val toReducers: Boolean = false)
extends KeyedList[K,T] with Serializable {
import Dsl._
@@ -291,15 +308,17 @@
if (fb._2) gbSorted.reverse else gbSorted
}.getOrElse(gb)
}
+ def forceToReducers: Grouped[K,T] =
+ new Grouped(pipe, ordering, streamMapFn, valueSort, reducers, true)
def withSortOrdering(so : Ordering[T]) : Grouped[K,T] = {
// Set the sorting with unreversed
assert(valueSort.isEmpty, "Can only call withSortOrdering once")
assert(streamMapFn.isEmpty, "Cannot sort after a mapValueStream")
val newValueSort = Some(Grouped.valueSorting(so)).map { f => (f,false) }
- new Grouped(pipe, ordering, None, newValueSort, reducers)
+ new Grouped(pipe, ordering, None, newValueSort, reducers, toReducers)
}
def withReducers(red : Int) : Grouped[K,T] = {
- new Grouped(pipe, ordering, streamMapFn, valueSort, red)
+ new Grouped(pipe, ordering, streamMapFn, valueSort, red, toReducers)
}
def sortBy[B](fn : (T) => B)(implicit ord : Ordering[B]) : Grouped[K,T] = {
withSortOrdering(new MappedOrdering(fn, ord))
}
@@ -318,12 +337,13 @@
def reverse : Grouped[K,T] = {
assert(streamMapFn.isEmpty, "Cannot reverse after mapValueStream")
val newValueSort = valueSort.map { f => (f._1, !(f._2)) }
- new Grouped(pipe, ordering, None, newValueSort, reducers)
+ new Grouped(pipe, ordering, None, newValueSort, reducers, toReducers)
}
protected def operate[T1](fn : GroupBuilder => GroupBuilder) : TypedPipe[(K,T1)] = {
val reducedPipe = pipe.groupBy(groupKey) { gb =>
- fn(sortIfNeeded(gb)).reducers(reducers)
+ val out = fn(sortIfNeeded(gb)).reducers(reducers)
+ if(toReducers) out.forceToReducers else out
}
TypedPipe.from(reducedPipe, ('key, 'value))(implicitly[TupleConverter[(K,T1)]])
}
@@ -351,7 +371,7 @@
// We have no sort defined yet, so we should operate on the pipe so we can sort by V after
// if we need to:
new Grouped(pipe.map('value -> 'value)(fn)(singleConverter[T], SingleSetter),
- ordering, None, None, reducers)
+ ordering, None, None, reducers, toReducers)
}
else {
// There is a sorting, which invalidates map-side optimizations,
@@ -378,7 +398,7 @@
}
override def mapValueStream[V](nmf : Iterator[T] => Iterator[V]) : Grouped[K,V] = {
val newStreamMapFn = Some(streamMapping.andThen(nmf))
- new Grouped[K,V](pipe, ordering, newStreamMapFn, valueSort, reducers)
+ new Grouped[K,V](pipe, ordering, newStreamMapFn, valueSort, reducers, toReducers)
}
// SMALLER PIPE ALWAYS ON THE RIGHT!!!!!!!
def cogroup[W,R](smaller: Grouped[K,W])(joiner: (K, Iterator[T], Iterable[W]) => Iterator[R])
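
A standalone sketch of KeyedList.aggregate above: prepare each value, reduce within each key, then present the result (plain Scala collections stand in for the grouped pipe; the average aggregator is hypothetical):

object AggregateSketch extends App {
  // An "average" aggregator: prepare -> (count, sum), reduce -> pointwise sum,
  // present -> sum / count.
  def prepare(x: Double): (Long, Double) = (1L, x)
  def reduce(a: (Long, Double), b: (Long, Double)): (Long, Double) = (a._1 + b._1, a._2 + b._2)
  def present(a: (Long, Double)): Double = a._2 / a._1

  val scores = List("a" -> 1.0, "a" -> 3.0, "b" -> 10.0)
  val byKey = scores.groupBy(_._1).map { case (k, kvs) =>
    k -> present(kvs.map(kv => prepare(kv._2)).reduce(reduce))
  }
  println(byKey) // Map(a -> 2.0, b -> 10.0)
}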
Only in twitter-scalding/src/main/scala/com/twitter/scalding/mathematics: Matrix.scala | |
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/mathematics/MatrixProduct.scala etsy-scalding/src/main/scala/com/twitter/scalding/mathematics/MatrixProduct.scala | |
--- twitter-scalding/src/main/scala/com/twitter/scalding/mathematics/MatrixProduct.scala 2013-05-29 23:08:00.000000000 -0700 | |
+++ etsy-scalding/src/main/scala/com/twitter/scalding/mathematics/MatrixProduct.scala 2013-05-29 23:08:00.000000000 -0700 | |
@@ -143,6 +143,162 @@ | |
} | |
} | |
+ implicit def scalarRowRightProduct[Col,ValT](implicit ring : Ring[ValT]) : | |
+ MatrixProduct[RowVector[Col,ValT],Scalar[ValT],RowVector[Col,ValT]] = | |
+ new MatrixProduct[RowVector[Col,ValT],Scalar[ValT],RowVector[Col,ValT]] { | |
+ def apply(left : RowVector[Col,ValT], right : Scalar[ValT]) : RowVector[Col,ValT]= { | |
+ val prod = left.toMatrix(0).nonZerosWith(right).mapValues({leftRight => | |
+ val (left, right) = leftRight | |
+ ring.times(left, right) | |
+ })(ring) | |
+ | |
+ new RowVector[Col,ValT](prod.colSym, prod.valSym, prod.pipe.project(prod.colSym, prod.valSym)) | |
+ } | |
+ } | |
+ | |
+ implicit def scalarRowLeftProduct[Col,ValT](implicit ring : Ring[ValT]) : | |
+ MatrixProduct[Scalar[ValT],RowVector[Col,ValT],RowVector[Col,ValT]] = | |
+ new MatrixProduct[Scalar[ValT],RowVector[Col,ValT],RowVector[Col,ValT]] { | |
+ def apply(left : Scalar[ValT], right : RowVector[Col,ValT]) : RowVector[Col,ValT]= { | |
+ val prod = right.toMatrix(0).nonZerosWith(left).mapValues({matScal => | |
+ val (matVal, scalarVal) = matScal | |
+ ring.times(scalarVal, matVal) | |
+ })(ring) | |
+ | |
+      new RowVector[Col,ValT](prod.colSym, prod.valSym, prod.pipe.project(prod.colSym, prod.valSym)) | |
+ } | |
+ } | |
+ | |
+ implicit def scalarColRightProduct[Row,ValT](implicit ring : Ring[ValT]) : | |
+ MatrixProduct[ColVector[Row,ValT],Scalar[ValT],ColVector[Row,ValT]] = | |
+ new MatrixProduct[ColVector[Row,ValT],Scalar[ValT],ColVector[Row,ValT]] { | |
+ def apply(left : ColVector[Row,ValT], right : Scalar[ValT]) : ColVector[Row,ValT]= { | |
+ val prod = left.toMatrix(0).nonZerosWith(right).mapValues({leftRight => | |
+ val (left, right) = leftRight | |
+ ring.times(left, right) | |
+ })(ring) | |
+ | |
+ new ColVector[Row,ValT](prod.rowSym, prod.valSym, prod.pipe.project(prod.rowSym, prod.valSym)) | |
+ } | |
+ } | |
+ | |
+ implicit def scalarColLeftProduct[Row,ValT](implicit ring : Ring[ValT]) : | |
+ MatrixProduct[Scalar[ValT],ColVector[Row,ValT],ColVector[Row,ValT]] = | |
+ new MatrixProduct[Scalar[ValT],ColVector[Row,ValT],ColVector[Row,ValT]] { | |
+ def apply(left : Scalar[ValT], right : ColVector[Row,ValT]) : ColVector[Row,ValT]= { | |
+ val prod = right.toMatrix(0).nonZerosWith(left).mapValues({matScal => | |
+ val (matVal, scalarVal) = matScal | |
+ ring.times(scalarVal, matVal) | |
+ })(ring) | |
+ | |
+ new ColVector[Row,ValT](prod.rowSym, prod.valSym, prod.pipe.project(prod.rowSym, prod.valSym)) | |
+ } | |
+ } | |
+ | |
+ implicit def litScalarRowRightProduct[Col,ValT](implicit ring : Ring[ValT]) : | |
+ MatrixProduct[RowVector[Col,ValT],LiteralScalar[ValT],RowVector[Col,ValT]] = | |
+ new MatrixProduct[RowVector[Col,ValT],LiteralScalar[ValT],RowVector[Col,ValT]] { | |
+ def apply(left : RowVector[Col,ValT], right : LiteralScalar[ValT]) : RowVector[Col,ValT]= { | |
+ val prod = left.toMatrix(0).nonZerosWith(right).mapValues({leftRight => | |
+ val (left, right) = leftRight | |
+ ring.times(left, right) | |
+ })(ring) | |
+ | |
+ new RowVector[Col,ValT](prod.colSym, prod.valSym, prod.pipe.project(prod.colSym, prod.valSym)) | |
+ } | |
+ } | |
+ | |
+ implicit def litScalarRowLeftProduct[Col,ValT](implicit ring : Ring[ValT]) : | |
+ MatrixProduct[LiteralScalar[ValT],RowVector[Col,ValT],RowVector[Col,ValT]] = | |
+ new MatrixProduct[LiteralScalar[ValT],RowVector[Col,ValT],RowVector[Col,ValT]] { | |
+ def apply(left : LiteralScalar[ValT], right : RowVector[Col,ValT]) : RowVector[Col,ValT]= { | |
+ val prod = right.toMatrix(0).nonZerosWith(left).mapValues({matScal => | |
+ val (matVal, scalarVal) = matScal | |
+ ring.times(scalarVal, matVal) | |
+ })(ring) | |
+ | |
+      new RowVector[Col,ValT](prod.colSym, prod.valSym, prod.pipe.project(prod.colSym, prod.valSym)) | |
+ } | |
+ } | |
+ | |
+ implicit def litScalarColRightProduct[Row,ValT](implicit ring : Ring[ValT]) : | |
+ MatrixProduct[ColVector[Row,ValT],LiteralScalar[ValT],ColVector[Row,ValT]] = | |
+ new MatrixProduct[ColVector[Row,ValT],LiteralScalar[ValT],ColVector[Row,ValT]] { | |
+ def apply(left : ColVector[Row,ValT], right : LiteralScalar[ValT]) : ColVector[Row,ValT]= { | |
+ val prod = left.toMatrix(0).nonZerosWith(right).mapValues({leftRight => | |
+ val (left, right) = leftRight | |
+ ring.times(left, right) | |
+ })(ring) | |
+ | |
+ new ColVector[Row,ValT](prod.rowSym, prod.valSym, prod.pipe.project(prod.rowSym, prod.valSym)) | |
+ } | |
+ } | |
+ | |
+ implicit def litScalarColLeftProduct[Row,ValT](implicit ring : Ring[ValT]) : | |
+ MatrixProduct[LiteralScalar[ValT],ColVector[Row,ValT],ColVector[Row,ValT]] = | |
+ new MatrixProduct[LiteralScalar[ValT],ColVector[Row,ValT],ColVector[Row,ValT]] { | |
+ def apply(left : LiteralScalar[ValT], right : ColVector[Row,ValT]) : ColVector[Row,ValT]= { | |
+ val prod = right.toMatrix(0).nonZerosWith(left).mapValues({matScal => | |
+ val (matVal, scalarVal) = matScal | |
+ ring.times(scalarVal, matVal) | |
+ })(ring) | |
+ | |
+ new ColVector[Row,ValT](prod.rowSym, prod.valSym, prod.pipe.project(prod.rowSym, prod.valSym)) | |
+ } | |
+ } | |
+ | |
+ implicit def scalarDiagRightProduct[Row,ValT](implicit ring : Ring[ValT]) : | |
+ MatrixProduct[DiagonalMatrix[Row,ValT],Scalar[ValT], DiagonalMatrix[Row,ValT]] = | |
+ new MatrixProduct[DiagonalMatrix[Row,ValT],Scalar[ValT],DiagonalMatrix[Row,ValT]] { | |
+ def apply(left : DiagonalMatrix[Row,ValT], right : Scalar[ValT]) : DiagonalMatrix[Row,ValT]= { | |
+ val prod = left.toRow.toMatrix(0).nonZerosWith(right).mapValues({leftRight => | |
+ val (left, right) = leftRight | |
+ ring.times(left, right) | |
+ })(ring) | |
+ | |
+      new DiagonalMatrix[Row,ValT](prod.colSym, prod.valSym, prod.pipe.project(prod.colSym, prod.valSym)) | |
+ } | |
+ } | |
+ | |
+ implicit def scalarDiagLeftProduct[Row,ValT](implicit ring : Ring[ValT]) : | |
+ MatrixProduct[Scalar[ValT],DiagonalMatrix[Row,ValT],DiagonalMatrix[Row,ValT]] = | |
+ new MatrixProduct[Scalar[ValT],DiagonalMatrix[Row,ValT],DiagonalMatrix[Row,ValT]] { | |
+ def apply(left : Scalar[ValT], right : DiagonalMatrix[Row,ValT]) : DiagonalMatrix[Row,ValT]= { | |
+ val prod = right.toRow.toMatrix(0).nonZerosWith(left).mapValues({matScal => | |
+ val (matVal, scalarVal) = matScal | |
+ ring.times(scalarVal, matVal) | |
+ })(ring) | |
+ | |
+      new DiagonalMatrix[Row,ValT](prod.colSym, prod.valSym, prod.pipe.project(prod.colSym, prod.valSym)) | |
+ } | |
+ } | |
+ | |
+ implicit def litScalarDiagRightProduct[Col,ValT](implicit ring : Ring[ValT]) : | |
+ MatrixProduct[DiagonalMatrix[Col,ValT],LiteralScalar[ValT],DiagonalMatrix[Col,ValT]] = | |
+ new MatrixProduct[DiagonalMatrix[Col,ValT],LiteralScalar[ValT],DiagonalMatrix[Col,ValT]] { | |
+ def apply(left : DiagonalMatrix[Col,ValT], right : LiteralScalar[ValT]) : DiagonalMatrix[Col,ValT]= { | |
+ val prod = left.toRow.toMatrix(0).nonZerosWith(right).mapValues({leftRight => | |
+ val (left, right) = leftRight | |
+ ring.times(left, right) | |
+ })(ring) | |
+ | |
+ new DiagonalMatrix[Col,ValT](prod.colSym, prod.valSym, prod.pipe.project(prod.colSym, prod.valSym)) | |
+ } | |
+ } | |
+ | |
+ implicit def litScalarDiagLeftProduct[Col,ValT](implicit ring : Ring[ValT]) : | |
+ MatrixProduct[LiteralScalar[ValT],DiagonalMatrix[Col,ValT],DiagonalMatrix[Col,ValT]] = | |
+ new MatrixProduct[LiteralScalar[ValT],DiagonalMatrix[Col,ValT],DiagonalMatrix[Col,ValT]] { | |
+ def apply(left : LiteralScalar[ValT], right : DiagonalMatrix[Col,ValT]) : DiagonalMatrix[Col,ValT]= { | |
+ val prod = right.toRow.toMatrix(0).nonZerosWith(left).mapValues({matScal => | |
+ val (matVal, scalarVal) = matScal | |
+ ring.times(scalarVal, matVal) | |
+ })(ring) | |
+ | |
+      new DiagonalMatrix[Col,ValT](prod.colSym, prod.valSym, prod.pipe.project(prod.colSym, prod.valSym)) | |
+ } | |
+ } | |
+ | |
implicit def rowColProduct[IdxT,ValT](implicit ring : Ring[ValT]) : | |
MatrixProduct[RowVector[IdxT,ValT],ColVector[IdxT,ValT],Scalar[ValT]] = | |
new MatrixProduct[RowVector[IdxT,ValT],ColVector[IdxT,ValT],Scalar[ValT]] { | |
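
The block of implicits added above lets Scalar and LiteralScalar values multiply RowVector, ColVector, and DiagonalMatrix operands from either side. Every instance follows the same recipe: lift the vector to a one-row or one-column matrix with toMatrix(0), zip its non-zero entries with the scalar via nonZerosWith, multiply inside the Ring, and project the index and value fields back out. A sketch of what this enables, assuming the Matrix API from the scalding tutorials (the source and field names here are invented):

    import com.twitter.scalding._
    import com.twitter.scalding.mathematics.Matrix._

    class ScaleRowExample(args: Args) extends Job(args) {
      val mat = Tsv("graph", ('row, 'col, 'v)).read
        .toMatrix[Int, Int, Double]('row, 'col, 'v)
      // Resolves via litScalarRowRightProduct; the left-handed form
      // new LiteralScalar(2.0) * mat.getRow(1) works symmetrically.
      (mat.getRow(1) * new LiteralScalar(2.0)).write(Tsv("row1-doubled"))
    }
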
@@ -153,6 +309,36 @@ | |
} | |
} | |
+ implicit def matrixColVecProduct[RowT,CommonT,ValT](implicit ring : Ring[ValT]) : | |
+ MatrixProduct[Matrix[RowT,CommonT,ValT],ColVector[CommonT,ValT],ColVector[RowT,ValT]] = | |
+ new MatrixProduct[Matrix[RowT,CommonT,ValT],ColVector[CommonT,ValT],ColVector[RowT,ValT]] { | |
+ def apply(left : Matrix[RowT,CommonT,ValT], right : ColVector[CommonT,ValT]) : ColVector[RowT,ValT] = { | |
+ val prod = (left * right.toMatrix(0)) : Matrix[RowT,Int,ValT] | |
+ new ColVector[RowT,ValT](prod.rowSym, prod.valSym, prod.pipe.project(prod.rowSym, prod.valSym)) | |
+ } | |
+ } | |
+ | |
+ implicit def rowVecMatrixProduct[CommonT,ColT,ValT](implicit ring : Ring[ValT]) : | |
+ MatrixProduct[RowVector[CommonT,ValT],Matrix[CommonT,ColT,ValT],RowVector[ColT,ValT]] = | |
+ new MatrixProduct[RowVector[CommonT,ValT],Matrix[CommonT,ColT,ValT],RowVector[ColT,ValT]] { | |
+ def apply(left : RowVector[CommonT,ValT], right : Matrix[CommonT,ColT,ValT]) : RowVector[ColT,ValT] = { | |
+ val prod = (left.toMatrix(0) * right) : Matrix[Int,ColT,ValT] | |
+ new RowVector[ColT,ValT](prod.colSym, prod.valSym, prod.pipe.project(prod.colSym, prod.valSym)) | |
+ } | |
+ } | |
+ | |
+ implicit def colRowOuterProduct[IdxT,ValT](implicit ring : Ring[ValT]) : | |
+ MatrixProduct[ColVector[IdxT,ValT],RowVector[IdxT,ValT],Matrix[IdxT,IdxT,ValT]] = | |
+ new MatrixProduct[ColVector[IdxT,ValT],RowVector[IdxT,ValT],Matrix[IdxT,IdxT,ValT]] { | |
+ def apply(left : ColVector[IdxT,ValT], right : RowVector[IdxT,ValT]) : Matrix[IdxT,IdxT,ValT] = { | |
+ val prod = (left.toMatrix(0) * right.toMatrix(0)) : Matrix[IdxT,IdxT,ValT] | |
+ new Matrix[IdxT,IdxT,ValT](prod.rowSym, | |
+ prod.colSym, | |
+ prod.valSym, | |
+ prod.pipe.project(prod.rowSym, prod.colSym, prod.valSym)) | |
+ } | |
+ } | |
+ | |
implicit def standardMatrixProduct[RowL,Common,ColR,ValT](implicit ring : Ring[ValT]) : | |
MatrixProduct[Matrix[RowL,Common,ValT],Matrix[Common,ColR,ValT],Matrix[RowL,ColR,ValT]] = | |
new MatrixProduct[Matrix[RowL,Common,ValT],Matrix[Common,ColR,ValT],Matrix[RowL,ColR,ValT]] { | |
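
The second hunk rounds out the algebra: Matrix * ColVector yields a ColVector, RowVector * Matrix yields a RowVector, and ColVector * RowVector yields their outer product as a full Matrix. Each instance promotes the vector to a single-column or single-row matrix, delegates to the existing matrix-matrix product, and projects the result back down. A sketch of a matrix-vector step this makes direct to write (same assumed Matrix API as above, invented names):

    import com.twitter.scalding._
    import com.twitter.scalding.mathematics.Matrix._

    class MatVecExample(args: Args) extends Job(args) {
      val adj = Tsv("adjacency", ('src, 'dst, 'weight)).read
        .toMatrix[Int, Int, Double]('src, 'dst, 'weight)
      val v = adj.getCol(1)          // ColVector[Int, Double]
      (adj * v).write(Tsv("Av"))     // ColVector via matrixColVecProduct
    }
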
diff -ur twitter-scalding/src/test/scala/com/twitter/scalding/ArgTest.scala etsy-scalding/src/test/scala/com/twitter/scalding/ArgTest.scala | |
--- twitter-scalding/src/test/scala/com/twitter/scalding/ArgTest.scala 2013-05-29 23:08:00.000000000 -0700 | |
+++ etsy-scalding/src/test/scala/com/twitter/scalding/ArgTest.scala 2013-05-29 23:08:00.000000000 -0700 | |
@@ -77,5 +77,13 @@ | |
a.list("b") must_== List("1", "-3", "4") | |
a("c").toInt must_== -5 | |
} | |
+ "handle k, v pairs separated by an equal sign" in { | |
+ val a = Args("a=1") | |
+ a("a") must be_==("1") | |
+ } | |
+ "handle multiple arguments when k, v pairs separated by an equal sign" in { | |
+ val a = Args("a=1 2 3") | |
+ a.list("a") must_== List("1", "2", "3") | |
+ } | |
} | |
} | |
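
The two new specs pin down =-separated argument parsing: Args("a=1") behaves like Args("--a 1"), and bare tokens after a k=v pair accumulate onto that key's list. In job terms (key names invented for the sketch):

    import com.twitter.scalding.Args

    val a = Args("threshold=5 shards=1 2 3")
    a("threshold").toInt  // 5
    a.list("shards")      // List("1", "2", "3")
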
diff -ur twitter-scalding/src/test/scala/com/twitter/scalding/CoreTest.scala etsy-scalding/src/test/scala/com/twitter/scalding/CoreTest.scala | |
--- twitter-scalding/src/test/scala/com/twitter/scalding/CoreTest.scala 2013-05-29 23:08:00.000000000 -0700 | |
+++ etsy-scalding/src/test/scala/com/twitter/scalding/CoreTest.scala 2013-05-29 23:08:00.000000000 -0700 | |
@@ -35,6 +35,37 @@ | |
} | |
} | |
+object GroupRandomlyJob { | |
+ val NumShards = 10 | |
+} | |
+ | |
+class GroupRandomlyJob(args: Args) extends Job(args) { | |
+ import GroupRandomlyJob.NumShards | |
+ | |
+ Tsv("fakeInput") | |
+ .read | |
+ .mapTo(0 -> 'num) { (line: String) => line.toInt } | |
+ .groupRandomly(NumShards) { _.max('num) } | |
+ .groupAll { _.size } | |
+ .write(Tsv("fakeOutput")) | |
+} | |
+ | |
+class GroupRandomlyJobTest extends Specification with TupleConversions { | |
+ import GroupRandomlyJob.NumShards | |
+ noDetailedDiffs() | |
+ | |
+ "A GroupRandomlyJob" should { | |
+ val input = (0 to 10000).map { _.toString }.map { Tuple1(_) } | |
+ JobTest("com.twitter.scalding.GroupRandomlyJob") | |
+ .source(Tsv("fakeInput"), input) | |
+ .sink[(Int)](Tsv("fakeOutput")) { outBuf => | |
+ val numShards = outBuf(0) | |
+ numShards must be_==(NumShards) | |
+ } | |
+ .run.finish | |
+ } | |
+} | |
+ | |
class MapToGroupBySizeSumMaxJob(args: Args) extends Job(args) { | |
TextLine(args("input")).read. | |
//1 is the line | |
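
GroupRandomlyJob exercises groupRandomly(n), a fork addition that splits tuples into n pseudo-random groups; the test takes a max inside each shard and then counts the shards, expecting exactly NumShards. The usual motivation is two-stage aggregation, so that a groupAll (or one hot key) does not funnel every row through a single reducer. A sketch under that assumption (source and field names invented):

    import com.twitter.scalding._

    // Find a global max in two stages: 64 partial maxima, then one combine.
    class TwoStageMax(args: Args) extends Job(args) {
      Tsv("nums", 'num).read
        .groupRandomly(64) { _.max('num) }
        .groupAll { _.max('num) }
        .write(Tsv("global-max"))
    }
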
@@ -1151,3 +1182,36 @@ | |
} | |
} | |
+class ThrowsErrorsJob(args : Args) extends Job(args) { | |
+ Tsv("input",('letter, 'x)) | |
+ .read | |
+ .addTrap(Tsv("trapped")) | |
+ .map(('letter, 'x) -> 'yPrime){ fields : (String, Int) => | |
+ if (fields._2 == 1) throw new Exception("Erroneous Ones") else fields._2 } | |
+ .write(Tsv("output")) | |
+} | |
+ | |
+ | |
+class AddTrapTest extends Specification { | |
+ import Dsl._ | |
+ | |
+ noDetailedDiffs() //Fixes an issue with scala 2.9 | |
+ "An AddTrap" should { | |
+ val input = List(("a", 1),("b", 2), ("c", 3), ("d", 1), ("e", 2)) | |
+ | |
+ JobTest(new ThrowsErrorsJob(_)) | |
+ .source(Tsv("input",('letter,'x)), input) | |
+ .sink[(String, Int)](Tsv("output")) { outBuf => | |
+ "must contain all numbers in input except for 1" in { | |
+ outBuf.toList.sorted must be_==(List(("b", 2), ("c", 3), ("e", 2))) | |
+ } | |
+ } | |
+ .sink[(String, Int)](Tsv("trapped")) { outBuf => | |
+ "must contain all 1s and fields in input" in { | |
+ outBuf.toList.sorted must be_==(List(("a", 1), ("d", 1))) | |
+ } | |
+ } | |
+ .run | |
+ .finish | |
+ } | |
+} | |
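
addTrap attaches a Cascading failure trap to the pipe: tuples whose downstream operations throw are diverted, with their original fields, to the trap's sink instead of failing the job; the test confirms the split, ("a", 1) and ("d", 1) land in "trapped" while the rest reach "output". A sketch of the typical defensive-parsing use (source and field names invented):

    import com.twitter.scalding._

    class ParseWithTrap(args: Args) extends Job(args) {
      Tsv("raw", ('id, 'payload)).read
        .addTrap(Tsv("bad-records"))                       // malformed rows land here
        .map('payload -> 'value) { s: String => s.toInt }  // throws on junk input
        .write(Tsv("parsed"))
    }
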
Only in etsy-scalding/src/test/scala/com/twitter/scalding: InputTracingTest.scala | |
diff -ur twitter-scalding/src/test/scala/com/twitter/scalding/KryoTest.scala etsy-scalding/src/test/scala/com/twitter/scalding/KryoTest.scala | |
--- twitter-scalding/src/test/scala/com/twitter/scalding/KryoTest.scala 2013-05-29 23:08:00.000000000 -0700 | |
+++ etsy-scalding/src/test/scala/com/twitter/scalding/KryoTest.scala 2013-05-29 23:08:00.000000000 -0700 | |
@@ -10,7 +10,7 @@ | |
import scala.collection.immutable.ListMap | |
import scala.collection.immutable.HashMap | |
-import com.twitter.algebird.{AveragedValue, DecayedValue, HLLInstance, | |
+import com.twitter.algebird.{AveragedValue, DecayedValue, | |
HyperLogLog, HyperLogLogMonoid, Moments, Monoid} | |
/* | |
Only in etsy-scalding/src/test/scala/com/twitter/scalding: SkewJoinTest.scala | |
diff -ur twitter-scalding/src/test/scala/com/twitter/scalding/TypedPipeTest.scala etsy-scalding/src/test/scala/com/twitter/scalding/TypedPipeTest.scala | |
--- twitter-scalding/src/test/scala/com/twitter/scalding/TypedPipeTest.scala 2013-05-29 23:08:00.000000000 -0700 | |
+++ etsy-scalding/src/test/scala/com/twitter/scalding/TypedPipeTest.scala 2013-05-29 23:08:00.000000000 -0700 | |
@@ -17,6 +17,7 @@ | |
.map { w => (w, 1L) } | |
.forceToDisk | |
.group | |
+ .forceToReducers | |
.sum | |
.write(Tsv("outputFile")) | |
} | |
@@ -43,9 +44,9 @@ | |
} | |
class TypedPipeJoinJob(args : Args) extends Job(args) { | |
- (Tsv("inputFile0").read.toTypedPipe[(Int,Int)](0, 1) | |
- leftJoin TypedPipe.from[(Int,Int)](Tsv("inputFile1").read, (0, 1))) | |
- .toPipe('key, 'value) | |
+ (Tsv("inputFile0").read.toTypedPipe[(Int,Int)](0, 1).group | |
+ leftJoin TypedPipe.from[(Int,Int)](Tsv("inputFile1").read, (0, 1)).group) | |
+ .toTypedPipe | |
.write(Tsv("outputFile")) | |
} | |
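
The join tests change because joins now live on grouped pipes rather than on TypedPipe itself: each side needs an explicit .group, and the result comes back through .toTypedPipe instead of toPipe('key, 'value). Restating the job above as a standalone sketch:

    import com.twitter.scalding._

    class GroupedLeftJoin(args: Args) extends Job(args) {
      val left  = TypedPipe.from[(Int, Int)](Tsv("inputFile0").read, (0, 1)).group
      val right = TypedPipe.from[(Int, Int)](Tsv("inputFile1").read, (0, 1)).group
      (left leftJoin right)   // key -> (leftValue, Option[rightValue])
        .toTypedPipe
        .write(Tsv("outputFile"))
    }

Keys with no match on the right survive with a None value rather than being dropped.
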
@@ -53,7 +54,7 @@ | |
noDetailedDiffs() //Fixes an issue with scala 2.9 | |
import Dsl._ | |
"A TypedPipeJoin" should { | |
- JobTest("com.twitter.scalding.TypedPipeJoinJob") | |
+ JobTest(new com.twitter.scalding.TypedPipeJoinJob(_)) | |
.source(Tsv("inputFile0"), List((0,0), (1,1), (2,2), (3,3), (4,5))) | |
.source(Tsv("inputFile1"), List((0,1), (1,2), (2,3), (3,4))) | |
.sink[(Int,(Int,Option[Int]))](Tsv("outputFile")){ outputBuffer => | |
@@ -73,19 +74,19 @@ | |
} | |
class TypedPipeHashJoinJob(args : Args) extends Job(args) { | |
- (Tsv("inputFile0").read.toTypedPipe[(Int,Int)](0, 1) | |
- hashLeftJoin TypedPipe.from[(Int,Int)](Tsv("inputFile1").read, (0, 1))) | |
- .toPipe('key, 'value) | |
+ TypedTsv[(Int,Int)]("inputFile0") | |
+ .group | |
+ .hashLeftJoin(TypedTsv[(Int,Int)]("inputFile1").group) | |
.write(Tsv("outputFile")) | |
} | |
class TypedPipeHashJoinTest extends Specification { | |
noDetailedDiffs() //Fixes an issue with scala 2.9 | |
import Dsl._ | |
- "A TypedPipeJoin" should { | |
- JobTest("com.twitter.scalding.TypedPipeJoinJob") | |
- .source(Tsv("inputFile0"), List((0,0), (1,1), (2,2), (3,3), (4,5))) | |
- .source(Tsv("inputFile1"), List((0,1), (1,2), (2,3), (3,4))) | |
+ "A TypedPipeHashJoinJob" should { | |
+ JobTest(new com.twitter.scalding.TypedPipeHashJoinJob(_)) | |
+ .source(TypedTsv[(Int,Int)]("inputFile0"), List((0,0), (1,1), (2,2), (3,3), (4,5))) | |
+ .source(TypedTsv[(Int,Int)]("inputFile1"), List((0,1), (1,2), (2,3), (3,4))) | |
.sink[(Int,(Int,Option[Int]))](Tsv("outputFile")){ outputBuffer => | |
val outMap = outputBuffer.toMap | |
"correctly join" in { | |
@@ -107,13 +108,15 @@ | |
TextLine("inputFile").read.typed(1 -> ('maxWord, 'maxCnt)) { tpipe : TypedPipe[String] => | |
tpipe.flatMap { _.split("\\s+") } | |
.map { w => (w, 1L) } | |
- // groupby the key and sum the values: | |
+ .group | |
.sum | |
.groupAll | |
+      // Reverse each value tuple (a swap applied inside the grouping): | |
.mapValues { revTup _ } | |
+ .forceToReducers | |
.max | |
// Throw out the Unit key and reverse the value tuple | |
- .map { _._2 } | |
+ .values | |
.swap | |
}.write(Tsv("outputFile")) | |
} | |
@@ -137,27 +140,28 @@ | |
} | |
class TJoinCountJob(args : Args) extends Job(args) { | |
- (TypedPipe.from[(Int,Int)](Tsv("in0",(0,1)), (0,1)) | |
- join TypedPipe.from[(Int,Int)](Tsv("in1", (0,1)), (0,1))) | |
+ (TypedPipe.from[(Int,Int)](Tsv("in0",(0,1)), (0,1)).group | |
+ join TypedPipe.from[(Int,Int)](Tsv("in1", (0,1)), (0,1)).group) | |
.size | |
- .toPipe('key, 'count) | |
.write(Tsv("out")) | |
//Also check simple joins: | |
- (TypedPipe.from[(Int,Int)](Tsv("in0",(0,1)), (0,1)) | |
- join TypedPipe.from[(Int,Int)](Tsv("in1", (0,1)), (0,1))) | |
+ (TypedPipe.from[(Int,Int)](Tsv("in0",(0,1)), (0,1)).group | |
+ join TypedPipe.from[(Int,Int)](Tsv("in1", (0,1)), (0,1)).group) | |
//Flatten out to three values: | |
+ .toTypedPipe | |
.map { kvw => (kvw._1, kvw._2._1, kvw._2._2) } | |
- .write(('x, 'y, 'z), Tsv("out2")) | |
+ .write(Tsv("out2")) | |
//Also check simple leftJoins: | |
- (TypedPipe.from[(Int,Int)](Tsv("in0",(0,1)), (0,1)) | |
- leftJoin TypedPipe.from[(Int,Int)](Tsv("in1", (0,1)), (0,1))) | |
+ (TypedPipe.from[(Int,Int)](Tsv("in0",(0,1)), (0,1)).group | |
+ leftJoin TypedPipe.from[(Int,Int)](Tsv("in1", (0,1)), (0,1)).group) | |
//Flatten out to three values: | |
+ .toTypedPipe | |
.map { kvw : (Int,(Int,Option[Int])) => | |
(kvw._1, kvw._2._1, kvw._2._2.getOrElse(-1)) | |
} | |
- .write(('x, 'y, 'z), Tsv("out3")) | |
+ .write(Tsv("out3")) | |
} | |
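
One semantic note on .size after a typed join: the join emits every (left, right) pairing per key, so a key with n matching rows on the left and m on the right contributes n * m to its count. A sketch mirroring the first block of TJoinCountJob:

    import com.twitter.scalding._

    class JoinCountSketch(args: Args) extends Job(args) {
      val in0 = TypedPipe.from[(Int, Int)](Tsv("in0", (0, 1)), (0, 1)).group
      val in1 = TypedPipe.from[(Int, Int)](Tsv("in1", (0, 1)), (0, 1)).group
      // A key with n rows in in0 and m rows in in1 counts n * m here.
      (in0 join in1).size.write(Tsv("join-counts"))
    }
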
class TypedPipeJoinCountTest extends Specification { | |
@@ -263,6 +267,7 @@ | |
pipe.flatMap { _.split("\\s+").map(_.toLowerCase) } | |
.groupBy(identity) | |
.mapValueStream(input => Iterator(input.size)) | |
+ .forceToReducers | |
} | |
val first = countWordsIn(TypedPipe.from(TextLine("in0"))) |