@mcfunley
Forked from anonymous/gist:5675979
Created May 30, 2013 06:09
diff -ur twitter-scalding/README.md etsy-scalding/README.md
--- twitter-scalding/README.md 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/README.md 2013-05-29 23:08:00.000000000 -0700
@@ -4,7 +4,7 @@
Scalding is built on top of [Cascading](http://www.cascading.org/), a Java library that abstracts away much of the complexity of Hadoop.
-Current version: 0.8.0
+Current version: 0.8.1
## Word Count
@@ -51,7 +51,7 @@
We use [Travis CI](http://travis-ci.org/) to verify the build:
[![Build Status](https://secure.travis-ci.org/twitter/scalding.png)](http://travis-ci.org/twitter/scalding)
-The current version is 0.8.0 and is available from maven central: org="com.twitter", artifact="scalding_2.9.2".
+The current version is 0.8.1 and is available from maven central: org="com.twitter", artifact="scalding_2.9.2".
## Contact
Only in etsy-scalding: TRACING.md
diff -ur twitter-scalding/build.sbt etsy-scalding/build.sbt
--- twitter-scalding/build.sbt 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/build.sbt 2013-05-29 23:08:00.000000000 -0700
@@ -2,7 +2,7 @@
name := "scalding"
-version := "0.8.1"
+version := "0.8.2-SNAPSHOT"
organization := "com.twitter"
@@ -12,6 +12,8 @@
resolvers += "Concurrent Maven Repo" at "http://conjars.org/repo"
+
+
// Use ScalaCheck
resolvers ++= Seq(
"snapshots" at "http://oss.sonatype.org/content/repositories/snapshots",
@@ -20,18 +22,33 @@
//resolvers += "Twitter Artifactory" at "http://artifactory.local.twitter.com/libs-releases-local"
+// Uncomment and adjust this to use the local Etsy filesystem repository
+//etsyFSRepoPath := Some("/Users/mwalker/development/Ivy/repository")
+
+// This optionally adds the Etsy filesystem resolver
+resolvers <<= (resolvers, etsyFSRepoPath) { (rs, p) =>
+ optionalEtsyResolver("etsy-fs-repo-resolver", p)
+ .map(rs :+ _)
+ .getOrElse(rs)
+}
+
+// This optionally points the publish target at the Etsy filesystem resolver
+publishTo <<= (etsyFSRepoPath) { p => optionalEtsyResolver("etsy-fs-repo-publish", p) }
+
libraryDependencies += "cascading" % "cascading-core" % "2.0.2"
libraryDependencies += "cascading" % "cascading-local" % "2.0.2"
libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.2"
-libraryDependencies += "cascading.kryo" % "cascading.kryo" % "0.4.5"
-
-libraryDependencies += "com.twitter" % "meat-locker" % "0.3.1"
+libraryDependencies += "cascading.kryo" % "cascading.kryo" % "0.4.6"
libraryDependencies += "com.twitter" % "maple" % "0.2.4"
+libraryDependencies += "com.twitter" % "algebird-core_2.9.2" % "0.1.12"
+
+libraryDependencies += "com.twitter" % "chill_2.9.2" % "0.2.0"
+
libraryDependencies += "commons-lang" % "commons-lang" % "2.4"
libraryDependencies += "org.scala-tools.testing" % "specs_2.8.1" % "1.6.6" % "test"
@@ -40,6 +57,8 @@
libraryDependencies += "io.backchat.jerkson" % "jerkson_2.9.2" % "0.7.0"
+libraryDependencies += "com.googlecode.javaewah" % "JavaEWAH" % "0.6.6"
+
libraryDependencies ++= Seq(
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
"org.scala-tools.testing" % "specs_2.9.0-1" % "1.6.8" % "test"
diff -ur twitter-scalding/project/Build.scala etsy-scalding/project/Build.scala
--- twitter-scalding/project/Build.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/project/Build.scala 2013-05-29 23:08:00.000000000 -0700
@@ -1,7 +1,49 @@
import sbt._
object ScaldingBuild extends Build {
- lazy val root = Project("root", file("."))
- .dependsOn(RootProject(uri("git://github.com/twitter/algebird.git#master")))
- .dependsOn(RootProject(uri("git://github.com/twitter/chill.git#master")))
+ /**
+ * An optional path to a local filesystem repository to add to the list of
+ * resolvers. This defaults to None, in which case it is not used. To set a
+ * path, place:
+ *
+ * etsyFSRepoPath := Some("your/path/here")
+ *
+ * in build.sbt or use set in the sbt console:
+ *
+ * > set etsyFSRepoPath := Some("/Users/mwalker/development/Ivy/repository")
+ * [info] Defining *:etsy-fs-repo-path
+ * [info] The new value will be used by *:publish-to, *:resolvers
+ * [info] Reapplying settings...
+ * [info] Set current project to scalding (in build file:/Users/mwalker/development/mrwalker/scalding/)
+ */
+ val etsyFSRepoPath = SettingKey[Option[String]](
+ "etsy-fs-repo-path",
+ "Path to the local Etsy filesystem repository"
+ )
+
+ lazy val root = Project(
+ "root",
+ file("."),
+ settings = Project.defaultSettings ++ Seq(etsyFSRepoPath := None)
+ )
+
+ /**
+ * Optionally create a resolver configured for the Etsy repository.
+ */
+ def optionalEtsyResolver(name: String, path: Option[String]): Option[sbt.Resolver] = path.map(p =>
+ Resolver.file(name, file(p))(
+ Patterns(
+ Seq(
+ "[organisation]/[module]/[revision]/ivy-[revision].xml",
+ "[organisation]/[module]/[revision]/ivys/ivy.xml"
+ ),
+ Seq(
+ "[organisation]/[module]/[revision]/[type]s/[artifact]-[revision].[ext]",
+ "[organisation]/[module]/[revision]/[type]s/[artifact].[ext]",
+ "[organisation]/[module]/[revision]/[type]s/[artifact]-[classifier].[ext]"
+ ),
+ false
+ )
+ )
+ )
}
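
For reference, a minimal sketch (not part of the patch) of how a checkout would opt into the local filesystem repository wired up in build.sbt and Build.scala above; the path is a placeholder:

    // In build.sbt: activate the optional Etsy resolver and publish target by
    // pointing the new setting at a local Ivy repository (placeholder path).
    etsyFSRepoPath := Some("/home/someuser/development/Ivy/repository")
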
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/Args.scala etsy-scalding/src/main/scala/com/twitter/scalding/Args.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/Args.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/Args.scala 2013-05-29 23:08:00.000000000 -0700
@@ -27,6 +27,7 @@
def apply(argString : String) : Args = Args(argString.split("\\s+"))
/**
* parses keys as starting with a dash, except single dashed digits.
+ * Also parses key value pairs that are separated with an equal sign.
* All following non-dashed args are a list of values.
* If the list starts with non-dashed args, these are associated with the
* empty string: ""
@@ -39,7 +40,10 @@
.filter{ a => !a.matches("\\s*") }
.foldLeft(List("" -> List[String]())) { (acc, arg) =>
val noDashes = arg.dropWhile{ _ == '-'}
- if(arg == noDashes || isNumber(arg))
+ if (arg.contains("=")) {
+ val splitArg = arg.split("=")
+ (splitArg(0) -> List(splitArg(1))) :: acc
+ } else if(arg == noDashes || isNumber(arg))
(acc.head._1 -> (arg :: acc.head._2)) :: acc.tail
else
(noDashes -> List()) :: acc
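
For illustration, a quick sketch (not from the patch) of what the extended Args parser accepts; the argument names and values are placeholders:

    import com.twitter.scalding.Args

    // "--input data.tsv" is the existing dashed form; "output=results.tsv"
    // exercises the new key=value form handled above.
    val parsed = Args("--input data.tsv output=results.tsv")
    parsed("input")    // => "data.tsv"
    parsed("output")   // => "results.tsv"
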
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/FileSource.scala etsy-scalding/src/main/scala/com/twitter/scalding/FileSource.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/FileSource.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/FileSource.scala 2013-05-29 23:08:00.000000000 -0700
@@ -68,7 +68,7 @@
override def createTap(readOrWrite : AccessMode)(implicit mode : Mode) : Tap[_,_,_] = {
mode match {
// TODO support strict in Local
- case Local(_) => {
+ case Local(_,_) => {
val sinkmode = readOrWrite match {
case Read => SinkMode.KEEP
case Write => SinkMode.REPLACE
@@ -175,11 +175,13 @@
val separator = "\t"
val skipHeader = false
val writeHeader = false
+ val quote : String = null
//These should not be changed:
- override def localScheme = new CLTextDelimited(fields, skipHeader, writeHeader, separator, types)
+ override def localScheme = new CLTextDelimited(fields, skipHeader, writeHeader, separator, quote, types)
- override def hdfsScheme =
- HadoopSchemeInstance(new CHTextDelimited(fields, skipHeader, writeHeader, separator, types))
+ override def hdfsScheme = {
+ HadoopSchemeInstance(new CHTextDelimited(fields, skipHeader, writeHeader, separator, quote, types))
+ }
}
trait SequenceFileScheme extends Source {
@@ -236,6 +238,17 @@
override val skipHeader : Boolean = false, override val writeHeader: Boolean = false) extends FixedPathSource(p)
with DelimitedScheme
+/**
+* Csv value source:
+* fields separated by commas, with all fields wrapped in quotes
+*/
+case class Csv(p : String,
+ override val separator : String = ",",
+ override val fields : Fields = Fields.ALL,
+ override val skipHeader : Boolean = false,
+ override val writeHeader : Boolean = false,
+ override val quote : String ="\"") extends FixedPathSource(p) with DelimitedScheme
+
/** Allows you to set the types, prefer this:
* If T is a subclass of Product, we assume it is a tuple. If it is not, wrap T in a Tuple1:
* e.g. TypedTsv[Tuple1[List[Int]]]
@@ -262,13 +275,13 @@
// For Mappable:
override def mapTo[U](out : Fields)(fun : (T) => U)
- (implicit flowDef : FlowDef, mode : Mode, setter : TupleSetter[U]) = {
- RichPipe(read(flowDef, mode)).mapTo[T,U](sourceFields -> out)(fun)(converter, setter)
+ (implicit flowDef : FlowDef, mode : Mode, setter : TupleSetter[U], tracing : Tracing) = {
+ RichPipe(read(flowDef, mode, tracing)).mapTo[T,U](sourceFields -> out)(fun)(converter, setter, tracing)
}
// For Mappable:
override def flatMapTo[U](out : Fields)(fun : (T) => Iterable[U])
- (implicit flowDef : FlowDef, mode : Mode, setter : TupleSetter[U]) = {
- RichPipe(read(flowDef, mode)).flatMapTo[T,U](sourceFields -> out)(fun)(converter, setter)
+ (implicit flowDef : FlowDef, mode : Mode, setter : TupleSetter[U], tracing : Tracing) = {
+ RichPipe(read(flowDef, mode, tracing)).flatMapTo[T,U](sourceFields -> out)(fun)(converter, setter, tracing)
}
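
A minimal sketch of reading the new Csv source added to FileSource.scala above; the field names, argument keys, and the implicit Tracing (presumably supplied by the fork's new Tracing.scala) are assumptions, not part of the patch:

    import com.twitter.scalding._

    // Reads quoted, comma-separated input and re-emits it as TSV.
    class CsvToTsvJob(args: Args) extends Job(args) {
      Csv(args("input"), fields = ('user, 'score), skipHeader = true)
        .read
        .write(Tsv(args("output")))
    }
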
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/GroupBuilder.scala etsy-scalding/src/main/scala/com/twitter/scalding/GroupBuilder.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/GroupBuilder.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/GroupBuilder.scala 2013-05-29 23:08:00.000000000 -0700
@@ -317,6 +317,12 @@
* of group operations similar to `RichPipe.then`
*/
def then(fn : (GroupBuilder) => GroupBuilder) = fn(this)
+
+ /**
+ * An identity function that keeps all the tuples. A hack to implement
+ * groupAll and groupRandomly.
+ */
+ def pass : GroupBuilder = takeWhile(0) { (t: TupleEntry) => true }
}
/**
Only in etsy-scalding/src/main/scala/com/twitter/scalding: InputTracingJob.scala
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/IterableSource.scala etsy-scalding/src/main/scala/com/twitter/scalding/IterableSource.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/IterableSource.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/IterableSource.scala 2013-05-29 23:08:00.000000000 -0700
@@ -72,7 +72,7 @@
sys.error("IterableSource is a Read-only Source")
}
mode match {
- case Local(_) => new MemoryTap[InputStream,OutputStream](localScheme, asBuffer)
+ case Local(_,_) => new MemoryTap[InputStream,OutputStream](localScheme, asBuffer)
case Test(_) => new MemoryTap[InputStream,OutputStream](localScheme, asBuffer)
case Hdfs(_, _) => hdfsTap
case HadoopTest(_,_) => hdfsTap
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/Job.scala etsy-scalding/src/main/scala/com/twitter/scalding/Job.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/Job.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/Job.scala 2013-05-29 23:08:00.000000000 -0700
@@ -142,7 +142,7 @@
//Largely for the benefit of Java jobs
implicit def read(src : Source) : Pipe = src.read
- def write(pipe : Pipe, src : Source) {src.write(pipe)}
+ def write(pipe : Pipe, src : Source) {src.writeFrom(pipe)}
def validateSources(mode : Mode) {
flowDef.getSources()
@@ -175,8 +175,19 @@
case Some(tzn) => java.util.TimeZone.getTimeZone(tzn)
case None => defaultTimeZone
}
+
+ // Optionally take a --period, which determines how many days each job runs over (rather
+ // than over the whole date range)
+ // --daily and --weekly are aliases for --period 1 and --period 7 respectively
+ val period =
+ if (args.boolean("daily"))
+ 1
+ else if (args.boolean("weekly"))
+ 7
+ else
+ args.getOrElse("period", "0").toInt
- implicit lazy val dateRange = {
+ lazy val (startDate, endDate) = {
val (start, end) = args.list("date") match {
case List(s, e) => (RichDate(s), RichDate.upperBound(e))
case List(o) => (RichDate(o), RichDate.upperBound(o))
@@ -184,8 +195,21 @@
}
//Make sure the end is not before the beginning:
assert(start <= end, "end of date range must occur after the start")
- DateRange(start, end)
+ (start, end)
}
+
+ implicit lazy val dateRange = DateRange(startDate, if (period > 0) startDate + Days(period) - Millisecs(1) else endDate)
+
+ override def next : Option[Job] =
+ if (period > 0) {
+ val nextStartDate = startDate + Days(period)
+ if (nextStartDate + Days(period - 1) > endDate)
+ None // we're done
+ else // return a new job with the new startDate
+ Some(clone(args + ("date" -> List(nextStartDate.toString("yyyy-MM-dd"), endDate.toString("yyyy-MM-dd")))))
+ }
+ else
+ None
}
// DefaultDateRangeJob with default time zone as UTC instead of Pacific.
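
A hypothetical job (paths and field names are placeholders) showing what the --period handling added to Job.scala buys: with "--date 2013-01-01 2013-01-14 --weekly" on the command line, the job runs twice via Job.next, and each run sees a one-week dateRange:

    import com.twitter.scalding._

    // Counts input lines per period and writes each window to its own path.
    class WeeklyLineCount(args: Args) extends DefaultDateRangeJob(args) {
      TextLine(args("input"))
        .read
        .groupAll { _.size('lines) }
        .write(Tsv(args("output") + "/" + dateRange.start.toString("yyyy-MM-dd")))
    }
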
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala etsy-scalding/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala 2013-05-29 23:08:00.000000000 -0700
@@ -1,4 +1,5 @@
/*
+
Copyright 2012 Twitter, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
@@ -27,6 +28,9 @@
import cascading.tuple._
import cascading.cascade._
+import com.twitter.algebird._
+import com.twitter.algebird.Operators._
+
import scala.util.Random
/*
@@ -116,6 +120,29 @@
}
}
+ def prepareJoin(p : Pipe, left : Boolean)(implicit tracing : Tracing) : Pipe = {
+ tracing.beforeJoin(p, left)
+ }
+
+ def finishJoin(p : Pipe)(implicit tracing : Tracing) : Pipe = {
+ tracing.afterJoin(p)
+ }
+
+ def filterAndJoinWithSmaller(fs : (Fields, Fields), that : Pipe, small_pipe_size : Int = 10000, false_pos_rate : Double = 0.1) : Pipe = {
+ val tracing : Tracing = Tracing.tracing
+ Tracing.tracing = new NullTracing
+ // Make a bloom filter of the smaller pipe contents.
+ implicit val bfm : BloomFilterMonoid = BloomFilter(small_pipe_size, false_pos_rate)
+ val bfp = that.mapTo(fs._2 -> '__bf){ x : TupleEntry => bfm.create(x.getTuple.toString) }
+ .groupAll{ _.plus[BF]('__bf -> '__bf) }
+ val fp = pipe.map(fs._1 -> '__str){ x : TupleEntry => x.getTuple.toString }
+ .crossWithTiny(bfp)
+ .filter('__str, '__bf) { x : (String, BF) => x._2.contains(x._1) != ApproximateBoolean.exactFalse }
+ .discard('__str, '__bf)
+ Tracing.tracing = tracing
+ fp.joinWithSmaller(fs, that)
+ }
+
/**
* Joins the first set of keys in the first pipe to the second set of keys in the second pipe.
* All keys must be unique UNLESS it is an inner join, then duplicated join keys are allowed, but
@@ -136,10 +163,10 @@
val intersection = asSet(fs._1).intersect(asSet(fs._2))
if (intersection.size == 0) {
// Common case: no intersection in names: just CoGroup, which duplicates the grouping fields:
- pipe.coGroupBy(fs._1, joiners._1) {
- _.coGroup(fs._2, that, joiners._2)
+ finishJoin(prepareJoin(pipe, false).coGroupBy(fs._1, joiners._1) {
+ _.coGroup(fs._2, prepareJoin(that, true), joiners._2)
.reducers(reducers)
- }
+ })
}
else if (joiners._1 == InnerJoinMode && joiners._2 == InnerJoinMode) {
/*
@@ -148,10 +175,10 @@
* So, we rename the right hand side to temporary names, then discard them after the operation
*/
val (renamedThat, newJoinFields, temp) = renameCollidingFields(that, fs._2, intersection)
- pipe.coGroupBy(fs._1, joiners._1) {
- _.coGroup(newJoinFields, renamedThat, joiners._2)
+ finishJoin(prepareJoin(pipe,false).coGroupBy(fs._1, joiners._1) {
+ _.coGroup(newJoinFields, prepareJoin(renamedThat, true), joiners._2)
.reducers(reducers)
- }.discard(temp)
+ }.discard(temp))
}
else {
throw new IllegalArgumentException("join keys must be disjoint unless you are doing an InnerJoin. Found: " +
@@ -186,19 +213,20 @@
*/
def joinWithTiny(fs :(Fields,Fields), that : Pipe) = {
val intersection = asSet(fs._1).intersect(asSet(fs._2))
+ val pt = prepareJoin(that, true)
if (intersection.size == 0) {
- new HashJoin(assignName(pipe), fs._1, assignName(that), fs._2, new InnerJoin)
+ finishJoin(new HashJoin(assignName(prepareJoin(pipe, false)), fs._1, assignName(pt), fs._2, new InnerJoin))
}
else {
- val (renamedThat, newJoinFields, temp) = renameCollidingFields(that, fs._2, intersection)
- (new HashJoin(assignName(pipe), fs._1, assignName(renamedThat), newJoinFields, new InnerJoin))
- .discard(temp)
+ val (renamedThat, newJoinFields, temp) = renameCollidingFields(pt, fs._2, intersection)
+ finishJoin((new HashJoin(assignName(prepareJoin(pipe, false)), fs._1, assignName(renamedThat), newJoinFields, new InnerJoin))
+ .discard(temp))
}
}
def leftJoinWithTiny(fs :(Fields,Fields), that : Pipe) = {
//Rename these pipes to avoid cascading name conflicts
- new HashJoin(assignName(pipe), fs._1, assignName(that), fs._2, new LeftJoin)
+ finishJoin(new HashJoin(assignName(prepareJoin(pipe, false)), fs._1, assignName(prepareJoin(that, true)), fs._2, new LeftJoin))
}
/**
@@ -240,31 +268,60 @@
val leftFields = new Fields("__LEFT_I__", "__LEFT_J__")
val rightFields = new Fields("__RIGHT_I__", "__RIGHT_J__")
- // Add the new dummy fields
- val newLeft = addDummyFields(pipe, leftFields, rightReplication, leftReplication)
- val newRight = addDummyFields(otherPipe, rightFields, leftReplication, rightReplication, swap = true)
+ // Add the new dummy replication fields
+ val newLeft = addReplicationFields(prepareJoin(pipe, false), leftFields, leftReplication, rightReplication)
+ val newRight = addReplicationFields(prepareJoin(otherPipe, true), rightFields, rightReplication, leftReplication, swap = true)
val leftJoinFields = Fields.join(fs._1, leftFields)
val rightJoinFields = Fields.join(fs._2, rightFields)
- newLeft
+ finishJoin(newLeft
.joinWithSmaller((leftJoinFields, rightJoinFields), newRight, joiner, reducers)
.discard(leftFields)
- .discard(rightFields)
+ .discard(rightFields))
}
/**
* Adds one random field and one replica field.
*/
- private def addDummyFields(p : Pipe, f : Fields, k1 : Int, k2 : Int, swap : Boolean = false) : Pipe = {
- p.flatMap(() -> f) { u : Unit =>
- val i = if(k1 == 1 ) 0 else (new Random()).nextInt(k1 - 1)
- (0 until k2).map{ j =>
- if(swap) (j, i) else (i, j)
- }
+ private def addReplicationFields(p : Pipe, f : Fields,
+ replication : Int, otherReplication : Int, swap : Boolean = false) : Pipe = {
+
+ p.using(new Random with Stateful).flatMap(() -> f) { (rand : Random, _ : Unit) =>
+ val rfs = getReplicationFields(rand, replication, otherReplication)
+ if (swap) rfs.map { case(i, j) => (j, i) } else rfs
}
}
+ /**
+ * Returns a list of the dummy replication fields used to replicate groups in skewed joins.
+ *
+ * For example, suppose you have two pipes P1 and P2. While performing a skewed join for a particular
+ * key K, you want to replicate every row in P1 with this key 3 times, and every row in P2 with this
+ * key 5 times.
+ *
+ * Then:
+ *
+ * - For the P1 replication, the first element of each tuple is the same random integer between 0 and 4,
+ * and the second element of each tuple is the index of the replication (between 0 and 2). This first
+ * random element guarantees that we will match exactly one random row in P2 with the same key.
+ * - Similarly for the P2 replication.
+ *
+ * Examples:
+ *
+ * getReplicationFields(3, 5)
+ * => List( (1, 0), (1, 1), (1, 2) )
+ *
+ * getReplicationFields(5, 3)
+ * => List( (2, 0), (2, 1), (2, 2), (2, 3), (2, 4) )
+ */
+ private def getReplicationFields(r : Random, replication : Int, otherReplication : Int) : IndexedSeq[(Int, Int)] = {
+ assert(replication >= 1 && otherReplication >= 1, "Replication counts must be >= 1")
+
+ val rand = r.nextInt(otherReplication)
+ (0 until replication).map { rep => (rand, rep) }
+ }
+
private def assertValidJoinMode(joiner : Joiner, left : Int, right : Int) {
(joiner, left, right) match {
case (i : InnerJoin, _, _) => true
@@ -276,6 +333,108 @@
)
}
}
+
+ /**
+ * Performs a skewed join, which is useful when the data has extreme skew.
+ *
+ * For example, imagine joining a pipe of Twitter's follow graph against a pipe of user genders,
+ * in order to find the gender distribution of the accounts every Twitter user follows. Since celebrities
+ * (e.g., Justin Bieber and Lady Gaga) have a much larger follower base than other users, and (under
+ * a standard join algorithm) all their followers get sent to the same reducer, the job will likely be
+ * stuck on a few reducers for a long time. A skewed join attempts to alleviate this problem.
+ *
+ * This works as follows:
+ *
+ * 1. First, we sample from the left and right pipes with some small probability, in order to determine
+ * approximately how often each join key appears in each pipe.
+ * 2. We use these estimated counts to replicate the join keys, according to the given replication strategy.
+ * 3. Finally, we join the replicated pipes together.
+ *
+ * @param sampleRate This controls how often we sample from the left and right pipes when estimating key counts.
+ * @param replicator Algorithm for determining how much to replicate a join key in the left and right pipes.
+ *
+ * Note: since we do not set the replication counts, only inner joins are allowed. (Otherwise, replicated
+ * rows would stay replicated when there is no counterpart in the other pipe.)
+ */
+ def skewJoinWithSmaller(fs : (Fields, Fields), otherPipe : Pipe,
+ sampleRate : Double = 0.001, reducers : Int = -1,
+ replicator : SkewReplication = SkewReplicationA()) : Pipe = {
+
+ assert(sampleRate > 0 && sampleRate < 1, "Sampling rate for skew joins must lie strictly between 0 and 1")
+ // This assertion could be avoided, but since this function calls outer joins and left joins,
+ // we assume it to avoid renaming pain.
+ assert(fs._1.iterator.toList.intersect(fs._2.iterator.toList).isEmpty, "Join keys in a skew join must be disjoint")
+
+ // 1. First, get an approximate count of the left join keys and the right join keys, so that we
+ // know how much to replicate.
+ // TODO: try replacing this with a Count-Min sketch.
+ val leftSampledCountField = "__LEFT_SAMPLED_COUNT__"
+ val rightSampledCountField = "__RIGHT_SAMPLED_COUNT__"
+ val sampledCountFields = new Fields(leftSampledCountField, rightSampledCountField)
+
+ val sampledLeft = pipe.filter() { u : Unit => scala.math.random < sampleRate }
+ .groupBy(fs._1) { _.size(leftSampledCountField) }
+ val sampledRight = otherPipe.filter() { u : Unit => scala.math.random < sampleRate }
+ .groupBy(fs._2) { _.size(rightSampledCountField) }
+
+ val sampledCounts = sampledLeft.joinWithSmaller(fs._1 -> fs._2, sampledRight, joiner = new OuterJoin)
+ .project(Fields.join(fs._1, fs._2, sampledCountFields))
+
+ // 2. Now replicate each group of join keys in the left and right pipes, according to the sampled counts
+ // from the previous step.
+ val leftReplicationFields = new Fields("__LEFT_RAND__", "__LEFT_REP__")
+ val rightReplicationFields = new Fields("__RIGHT_REP__", "__RIGHT_RAND__")
+
+ val replicatedLeft = skewReplicate(pipe, sampledCounts, fs._1, sampledCountFields, leftReplicationFields,
+ replicator, reducers)
+ val replicatedRight = skewReplicate(otherPipe, sampledCounts, fs._2, sampledCountFields, rightReplicationFields,
+ replicator, reducers, true)
+
+ // 3. Finally, join the replicated pipes together.
+ val leftJoinFields = Fields.join(fs._1, leftReplicationFields)
+ val rightJoinFields = Fields.join(fs._2, rightReplicationFields)
+
+ replicatedLeft
+ .joinWithSmaller(leftJoinFields -> rightJoinFields, replicatedRight, joiner = new InnerJoin, reducers)
+ .discard(leftReplicationFields)
+ .discard(rightReplicationFields)
+ }
+
+ /**
+ * Helper method for performing skewed joins. This replicates the rows in {pipe} according
+ * to the estimated counts in {sampledCounts}.
+ *
+ * @param pipe The pipe to be replicated.
+ * @param sampledCounts A pipe containing, for each key, the estimated counts of how often
+ * this key appeared in the samples of the original left and right pipes.
+ * @param replicator Strategy for how the pipe is replicated.
+ * @param isPipeOnRight Set to true when replicating the right pipe.
+ */
+ private def skewReplicate(pipe : Pipe, sampledCounts : Pipe, joinFields : Fields,
+ countFields : Fields, replicationFields : Fields,
+ replicator : SkewReplication,
+ numReducers : Int = -1, isPipeOnRight : Boolean = false) = {
+
+ // Rename the fields to prepare for the leftJoin below.
+ val renamedFields = joinFields.iterator.toList.map { field => "__RENAMED_" + field + "__" }
+ val renamedSampledCounts = sampledCounts.rename(joinFields -> renamedFields)
+ .project(Fields.join(renamedFields, countFields))
+
+ pipe
+ // Join the pipe against the sampled counts, so that we know approximately how often each
+ // join key appears.
+ .leftJoinWithTiny(joinFields -> renamedFields, renamedSampledCounts)
+ .using(new Random with Stateful)
+ .flatMap(countFields -> replicationFields) { (rand : Random, counts : (Int, Int)) =>
+ val (leftRep, rightRep) = replicator.getReplications(counts._1, counts._2, numReducers)
+
+ val (rep, otherRep) = if (isPipeOnRight) (rightRep, leftRep) else (leftRep, rightRep)
+ val rfs = getReplicationFields(rand, rep, otherRep)
+ if (isPipeOnRight) rfs.map { case (i, j) => (j, i) } else rfs
+ }
+ .discard(renamedFields)
+ .discard(countFields)
+ }
}
class InvalidJoinModeException(args : String) extends Exception(args)
Only in etsy-scalding/src/main/scala/com/twitter/scalding: Matrix.scala
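
A sketch of the skewed join added to JoinAlgorithms.scala above, loosely following the follow-graph example from its comment; the sources, schemas, and the fork's implicit Tracing are assumptions. The join keys ('followee vs. 'user) are disjoint, as the skew join requires, and only inner joins are supported:

    import com.twitter.scalding._

    // Joins a large follow-graph pipe against a smaller gender pipe without
    // piling every celebrity's followers onto a single reducer.
    class GenderMixJob(args: Args) extends Job(args) {
      val follows = Tsv(args("follows"), ('follower, 'followee)).read
      val genders = Tsv(args("genders"), ('user, 'gender)).read

      follows
        .skewJoinWithSmaller('followee -> 'user, genders, sampleRate = 0.001)
        .groupBy('follower, 'gender) { _.size('count) }
        .write(Tsv(args("output")))
    }
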
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/Mode.scala etsy-scalding/src/main/scala/com/twitter/scalding/Mode.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/Mode.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/Mode.scala 2013-05-29 23:08:00.000000000 -0700
@@ -43,7 +43,9 @@
/**
* This mode is used by default by sources in read and write
*/
- implicit var mode : Mode = Local(false)
+ def emptyConf = { val c = new Configuration; c.clear; c }
+
+ implicit var mode : Mode = Local(false, emptyConf)
// This should be passed ALL the args supplied after the job name
def apply(args : Args, config : Configuration) : Mode = {
@@ -54,7 +56,7 @@
}
if (args.boolean("local"))
- Local(strictSources)
+ Local(strictSources, config)
else if (args.boolean("hdfs"))
Hdfs(strictSources, config)
else
@@ -65,14 +67,20 @@
* There are three ways to run jobs
* sourceStrictness is set to true
*/
-abstract class Mode(val sourceStrictness : Boolean) {
+abstract class Mode(val sourceStrictness : Boolean, val jobConf : Configuration) {
+
// We can't name two different pipes with the same name.
// NOTE: there is a subtle bug in scala regarding case classes
// with multiple sets of arguments, and their equality.
// For this reason, we use Source.toString as the key in this map
protected val sourceMap = MMap[String, (Source, Pipe)]()
- def config = Map[AnyRef,AnyRef]()
+ def config : Map[AnyRef,AnyRef] = {
+ jobConf.foldLeft(Map[AnyRef, AnyRef]()) {
+ (acc, kv) => acc + ((kv.getKey, kv.getValue))
+ }
+ }
+
def newFlowConnector(props : Map[AnyRef,AnyRef]) : FlowConnector
/*
@@ -107,14 +115,6 @@
trait HadoopMode extends Mode {
- def jobConf : Configuration
-
- override def config = {
- jobConf.foldLeft(Map[AnyRef, AnyRef]()) {
- (acc, kv) => acc + ((kv.getKey, kv.getValue))
- }
- }
-
override def newFlowConnector(props : Map[AnyRef,AnyRef]) = {
new HadoopFlowConnector(props)
}
@@ -144,21 +144,18 @@
override def fileExists(filename : String) : Boolean = fileSet.contains(filename)
}
-case class Hdfs(strict : Boolean, conf : Configuration) extends Mode(strict) with HadoopMode {
- override def jobConf = conf
+case class Hdfs(strict : Boolean, conf : Configuration) extends Mode(strict, conf) with HadoopMode {
override def fileExists(filename : String) : Boolean =
FileSystem.get(jobConf).exists(new Path(filename))
}
case class HadoopTest(conf : Configuration, buffers : Map[Source,Buffer[Tuple]])
- extends Mode(false) with HadoopMode with TestMode {
+ extends Mode(false, conf) with HadoopMode with TestMode {
// This is a map from source.toString to disk path
private val writePaths = MMap[Source, String]()
private val allPaths = MSet[String]()
- override def jobConf = conf
-
@tailrec
private def allocateNewPath(prefix : String, idx : Int) : String = {
val candidate = prefix + idx.toString
@@ -196,12 +193,12 @@
}
}
-case class Local(strict : Boolean) extends Mode(strict) with CascadingLocal {
+case class Local(strict : Boolean, conf : Configuration) extends Mode(strict, conf) with CascadingLocal {
override def fileExists(filename : String) : Boolean = new File(filename).exists
}
/**
* Memory only testing for unit tests
*/
-case class Test(val buffers : Map[Source,Buffer[Tuple]]) extends Mode(false)
+case class Test(val buffers : Map[Source,Buffer[Tuple]]) extends Mode(false, new Configuration)
with TestMode with CascadingLocal
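
A small sketch (the property key and value are illustrative) of the updated Local constructor from Mode.scala above, which now carries a Configuration so that Mode.config is derived from jobConf uniformly across local and Hadoop modes:

    import org.apache.hadoop.conf.Configuration
    import com.twitter.scalding._

    object LocalModeSetup {
      def configure(): Unit = {
        val conf = new Configuration
        conf.set("mapred.job.name", "local-smoke-test")  // illustrative property
        Mode.mode = Local(false, conf)                   // Local(strict, conf) as redefined above
      }
    }
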
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/ReduceOperations.scala etsy-scalding/src/main/scala/com/twitter/scalding/ReduceOperations.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/ReduceOperations.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/ReduceOperations.scala 2013-05-29 23:08:00.000000000 -0700
@@ -21,7 +21,7 @@
import com.twitter.algebird.{
Monoid,
Ring,
- AveragedValue,
+ Averager,
Moments,
SortedTakeListMonoid,
HyperLogLogMonoid,
@@ -73,7 +73,7 @@
* == Similar To ==
* <a href="http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm">http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm</a>
*/
- def average(f : (Fields, Fields)) = mapPlusMap(f) { (x : Double) => AveragedValue(1L, x) } { _.value }
+ def average(f : (Fields, Fields)) = aggregate(f)(Averager)
def average(f : Symbol) : Self = average(f->f)
/**
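
The user-facing average API in ReduceOperations.scala is unchanged; it is now backed by algebird's Averager aggregator rather than mapping through AveragedValue directly. A placeholder example (the fork's implicit Tracing is again assumed):

    import com.twitter.scalding._

    // Per-user mean score; 'user and 'score are assumed field names.
    class AverageScoreJob(args: Args) extends Job(args) {
      Tsv(args("input"), ('user, 'score))
        .read
        .groupBy('user) { _.average('score -> 'avgScore) }
        .write(Tsv(args("output")))
    }
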
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/RichPipe.scala etsy-scalding/src/main/scala/com/twitter/scalding/RichPipe.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/RichPipe.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/RichPipe.scala 2013-05-29 23:08:00.000000000 -0700
@@ -27,6 +27,10 @@
import cascading.tuple._
import cascading.cascade._
+import scala.util.Random
+
+import java.security.MessageDigest
+
object RichPipe extends java.io.Serializable {
private var nextPipe = -1
@@ -55,19 +59,22 @@
}
}
+object MD5Hash extends java.io.Serializable {
+ private val hash = MessageDigest.getInstance("MD5")
+ def apply(s : String) : Int = {
+ hash.reset
+ val h : Array[Byte] = hash.digest(s.getBytes)
+ (h(0).toInt << 24) + (h(1).toInt << 16) + (h(2).toInt << 8) + h(3).toInt
+ }
+}
+
class RichPipe(val pipe : Pipe) extends java.io.Serializable with JoinAlgorithms {
+
// We need this for the implicits
import Dsl._
import RichPipe.assignName
/**
- * A simple trait for releasable resource. Provides noop implementation.
- */
- trait Stateful {
- def release() {}
- }
-
- /**
* Rename the current pipe
*/
def name(s : String) = new Pipe(s, pipe)
@@ -89,7 +96,7 @@
def apply(c: C) { c.release() }
},
Fields.NONE, conv, set))
- NullSource.write(newPipe)(flowDef, mode)
+ NullSource.writeFrom(newPipe)(flowDef, mode)
newPipe
}
@@ -129,8 +136,16 @@
* takes any number of parameters as long as we can convert
* them to a fields object
*/
- def project(fields : Fields) = {
- new Each(pipe, fields, new Identity(fields))
+ def project(fields : Fields)(implicit tracing : Tracing) = {
+ tracing.tracingFields match {
+ case Some(tracingFields) => {
+ val f = if(tracing.isTraced(pipe) && !fields.contains(tracingFields)) fields.append(tracingFields) else fields
+ new Each(pipe, f, new Identity(f))
+ }
+ case None => {
+ new Each(pipe, fields, new Identity(fields))
+ }
+ }
}
/**
@@ -156,14 +171,18 @@
* _.size.max('f1) etc...
* }}}
*/
- def groupBy(f : Fields)(builder : GroupBuilder => GroupBuilder) : Pipe = {
- builder(new GroupBuilder(f)).schedule(pipe.getName, pipe)
+ def groupBy(f : Fields)(builder : GroupBuilder => GroupBuilder)(implicit tracing : Tracing) : Pipe = {
+ tracing.onGroupBy(builder(new GroupBuilder(f)), pipe).schedule(pipe.getName, pipe)
+ }
+
+ def groupByNoMerge(f : Fields)(builder : GroupBuilder => GroupBuilder)(implicit tracing : Tracing) : Pipe = {
+ tracing.onGroupByNoMerge(builder(new GroupBuilder(f)), pipe).schedule(pipe.getName, pipe)
}
/**
* Returns the set of unique tuples containing the specified fields
*/
- def unique(f : Fields) : Pipe = groupBy(f) { _.size('__uniquecount__) }.project(f)
+ def unique(f : Fields) : Pipe = groupByNoMerge(f) { _.size('__uniquecount__) }.project(f)
/**
* Merge or Concatenate several pipes together with this one:
@@ -176,9 +195,7 @@
* This is probably only useful just before setting a tail such as Database
* tail, so that only one reducer talks to the DB. Kind of a hack.
*/
- def groupAll : Pipe = groupAll { g =>
- g.takeWhile(0)((t : TupleEntry) => true)
- }
+ def groupAll : Pipe = groupAll { _.pass }
/**
* == Warning ==
@@ -196,6 +213,29 @@
.discard('__groupAll__)
}
+ def shard(n : Int) : Pipe = groupRandomly(n) { _.pass }
+
+ /**
+ * Like groupAll, but randomly groups data into n reducers.
+ */
+ def groupRandomly(n : Int)(gs: GroupBuilder => GroupBuilder)(implicit tracing : Tracing) : Pipe = {
+ using(new Random with Stateful)
+ .map(()->'__shard__) { (r:Random, _:Unit) => r.nextInt(n) }
+ .groupBy('__shard__) { g : GroupBuilder => gs(g).reducers(n) } (tracing)
+ .discard('__shard__)
+ }
+
+ /**
+ * Adds a field with a constant value.
+ *
+ * == Usage ==
+ * {{{
+ * insert('a, 1)
+ * }}}
+ */
+ def insert[A](fs : Fields, value : A)(implicit conv : TupleSetter[A]) : Pipe =
+ map(() -> fs) { _:Unit => value }
+
/**
* Rename some set of N fields as another set of N fields
*
@@ -226,6 +266,26 @@
}
/**
+ * Subsample the entries in the pipe.
+ * Hash some fields rather than use random noise, for the sake of reproducibility.
+ */
+ def subsample(p : Double)(implicit tracing : Tracing) : Pipe = {
+ subsample(Fields.ALL, p)
+ }
+
+ def subsample(fields : Fields, p : Double)(implicit tracing : Tracing) : Pipe = {
+ val q : Long = math.round(1.0/p)
+ if(tracing.isTraced(pipe)) {
+ tracing.tracingFields match {
+ case Some(tf) => filter[TupleEntry](fields){ t : TupleEntry => val s = new Tuple(t.getTuple); if(t.getFields.contains(tf)) s.remove(t.getFields, tf); MD5Hash(s.toString) % q == 0 }
+ case _ => filter[TupleEntry](fields, p){ t : TupleEntry => MD5Hash(t.getTuple.toString) %q == 0}
+ }
+ } else {
+ filter[TupleEntry](fields, p){ t : TupleEntry => MD5Hash(t.getTuple.toString) % q == 0}
+ }
+ }
+
+ /**
* If you use a map function that does not accept TupleEntry args,
* which is the common case, an implicit conversion in GeneratedConversions
* will convert your function into a `(TupleEntry => T)`. The result type
@@ -262,10 +322,15 @@
each(fs)(new MapFunction[A,T](fn, _, conv, setter))
}
def mapTo[A,T](fs : (Fields,Fields))(fn : A => T)
- (implicit conv : TupleConverter[A], setter : TupleSetter[T]) : Pipe = {
+ (implicit conv : TupleConverter[A], setter : TupleSetter[T], tracing : Tracing) : Pipe = {
conv.assertArityMatches(fs._1)
setter.assertArityMatches(fs._2)
- eachTo(fs)(new MapFunction[A,T](fn, _, conv, setter))
+ tracing.tracingFields match {
+ case Some(fields) =>
+ each(fs)(new MapFunction[A,T](fn, _, conv, setter)).project(fs._2)
+ case None =>
+ eachTo(fs)(new MapFunction[A,T](fn, _, conv, setter))
+ }
}
def flatMap[A,T](fs : (Fields,Fields))(fn : A => Iterable[T])
(implicit conv : TupleConverter[A], setter : TupleSetter[T]) : Pipe = {
@@ -274,10 +339,15 @@
each(fs)(new FlatMapFunction[A,T](fn, _, conv, setter))
}
def flatMapTo[A,T](fs : (Fields,Fields))(fn : A => Iterable[T])
- (implicit conv : TupleConverter[A], setter : TupleSetter[T]) : Pipe = {
+ (implicit conv : TupleConverter[A], setter : TupleSetter[T], tracing : Tracing) : Pipe = {
conv.assertArityMatches(fs._1)
setter.assertArityMatches(fs._2)
- eachTo(fs)(new FlatMapFunction[A,T](fn, _, conv, setter))
+ tracing.tracingFields match {
+ case Some(fields) =>
+ each(fs)(new FlatMapFunction[A,T](fn, _, conv, setter)).project(fs._2)
+ case None =>
+ eachTo(fs)(new FlatMapFunction[A,T](fn, _, conv, setter))
+ }
}
/**
@@ -354,8 +424,33 @@
def debug = new Each(pipe, new Debug())
- def write(outsource : Source)(implicit flowDef : FlowDef, mode : Mode) = {
- outsource.write(pipe)(flowDef, mode)
+ def write(outsource : Source)(implicit flowDef : FlowDef, mode : Mode, tracing : Tracing) = {
+ Tracing.tracing = new NullTracing()
+ tracing.tracingFields match {
+ case Some(fields) => {
+ if(tracing.isTraced(pipe))
+ outsource.writeFrom(discard(fields))(flowDef, mode)
+ else
+ outsource.writeFrom(pipe)(flowDef, mode)
+ }
+ case None => outsource.writeFrom(pipe)(flowDef, mode)
+ }
+ Tracing.tracing = tracing
+ tracing.onWrite(pipe)
+ }
+ /**
+ * Adds a trap to the current pipe,
+ * which will capture all exceptions that occur in this pipe
+ * and save them to the trapsource given
+ *
+ * Traps do not include the original fields in a tuple,
+ * only the fields seen in an operation.
+ * Traps also do not include any exception information.
+ *
+ * There can only be at most one trap for each pipe.
+ **/
+ def addTrap(trapsource : Source)(implicit flowDef : FlowDef, mode : Mode) = {
+ flowDef.addTrap(pipe, trapsource.createTap(Write)(mode))
pipe
}
@@ -394,11 +489,11 @@
/**
* Same as pack but only the to fields are preserved.
*/
- def packTo[T](fs : (Fields, Fields))(implicit packer : TuplePacker[T], setter : TupleSetter[T]) : Pipe = {
+ def packTo[T](fs : (Fields, Fields))(implicit packer : TuplePacker[T], setter : TupleSetter[T], tracing : Tracing) : Pipe = {
val (fromFields, toFields) = fs
assert(toFields.size == 1, "Can only output 1 field in pack")
val conv = packer.newConverter(fromFields)
- pipe.mapTo(fs) { input : T => input } (conv, setter)
+ pipe.mapTo(fs) { input : T => input } (conv, setter, tracing)
}
/**
@@ -421,10 +516,17 @@
/**
* Same as unpack but only the to fields are preserved.
*/
- def unpackTo[T](fs : (Fields, Fields))(implicit unpacker : TupleUnpacker[T], conv : TupleConverter[T]) : Pipe = {
+ def unpackTo[T](fs : (Fields, Fields))(implicit unpacker : TupleUnpacker[T], conv : TupleConverter[T], tracing : Tracing) : Pipe = {
val (fromFields, toFields) = fs
assert(fromFields.size == 1, "Can only take 1 input field in unpack")
val setter = unpacker.newSetter(toFields)
- pipe.mapTo(fs) { input : T => input } (conv, setter)
+ pipe.mapTo(fs) { input : T => input } (conv, setter, tracing)
}
}
+
+/**
+ * A simple trait for releasable resource. Provides noop implementation.
+ */
+trait Stateful {
+ def release() {}
+}
Only in etsy-scalding/src/main/scala/com/twitter/scalding: SkewReplication.scala
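
A combined sketch of the new RichPipe helpers above (subsample, insert, addTrap, shard); the paths, field names, and the availability of an implicit Tracing are assumptions:

    import com.twitter.scalding._

    class PipeHelpersDemo(args: Args) extends Job(args) {
      Tsv(args("input"), ('user, 'event))
        .read
        .subsample(0.01)                 // keep ~1% of tuples, chosen by MD5 hash for reproducibility
        .insert('source, "clickstream")  // append a constant-valued field
        .addTrap(Tsv(args("trap")))      // send failing tuples to a trap sink instead of killing the job
        .shard(8)                        // groupRandomly(8) { _.pass }: spread tuples across 8 reducers
        .write(Tsv(args("output")))
    }
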
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/Source.scala etsy-scalding/src/main/scala/com/twitter/scalding/Source.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/Source.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/Source.scala 2013-05-29 23:08:00.000000000 -0700
@@ -88,7 +88,7 @@
sys.error("Cascading Hadoop mode not supported for: " + toString)
}
- def read(implicit flowDef : FlowDef, mode : Mode) = {
+ def read(implicit flowDef : FlowDef, mode : Mode, tracing : Tracing) = {
//insane workaround for scala compiler bug
val sources = flowDef.getSources().asInstanceOf[JMap[String,Any]]
val srcName = this.toString
@@ -102,7 +102,7 @@
* write the pipe and return the input so it can be chained into
* the next operation
*/
- def write(pipe : Pipe)(implicit flowDef : FlowDef, mode : Mode) = {
+ def writeFrom(pipe : Pipe)(implicit flowDef : FlowDef, mode : Mode) = {
//insane workaround for scala compiler bug
val sinks = flowDef.getSinks().asInstanceOf[JMap[String,Any]]
val sinkName = this.toString
@@ -186,16 +186,16 @@
// Due to type erasure, your subclass must supply this
val converter : TupleConverter[T]
def mapTo[U](out : Fields)(mf : (T) => U)
- (implicit flowDef : FlowDef, mode : Mode, setter : TupleSetter[U]) = {
- RichPipe(read(flowDef, mode)).mapTo[T,U](sourceFields -> out)(mf)(converter, setter)
+ (implicit flowDef : FlowDef, mode : Mode, setter : TupleSetter[U], tracing : Tracing) = {
+ RichPipe(read(flowDef, mode, tracing)).mapTo[T,U](sourceFields -> out)(mf)(converter, setter, tracing)
}
/**
* If you want to filter, you should use this and output a 0 or 1 length Iterable.
* Filter does not change column names, and we generally expect to change columns here
*/
def flatMapTo[U](out : Fields)(mf : (T) => Iterable[U])
- (implicit flowDef : FlowDef, mode : Mode, setter : TupleSetter[U]) = {
- RichPipe(read(flowDef, mode)).flatMapTo[T,U](sourceFields -> out)(mf)(converter, setter)
+ (implicit flowDef : FlowDef, mode : Mode, setter : TupleSetter[U], tracing : Tracing) = {
+ RichPipe(read(flowDef, mode, tracing)).flatMapTo[T,U](sourceFields -> out)(mf)(converter, setter, tracing)
}
}
@@ -239,7 +239,7 @@
case Read => throw new Exception("not supported, reading from null")
case Write => mode match {
case Hdfs(_, _) => new NullTap[JobConf, RecordReader[_,_], OutputCollector[_,_], Any, Any]
- case Local(_) => new NullTap[Properties, InputStream, OutputStream, Any, Any]
+ case Local(_, _) => new NullTap[Properties, InputStream, OutputStream, Any, Any]
case Test(_) => new NullTap[Properties, InputStream, OutputStream, Any, Any]
}
}
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/StreamOperations.scala etsy-scalding/src/main/scala/com/twitter/scalding/StreamOperations.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/StreamOperations.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/StreamOperations.scala 2013-05-29 23:08:00.000000000 -0700
@@ -73,7 +73,7 @@
new ScanLeftIterator(s, init, fn)
}(conv,setter)
}
-
+
/**
* Only keep the first cnt elements
*/
@@ -82,10 +82,10 @@
s.take(cnt)
}(CTupleConverter, CascadingTupleSetter)
}
-
+
/**
- * Take while the predicate is true, starting at the
- * first false, output all
+ * Take while the predicate is true, stopping at the
+ * first false. Output all taken elements.
*/
def takeWhile[T](f : Fields)(fn : (T) => Boolean)(implicit conv : TupleConverter[T]) : Self = {
mapStream[TupleEntry,CTuple](f -> Fields.ARGS){ s =>
Only in etsy-scalding/src/main/scala/com/twitter/scalding: Tracing.scala
Only in etsy-scalding/src/main/scala/com/twitter/scalding: TracingFileSource.scala
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/TypedPipe.scala etsy-scalding/src/main/scala/com/twitter/scalding/TypedPipe.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/TypedPipe.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/TypedPipe.scala 2013-05-29 23:08:00.000000000 -0700
@@ -8,7 +8,7 @@
import java.io.Serializable
-import com.twitter.algebird.{Monoid, Ring}
+import com.twitter.algebird.{Monoid, Ring, Aggregator}
import com.twitter.scalding.typed.{Joiner, CoGrouped2, HashCoGrouped2}
/***************
@@ -23,11 +23,6 @@
* to get automatic conversion of Mappable[T] to TypedPipe[T]
*/
object TDsl extends Serializable {
- //This can be used to avoid using groupBy:
- implicit def pipeToGrouped[K,V](tpipe : TypedPipe[(K,V)])(implicit ord : Ordering[K]) : Grouped[K,V] = {
- tpipe.group[K,V]
- }
- implicit def keyedToPipe[K,V](keyed : KeyedList[K,V]) : TypedPipe[(K,V)] = keyed.toTypedPipe
implicit def pipeTExtensions(pipe : Pipe) : PipeTExtensions = new PipeTExtensions(pipe)
implicit def mappableToTypedPipe[T](mappable : Mappable[T])
(implicit flowDef : FlowDef, mode : Mode, conv : TupleConverter[T]) : TypedPipe[T] = {
@@ -50,8 +45,8 @@
* The above sums all the tuples and returns a TypedPipe[Int] which has the total sum.
*/
def typed[T,U](fielddef : (Fields, Fields))(fn : TypedPipe[T] => TypedPipe[U])
- (implicit conv : TupleConverter[T], setter : TupleSetter[U]) : Pipe = {
- fn(TypedPipe.from(pipe, fielddef._1)(conv)).toPipe(fielddef._2)(setter)
+ (implicit conv : TupleConverter[T], setter : TupleSetter[U], tracing : Tracing) : Pipe = {
+ fn(TypedPipe.from(pipe, fielddef._1)(conv)).toPipe(fielddef._2)(setter,tracing)
}
def toTypedPipe[T](fields : Fields)(implicit conv : TupleConverter[T]) : TypedPipe[T] = {
TypedPipe.from[T](pipe, fields)(conv)
@@ -87,9 +82,13 @@
* The output pipe has a single item CTuple with an object of type T in position 0
*/
protected lazy val pipe : Pipe = {
- inpipe.flatMapTo(fields -> 0)(flatMapFn)(implicitly[TupleConverter[TupleEntry]], SingleSetter)
+ inpipe.flatMapTo(fields -> 0)(flatMapFn)(implicitly[TupleConverter[TupleEntry]], SingleSetter, Tracing.tracing)
}
+ /** Same as groupAll.aggregate.values
+ */
+ def aggregate[B,C](agg: Aggregator[T,B,C]): TypedPipe[C] = groupAll.aggregate(agg).values
+
// Implements a cross project. The right side should be tiny
def cross[U](tiny : TypedPipe[U]) : TypedPipe[(T,U)] = {
val crossedPipe = pipe.rename(0 -> 't)
@@ -136,29 +135,34 @@
TypedPipe.from(pipe ++ other.pipe, 0)(singleConverter[U])
}
- def toPipe(fieldNames : Fields)(implicit setter : TupleSetter[T]) : Pipe = {
+ /** Reasonably common shortcut for cases of associative/commutative reduction
+ * returns a typed pipe with only one element.
+ */
+ def sum(implicit plus: Monoid[T]): TypedPipe[T] = groupAll.sum.values
+
+ def toPipe(fieldNames : Fields)(implicit setter : TupleSetter[T], tracing : Tracing) : Pipe = {
val conv = implicitly[TupleConverter[TupleEntry]]
- inpipe.flatMapTo(fields -> fieldNames)(flatMapFn)(conv, setter)
+ inpipe.flatMapTo(fields -> fieldNames)(flatMapFn)(conv, setter, tracing)
}
- def unpackToPipe(fieldNames : Fields)(implicit up : TupleUnpacker[T]) : Pipe = {
+ def unpackToPipe(fieldNames : Fields)(implicit up : TupleUnpacker[T], tracing : Tracing) : Pipe = {
val setter = up.newSetter(fieldNames)
- toPipe(fieldNames)(setter)
+ toPipe(fieldNames)(setter, tracing)
}
/** A convenience method equivalent to toPipe(fieldNames).write(dest)
* @return a pipe equivalent to the current pipe.
*/
def write(fieldNames : Fields, dest : Source)
- (implicit conv : TupleConverter[T], setter : TupleSetter[T], flowDef : FlowDef, mode : Mode) : TypedPipe[T] = {
- val pipe = toPipe(fieldNames)(setter)
+ (implicit conv : TupleConverter[T], setter : TupleSetter[T], flowDef : FlowDef, mode : Mode, tracing : Tracing) : TypedPipe[T] = {
+ val pipe = toPipe(fieldNames)(setter, tracing)
pipe.write(dest)
// Now, we have written out, so let's start from here with the new pipe:
// If we don't do this, Cascading's flow planner can't see what's happening
TypedPipe.from(pipe, fieldNames)(conv)
}
def write(dest: Source)
- (implicit conv : TupleConverter[T], setter : TupleSetter[T], flowDef : FlowDef, mode : Mode) : TypedPipe[T] = {
- write(Dsl.intFields(0 until setter.arity), dest)(conv,setter,flowDef,mode)
+ (implicit conv : TupleConverter[T], setter : TupleSetter[T], flowDef : FlowDef, mode : Mode, tracing : Tracing) : TypedPipe[T] = {
+ write(Dsl.intFields(0 until setter.arity), dest)(conv,setter,flowDef,mode,tracing)
}
def keys[K](implicit ev : <:<[T,(K,_)]) : TypedPipe[K] = map { _._1 }
@@ -194,6 +198,18 @@
* Avoid accumulating the whole list in memory if you can. Prefer reduce.
*/
def mapValueStream[V](smfn : Iterator[T] => Iterator[V]) : KeyedList[K,V]
+
+ ///////////
+ /// The below are all implemented in terms of the above:
+ ///////////
+
+ /** Use Algebird Aggregator to do the reduction
+ */
+ def aggregate[B,C](agg: Aggregator[T,B,C]): TypedPipe[(K,C)] =
+ mapValues(agg.prepare _)
+ .reduce(agg.reduce _)
+ .map { kv => (kv._1, agg.present(kv._2)) }
+
/** This is a special case of mapValueStream, but can be optimized because it doesn't need
* all the values for a given key at once. An unoptimized implementation is:
* mapValueStream { _.map { fn } }
@@ -262,7 +278,7 @@
// Make a new Grouped from a pipe with two fields: 'key, 'value
def fromKVPipe[K,V](pipe : Pipe, ordering : Ordering[K])
(implicit conv : TupleConverter[V]) : Grouped[K,V] = {
- new Grouped[K,V](pipe, ordering, None, None, -1)
+ new Grouped[K,V](pipe, ordering, None, None, -1, false)
}
def valueSorting[T](implicit ord : Ordering[T]) : Fields = sorting("value", ord)
@@ -279,7 +295,8 @@
val ordering : Ordering[K],
streamMapFn : Option[(Iterator[Tuple]) => Iterator[T]],
private[scalding] val valueSort : Option[(Fields,Boolean)],
- val reducers : Int = -1)
+ val reducers : Int = -1,
+ val toReducers: Boolean = false)
extends KeyedList[K,T] with Serializable {
import Dsl._
@@ -291,15 +308,17 @@
if (fb._2) gbSorted.reverse else gbSorted
}.getOrElse(gb)
}
+ def forceToReducers: Grouped[K,T] =
+ new Grouped(pipe, ordering, streamMapFn, valueSort, reducers, true)
def withSortOrdering(so : Ordering[T]) : Grouped[K,T] = {
// Set the sorting with unreversed
assert(valueSort.isEmpty, "Can only call withSortOrdering once")
assert(streamMapFn.isEmpty, "Cannot sort after a mapValueStream")
val newValueSort = Some(Grouped.valueSorting(so)).map { f => (f,false) }
- new Grouped(pipe, ordering, None, newValueSort, reducers)
+ new Grouped(pipe, ordering, None, newValueSort, reducers, toReducers)
}
def withReducers(red : Int) : Grouped[K,T] = {
- new Grouped(pipe, ordering, streamMapFn, valueSort, red)
+ new Grouped(pipe, ordering, streamMapFn, valueSort, red, toReducers)
}
def sortBy[B](fn : (T) => B)(implicit ord : Ordering[B]) : Grouped[K,T] = {
withSortOrdering(new MappedOrdering(fn, ord))
@@ -318,12 +337,13 @@
def reverse : Grouped[K,T] = {
assert(streamMapFn.isEmpty, "Cannot reverse after mapValueStream")
val newValueSort = valueSort.map { f => (f._1, !(f._2)) }
- new Grouped(pipe, ordering, None, newValueSort, reducers)
+ new Grouped(pipe, ordering, None, newValueSort, reducers, toReducers)
}
protected def operate[T1](fn : GroupBuilder => GroupBuilder) : TypedPipe[(K,T1)] = {
val reducedPipe = pipe.groupBy(groupKey) { gb =>
- fn(sortIfNeeded(gb)).reducers(reducers)
+ val out = fn(sortIfNeeded(gb)).reducers(reducers)
+ if(toReducers) out.forceToReducers else out
}
TypedPipe.from(reducedPipe, ('key, 'value))(implicitly[TupleConverter[(K,T1)]])
}
@@ -351,7 +371,7 @@
// We have no sort defined yet, so we should operate on the pipe so we can sort by V after
// if we need to:
new Grouped(pipe.map('value -> 'value)(fn)(singleConverter[T], SingleSetter),
- ordering, None, None, reducers)
+ ordering, None, None, reducers, toReducers)
}
else {
// There is a sorting, which invalidates map-side optimizations,
@@ -378,7 +398,7 @@
}
override def mapValueStream[V](nmf : Iterator[T] => Iterator[V]) : Grouped[K,V] = {
val newStreamMapFn = Some(streamMapping.andThen(nmf))
- new Grouped[K,V](pipe, ordering, newStreamMapFn, valueSort, reducers)
+ new Grouped[K,V](pipe, ordering, newStreamMapFn, valueSort, reducers, toReducers)
}
// SMALLER PIPE ALWAYS ON THE RIGHT!!!!!!!
def cogroup[W,R](smaller: Grouped[K,W])(joiner: (K, Iterator[T], Iterable[W]) => Iterator[R])
Only in twitter-scalding/src/main/scala/com/twitter/scalding/mathematics: Matrix.scala
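
A sketch of the typed-API additions in TypedPipe.scala above (sum on TypedPipe and aggregate on KeyedList); the schema, paths, and implicit Tracing are assumptions, and Averager is taken to be the Double-mean aggregator used by ReduceOperations above:

    import com.twitter.scalding._
    import com.twitter.algebird.Averager
    import TDsl._

    class AvgScoreTypedJob(args: Args) extends Job(args) {
      Tsv(args("input"), ('user, 'score))
        .read
        .typed(('user, 'score) -> ('user, 'avg)) { tp: TypedPipe[(String, Double)] =>
          tp.groupBy { case (user, _) => user }   // Grouped[String, (String, Double)]
            .mapValues { _._2 }                   // keep just the score
            .aggregate(Averager)                  // new KeyedList.aggregate: per-user mean
        }
        .write(Tsv(args("output")))
    }
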
diff -ur twitter-scalding/src/main/scala/com/twitter/scalding/mathematics/MatrixProduct.scala etsy-scalding/src/main/scala/com/twitter/scalding/mathematics/MatrixProduct.scala
--- twitter-scalding/src/main/scala/com/twitter/scalding/mathematics/MatrixProduct.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/main/scala/com/twitter/scalding/mathematics/MatrixProduct.scala 2013-05-29 23:08:00.000000000 -0700
@@ -143,6 +143,162 @@
}
}
+ implicit def scalarRowRightProduct[Col,ValT](implicit ring : Ring[ValT]) :
+ MatrixProduct[RowVector[Col,ValT],Scalar[ValT],RowVector[Col,ValT]] =
+ new MatrixProduct[RowVector[Col,ValT],Scalar[ValT],RowVector[Col,ValT]] {
+ def apply(left : RowVector[Col,ValT], right : Scalar[ValT]) : RowVector[Col,ValT]= {
+ val prod = left.toMatrix(0).nonZerosWith(right).mapValues({leftRight =>
+ val (left, right) = leftRight
+ ring.times(left, right)
+ })(ring)
+
+ new RowVector[Col,ValT](prod.colSym, prod.valSym, prod.pipe.project(prod.colSym, prod.valSym))
+ }
+ }
+
+ implicit def scalarRowLeftProduct[Col,ValT](implicit ring : Ring[ValT]) :
+ MatrixProduct[Scalar[ValT],RowVector[Col,ValT],RowVector[Col,ValT]] =
+ new MatrixProduct[Scalar[ValT],RowVector[Col,ValT],RowVector[Col,ValT]] {
+ def apply(left : Scalar[ValT], right : RowVector[Col,ValT]) : RowVector[Col,ValT]= {
+ val prod = right.toMatrix(0).nonZerosWith(left).mapValues({matScal =>
+ val (matVal, scalarVal) = matScal
+ ring.times(scalarVal, matVal)
+ })(ring)
+
+ new RowVector[Col,ValT](prod.rowSym, prod.valSym, prod.pipe.project(prod.rowSym, prod.valSym))
+ }
+ }
+
+ implicit def scalarColRightProduct[Row,ValT](implicit ring : Ring[ValT]) :
+ MatrixProduct[ColVector[Row,ValT],Scalar[ValT],ColVector[Row,ValT]] =
+ new MatrixProduct[ColVector[Row,ValT],Scalar[ValT],ColVector[Row,ValT]] {
+ def apply(left : ColVector[Row,ValT], right : Scalar[ValT]) : ColVector[Row,ValT]= {
+ val prod = left.toMatrix(0).nonZerosWith(right).mapValues({leftRight =>
+ val (left, right) = leftRight
+ ring.times(left, right)
+ })(ring)
+
+ new ColVector[Row,ValT](prod.rowSym, prod.valSym, prod.pipe.project(prod.rowSym, prod.valSym))
+ }
+ }
+
+ implicit def scalarColLeftProduct[Row,ValT](implicit ring : Ring[ValT]) :
+ MatrixProduct[Scalar[ValT],ColVector[Row,ValT],ColVector[Row,ValT]] =
+ new MatrixProduct[Scalar[ValT],ColVector[Row,ValT],ColVector[Row,ValT]] {
+ def apply(left : Scalar[ValT], right : ColVector[Row,ValT]) : ColVector[Row,ValT]= {
+ val prod = right.toMatrix(0).nonZerosWith(left).mapValues({matScal =>
+ val (matVal, scalarVal) = matScal
+ ring.times(scalarVal, matVal)
+ })(ring)
+
+ new ColVector[Row,ValT](prod.rowSym, prod.valSym, prod.pipe.project(prod.rowSym, prod.valSym))
+ }
+ }
+
+ implicit def litScalarRowRightProduct[Col,ValT](implicit ring : Ring[ValT]) :
+ MatrixProduct[RowVector[Col,ValT],LiteralScalar[ValT],RowVector[Col,ValT]] =
+ new MatrixProduct[RowVector[Col,ValT],LiteralScalar[ValT],RowVector[Col,ValT]] {
+ def apply(left : RowVector[Col,ValT], right : LiteralScalar[ValT]) : RowVector[Col,ValT]= {
+ val prod = left.toMatrix(0).nonZerosWith(right).mapValues({leftRight =>
+ val (left, right) = leftRight
+ ring.times(left, right)
+ })(ring)
+
+ new RowVector[Col,ValT](prod.colSym, prod.valSym, prod.pipe.project(prod.colSym, prod.valSym))
+ }
+ }
+
+ implicit def litScalarRowLeftProduct[Col,ValT](implicit ring : Ring[ValT]) :
+ MatrixProduct[LiteralScalar[ValT],RowVector[Col,ValT],RowVector[Col,ValT]] =
+ new MatrixProduct[LiteralScalar[ValT],RowVector[Col,ValT],RowVector[Col,ValT]] {
+ def apply(left : LiteralScalar[ValT], right : RowVector[Col,ValT]) : RowVector[Col,ValT]= {
+ val prod = right.toMatrix(0).nonZerosWith(left).mapValues({matScal =>
+ val (matVal, scalarVal) = matScal
+ ring.times(scalarVal, matVal)
+ })(ring)
+
+ new RowVector[Col,ValT](prod.rowSym, prod.valSym, prod.pipe.project(prod.rowSym, prod.valSym))
+ }
+ }
+
+ implicit def litScalarColRightProduct[Row,ValT](implicit ring : Ring[ValT]) :
+ MatrixProduct[ColVector[Row,ValT],LiteralScalar[ValT],ColVector[Row,ValT]] =
+ new MatrixProduct[ColVector[Row,ValT],LiteralScalar[ValT],ColVector[Row,ValT]] {
+ def apply(left : ColVector[Row,ValT], right : LiteralScalar[ValT]) : ColVector[Row,ValT]= {
+ val prod = left.toMatrix(0).nonZerosWith(right).mapValues({leftRight =>
+ val (left, right) = leftRight
+ ring.times(left, right)
+ })(ring)
+
+ new ColVector[Row,ValT](prod.rowSym, prod.valSym, prod.pipe.project(prod.rowSym, prod.valSym))
+ }
+ }
+
+ implicit def litScalarColLeftProduct[Row,ValT](implicit ring : Ring[ValT]) :
+ MatrixProduct[LiteralScalar[ValT],ColVector[Row,ValT],ColVector[Row,ValT]] =
+ new MatrixProduct[LiteralScalar[ValT],ColVector[Row,ValT],ColVector[Row,ValT]] {
+ def apply(left : LiteralScalar[ValT], right : ColVector[Row,ValT]) : ColVector[Row,ValT]= {
+ val prod = right.toMatrix(0).nonZerosWith(left).mapValues({matScal =>
+ val (matVal, scalarVal) = matScal
+ ring.times(scalarVal, matVal)
+ })(ring)
+
+ new ColVector[Row,ValT](prod.rowSym, prod.valSym, prod.pipe.project(prod.rowSym, prod.valSym))
+ }
+ }
+
+ implicit def scalarDiagRightProduct[Row,ValT](implicit ring : Ring[ValT]) :
+ MatrixProduct[DiagonalMatrix[Row,ValT],Scalar[ValT], DiagonalMatrix[Row,ValT]] =
+ new MatrixProduct[DiagonalMatrix[Row,ValT],Scalar[ValT],DiagonalMatrix[Row,ValT]] {
+ def apply(left : DiagonalMatrix[Row,ValT], right : Scalar[ValT]) : DiagonalMatrix[Row,ValT]= {
+ val prod = left.toRow.toMatrix(0).nonZerosWith(right).mapValues({leftRight =>
+ val (left, right) = leftRight
+ ring.times(left, right)
+ })(ring)
+
+ new DiagonalMatrix[Row,ValT](prod.rowSym, prod.valSym, prod.pipe.project(prod.rowSym, prod.valSym))
+ }
+ }
+
+ implicit def scalarDiagLeftProduct[Row,ValT](implicit ring : Ring[ValT]) :
+ MatrixProduct[Scalar[ValT],DiagonalMatrix[Row,ValT],DiagonalMatrix[Row,ValT]] =
+ new MatrixProduct[Scalar[ValT],DiagonalMatrix[Row,ValT],DiagonalMatrix[Row,ValT]] {
+ def apply(left : Scalar[ValT], right : DiagonalMatrix[Row,ValT]) : DiagonalMatrix[Row,ValT]= {
+ val prod = right.toRow.toMatrix(0).nonZerosWith(left).mapValues({matScal =>
+ val (matVal, scalarVal) = matScal
+ ring.times(scalarVal, matVal)
+ })(ring)
+
+ new DiagonalMatrix[Row,ValT](prod.rowSym, prod.valSym, prod.pipe.project(prod.rowSym, prod.valSym))
+ }
+ }
+
+ implicit def litScalarDiagRightProduct[Col,ValT](implicit ring : Ring[ValT]) :
+ MatrixProduct[DiagonalMatrix[Col,ValT],LiteralScalar[ValT],DiagonalMatrix[Col,ValT]] =
+ new MatrixProduct[DiagonalMatrix[Col,ValT],LiteralScalar[ValT],DiagonalMatrix[Col,ValT]] {
+ def apply(left : DiagonalMatrix[Col,ValT], right : LiteralScalar[ValT]) : DiagonalMatrix[Col,ValT]= {
+ val prod = left.toRow.toMatrix(0).nonZerosWith(right).mapValues({leftRight =>
+ val (left, right) = leftRight
+ ring.times(left, right)
+ })(ring)
+
+ new DiagonalMatrix[Col,ValT](prod.colSym, prod.valSym, prod.pipe.project(prod.colSym, prod.valSym))
+ }
+ }
+
+ implicit def litScalarDiagLeftProduct[Col,ValT](implicit ring : Ring[ValT]) :
+ MatrixProduct[LiteralScalar[ValT],DiagonalMatrix[Col,ValT],DiagonalMatrix[Col,ValT]] =
+ new MatrixProduct[LiteralScalar[ValT],DiagonalMatrix[Col,ValT],DiagonalMatrix[Col,ValT]] {
+ def apply(left : LiteralScalar[ValT], right : DiagonalMatrix[Col,ValT]) : DiagonalMatrix[Col,ValT]= {
+ val prod = right.toRow.toMatrix(0).nonZerosWith(left).mapValues({matScal =>
+ val (matVal, scalarVal) = matScal
+ ring.times(scalarVal, matVal)
+ })(ring)
+
+ new DiagonalMatrix[Col,ValT](prod.rowSym, prod.valSym, prod.pipe.project(prod.rowSym, prod.valSym))
+ }
+ }
+
implicit def rowColProduct[IdxT,ValT](implicit ring : Ring[ValT]) :
MatrixProduct[RowVector[IdxT,ValT],ColVector[IdxT,ValT],Scalar[ValT]] =
new MatrixProduct[RowVector[IdxT,ValT],ColVector[IdxT,ValT],Scalar[ValT]] {
@@ -153,6 +309,36 @@
}
}
+ implicit def matrixColVecProduct[RowT,CommonT,ValT](implicit ring : Ring[ValT]) :
+ MatrixProduct[Matrix[RowT,CommonT,ValT],ColVector[CommonT,ValT],ColVector[RowT,ValT]] =
+ new MatrixProduct[Matrix[RowT,CommonT,ValT],ColVector[CommonT,ValT],ColVector[RowT,ValT]] {
+ def apply(left : Matrix[RowT,CommonT,ValT], right : ColVector[CommonT,ValT]) : ColVector[RowT,ValT] = {
+ val prod = (left * right.toMatrix(0)) : Matrix[RowT,Int,ValT]
+ new ColVector[RowT,ValT](prod.rowSym, prod.valSym, prod.pipe.project(prod.rowSym, prod.valSym))
+ }
+ }
+
+ implicit def rowVecMatrixProduct[CommonT,ColT,ValT](implicit ring : Ring[ValT]) :
+ MatrixProduct[RowVector[CommonT,ValT],Matrix[CommonT,ColT,ValT],RowVector[ColT,ValT]] =
+ new MatrixProduct[RowVector[CommonT,ValT],Matrix[CommonT,ColT,ValT],RowVector[ColT,ValT]] {
+ def apply(left : RowVector[CommonT,ValT], right : Matrix[CommonT,ColT,ValT]) : RowVector[ColT,ValT] = {
+ val prod = (left.toMatrix(0) * right) : Matrix[Int,ColT,ValT]
+ new RowVector[ColT,ValT](prod.colSym, prod.valSym, prod.pipe.project(prod.colSym, prod.valSym))
+ }
+ }
+
+ implicit def colRowOuterProduct[IdxT,ValT](implicit ring : Ring[ValT]) :
+ MatrixProduct[ColVector[IdxT,ValT],RowVector[IdxT,ValT],Matrix[IdxT,IdxT,ValT]] =
+ new MatrixProduct[ColVector[IdxT,ValT],RowVector[IdxT,ValT],Matrix[IdxT,IdxT,ValT]] {
+ def apply(left : ColVector[IdxT,ValT], right : RowVector[IdxT,ValT]) : Matrix[IdxT,IdxT,ValT] = {
+ val prod = (left.toMatrix(0) * right.toMatrix(0)) : Matrix[IdxT,IdxT,ValT]
+ new Matrix[IdxT,IdxT,ValT](prod.rowSym,
+ prod.colSym,
+ prod.valSym,
+ prod.pipe.project(prod.rowSym, prod.colSym, prod.valSym))
+ }
+ }
+
implicit def standardMatrixProduct[RowL,Common,ColR,ValT](implicit ring : Ring[ValT]) :
MatrixProduct[Matrix[RowL,Common,ValT],Matrix[Common,ColR,ValT],Matrix[RowL,ColR,ValT]] =
new MatrixProduct[Matrix[RowL,Common,ValT],Matrix[Common,ColR,ValT],Matrix[RowL,ColR,ValT]] {
diff -ur twitter-scalding/src/test/scala/com/twitter/scalding/ArgTest.scala etsy-scalding/src/test/scala/com/twitter/scalding/ArgTest.scala
--- twitter-scalding/src/test/scala/com/twitter/scalding/ArgTest.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/test/scala/com/twitter/scalding/ArgTest.scala 2013-05-29 23:08:00.000000000 -0700
@@ -77,5 +77,13 @@
a.list("b") must_== List("1", "-3", "4")
a("c").toInt must_== -5
}
+ "handle k, v pairs separated by an equal sign" in {
+ val a = Args("a=1")
+ a("a") must be_==("1")
+ }
+ "handle multiple arguments when k, v pairs separated by an equal sign" in {
+ val a = Args("a=1 2 3")
+ a.list("a") must_== List("1", "2", "3")
+ }
}
}
diff -ur twitter-scalding/src/test/scala/com/twitter/scalding/CoreTest.scala etsy-scalding/src/test/scala/com/twitter/scalding/CoreTest.scala
--- twitter-scalding/src/test/scala/com/twitter/scalding/CoreTest.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/test/scala/com/twitter/scalding/CoreTest.scala 2013-05-29 23:08:00.000000000 -0700
@@ -35,6 +35,37 @@
}
}
+object GroupRandomlyJob {
+ val NumShards = 10
+}
+
+class GroupRandomlyJob(args: Args) extends Job(args) {
+ import GroupRandomlyJob.NumShards
+
+ Tsv("fakeInput")
+ .read
+ .mapTo(0 -> 'num) { (line: String) => line.toInt }
+ .groupRandomly(NumShards) { _.max('num) }
+ .groupAll { _.size }
+ .write(Tsv("fakeOutput"))
+}
+
+class GroupRandomlyJobTest extends Specification with TupleConversions {
+ import GroupRandomlyJob.NumShards
+ noDetailedDiffs()
+
+ "A GroupRandomlyJob" should {
+ val input = (0 to 10000).map { _.toString }.map { Tuple1(_) }
+ JobTest("com.twitter.scalding.GroupRandomlyJob")
+ .source(Tsv("fakeInput"), input)
+ .sink[(Int)](Tsv("fakeOutput")) { outBuf =>
+ val numShards = outBuf(0)
+ numShards must be_==(NumShards)
+ }
+ .run.finish
+ }
+}
+
class MapToGroupBySizeSumMaxJob(args: Args) extends Job(args) {
TextLine(args("input")).read.
//1 is the line
@@ -1151,3 +1182,36 @@
}
}
+class ThrowsErrorsJob(args : Args) extends Job(args) {
+ Tsv("input",('letter, 'x))
+ .read
+ .addTrap(Tsv("trapped"))
+ .map(('letter, 'x) -> 'yPrime){ fields : (String, Int) =>
+ if (fields._2 == 1) throw new Exception("Erroneous Ones") else fields._2 }
+ .write(Tsv("output"))
+}
+
+
+class AddTrapTest extends Specification {
+ import Dsl._
+
+ noDetailedDiffs() //Fixes an issue with scala 2.9
+ "An AddTrap" should {
+ val input = List(("a", 1),("b", 2), ("c", 3), ("d", 1), ("e", 2))
+
+ JobTest(new ThrowsErrorsJob(_))
+ .source(Tsv("input",('letter,'x)), input)
+ .sink[(String, Int)](Tsv("output")) { outBuf =>
+ "must contain all numbers in input except for 1" in {
+ outBuf.toList.sorted must be_==(List(("b", 2), ("c", 3), ("e", 2)))
+ }
+ }
+ .sink[(String, Int)](Tsv("trapped")) { outBuf =>
+ "must contain all 1s and fields in input" in {
+ outBuf.toList.sorted must be_==(List(("a", 1), ("d", 1)))
+ }
+ }
+ .run
+ .finish
+ }
+}
Only in etsy-scalding/src/test/scala/com/twitter/scalding: InputTracingTest.scala
diff -ur twitter-scalding/src/test/scala/com/twitter/scalding/KryoTest.scala etsy-scalding/src/test/scala/com/twitter/scalding/KryoTest.scala
--- twitter-scalding/src/test/scala/com/twitter/scalding/KryoTest.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/test/scala/com/twitter/scalding/KryoTest.scala 2013-05-29 23:08:00.000000000 -0700
@@ -10,7 +10,7 @@
import scala.collection.immutable.ListMap
import scala.collection.immutable.HashMap
-import com.twitter.algebird.{AveragedValue, DecayedValue, HLLInstance,
+import com.twitter.algebird.{AveragedValue, DecayedValue,
HyperLogLog, HyperLogLogMonoid, Moments, Monoid}
/*
Only in etsy-scalding/src/test/scala/com/twitter/scalding: SkewJoinTest.scala
diff -ur twitter-scalding/src/test/scala/com/twitter/scalding/TypedPipeTest.scala etsy-scalding/src/test/scala/com/twitter/scalding/TypedPipeTest.scala
--- twitter-scalding/src/test/scala/com/twitter/scalding/TypedPipeTest.scala 2013-05-29 23:08:00.000000000 -0700
+++ etsy-scalding/src/test/scala/com/twitter/scalding/TypedPipeTest.scala 2013-05-29 23:08:00.000000000 -0700
@@ -17,6 +17,7 @@
.map { w => (w, 1L) }
.forceToDisk
.group
+ .forceToReducers
.sum
.write(Tsv("outputFile"))
}
@@ -43,9 +44,9 @@
}
class TypedPipeJoinJob(args : Args) extends Job(args) {
- (Tsv("inputFile0").read.toTypedPipe[(Int,Int)](0, 1)
- leftJoin TypedPipe.from[(Int,Int)](Tsv("inputFile1").read, (0, 1)))
- .toPipe('key, 'value)
+ (Tsv("inputFile0").read.toTypedPipe[(Int,Int)](0, 1).group
+ leftJoin TypedPipe.from[(Int,Int)](Tsv("inputFile1").read, (0, 1)).group)
+ .toTypedPipe
.write(Tsv("outputFile"))
}
@@ -53,7 +54,7 @@
noDetailedDiffs() //Fixes an issue with scala 2.9
import Dsl._
"A TypedPipeJoin" should {
- JobTest("com.twitter.scalding.TypedPipeJoinJob")
+ JobTest(new com.twitter.scalding.TypedPipeJoinJob(_))
.source(Tsv("inputFile0"), List((0,0), (1,1), (2,2), (3,3), (4,5)))
.source(Tsv("inputFile1"), List((0,1), (1,2), (2,3), (3,4)))
.sink[(Int,(Int,Option[Int]))](Tsv("outputFile")){ outputBuffer =>
@@ -73,19 +74,19 @@
}
class TypedPipeHashJoinJob(args : Args) extends Job(args) {
- (Tsv("inputFile0").read.toTypedPipe[(Int,Int)](0, 1)
- hashLeftJoin TypedPipe.from[(Int,Int)](Tsv("inputFile1").read, (0, 1)))
- .toPipe('key, 'value)
+ TypedTsv[(Int,Int)]("inputFile0")
+ .group
+ .hashLeftJoin(TypedTsv[(Int,Int)]("inputFile1").group)
.write(Tsv("outputFile"))
}
class TypedPipeHashJoinTest extends Specification {
noDetailedDiffs() //Fixes an issue with scala 2.9
import Dsl._
- "A TypedPipeJoin" should {
- JobTest("com.twitter.scalding.TypedPipeJoinJob")
- .source(Tsv("inputFile0"), List((0,0), (1,1), (2,2), (3,3), (4,5)))
- .source(Tsv("inputFile1"), List((0,1), (1,2), (2,3), (3,4)))
+ "A TypedPipeHashJoinJob" should {
+ JobTest(new com.twitter.scalding.TypedPipeHashJoinJob(_))
+ .source(TypedTsv[(Int,Int)]("inputFile0"), List((0,0), (1,1), (2,2), (3,3), (4,5)))
+ .source(TypedTsv[(Int,Int)]("inputFile1"), List((0,1), (1,2), (2,3), (3,4)))
.sink[(Int,(Int,Option[Int]))](Tsv("outputFile")){ outputBuffer =>
val outMap = outputBuffer.toMap
"correctly join" in {
@@ -107,13 +108,15 @@
TextLine("inputFile").read.typed(1 -> ('maxWord, 'maxCnt)) { tpipe : TypedPipe[String] =>
tpipe.flatMap { _.split("\\s+") }
.map { w => (w, 1L) }
- // groupby the key and sum the values:
+ .group
.sum
.groupAll
+ // Looks like swap, but on the values in the grouping:
.mapValues { revTup _ }
+ .forceToReducers
.max
// Throw out the Unit key and reverse the value tuple
- .map { _._2 }
+ .values
.swap
}.write(Tsv("outputFile"))
}
@@ -137,27 +140,28 @@
}
class TJoinCountJob(args : Args) extends Job(args) {
- (TypedPipe.from[(Int,Int)](Tsv("in0",(0,1)), (0,1))
- join TypedPipe.from[(Int,Int)](Tsv("in1", (0,1)), (0,1)))
+ (TypedPipe.from[(Int,Int)](Tsv("in0",(0,1)), (0,1)).group
+ join TypedPipe.from[(Int,Int)](Tsv("in1", (0,1)), (0,1)).group)
.size
- .toPipe('key, 'count)
.write(Tsv("out"))
//Also check simple joins:
- (TypedPipe.from[(Int,Int)](Tsv("in0",(0,1)), (0,1))
- join TypedPipe.from[(Int,Int)](Tsv("in1", (0,1)), (0,1)))
+ (TypedPipe.from[(Int,Int)](Tsv("in0",(0,1)), (0,1)).group
+ join TypedPipe.from[(Int,Int)](Tsv("in1", (0,1)), (0,1)).group)
//Flatten out to three values:
+ .toTypedPipe
.map { kvw => (kvw._1, kvw._2._1, kvw._2._2) }
- .write(('x, 'y, 'z), Tsv("out2"))
+ .write(Tsv("out2"))
//Also check simple leftJoins:
- (TypedPipe.from[(Int,Int)](Tsv("in0",(0,1)), (0,1))
- leftJoin TypedPipe.from[(Int,Int)](Tsv("in1", (0,1)), (0,1)))
+ (TypedPipe.from[(Int,Int)](Tsv("in0",(0,1)), (0,1)).group
+ leftJoin TypedPipe.from[(Int,Int)](Tsv("in1", (0,1)), (0,1)).group)
//Flatten out to three values:
+ .toTypedPipe
.map { kvw : (Int,(Int,Option[Int])) =>
(kvw._1, kvw._2._1, kvw._2._2.getOrElse(-1))
}
- .write(('x, 'y, 'z), Tsv("out3"))
+ .write(Tsv("out3"))
}
class TypedPipeJoinCountTest extends Specification {
@@ -263,6 +267,7 @@
pipe.flatMap { _.split("\\s+").map(_.toLowerCase) }
.groupBy(identity)
.mapValueStream(input => Iterator(input.size))
+ .forceToReducers
}
val first = countWordsIn(TypedPipe.from(TextLine("in0")))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment