Skip to content

Instantly share code, notes, and snippets.

@alaiacano
Created March 31, 2014 18:01
Show Gist options
  • Save alaiacano/9898289 to your computer and use it in GitHub Desktop.
Save alaiacano/9898289 to your computer and use it in GitHub Desktop.
import com.twitter.scalding._
import scala.collection.mutable.ListBuffer
import TDsl._
/*
INPUT:
a 1
a 2
a 3
b 1
b 2
b 3
c 1
c 2
c 3
OUTPUT:
a 0
a 1
a 2
a 3
b 0
b 4 // size didn't reset to 0 for this group?
b 5
b 6
c 0
c 7 // same here
c 8
c 9
*/
class ScanTest(args: Args) extends Job(args) {
/**
* Creates a ListBuffer and adds stuff to it in a scanLeft, returning the size of the buffer at the end.
*
* The expected output is the groupID and the size of the buffer growing from 0 to 3. However, the
* size of the buffer seems to pick up where the previous group left off.
*/
def scanThing(data: Grouped[String, (Double)]) = {
// create a new empty buffer. also tried ListBuffer[Double]()
lazy val buffer = ListBuffer.empty[Double]
// scan through `data`, which should just be the group associated with a single letter
data.scanLeft[Int](0) {
case (old, newval) => {
buffer.prepend(newval)
buffer.size
}
}
}
val dataSectioned : Grouped[String, (Double)] = TypedTsv[(String,Double)]("scantest.tsv")
.group
scanThing(dataSectioned)
.toTypedPipe
.write(TypedTsv[(String, Int)]("scantest-out.tsv"))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment