Skip to content

Instantly share code, notes, and snippets.

@amimimor
Created May 24, 2012 13:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save amimimor/2781595 to your computer and use it in GitHub Desktop.
Save amimimor/2781595 to your computer and use it in GitHub Desktop.
Scalding DelimitedScheme2 - with optional skipHeader and writeHeader TextDelimited
import cascading.tuple.Fields
import cascading.scheme.Scheme
import cascading.scheme.local.{TextLine => CLTextLine, TextDelimited => CLTextDelimited}
import cascading.scheme.hadoop.{TextLine => CHTextLine, TextDelimited => CHTextDelimited, SequenceFile => CHSequenceFile}
import org.apache.hadoop.mapred.{JobConf, RecordReader, OutputCollector }
import com.twitter.scalding.{Args, FixedPathSource, Job, Source}
trait DelimitedScheme2 extends Source {
//override these as needed:
val fields = Fields.ALL
//This is passed directly to cascading where null is interpretted as string
val types : Array[Class[_]] = null
val separator = "\t"
val skipHeader = false
val writeHeader = false
override def localScheme = new CLTextDelimited(fields, skipHeader, writeHeader, separator, types)
override def hdfsScheme = {
new CHTextDelimited(fields, skipHeader, writeHeader, separator, types).asInstanceOf[Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]]
}
}
case class Tsv(p : String, f : Fields = Fields.ALL, sh : Boolean = false, wh: Boolean = false) extends FixedPathSource(p)
with DelimitedScheme2 {
override val fields = f
override val skipHeader = sh
override val writeHeader = wh
}
class RWTsv(args: Args) extends Job(args) {
Tsv(args("input"), ('first, 'last), true, false).flatMap(('first, 'last) -> 'word) {
(firstLast: (String, String)) => {
Array(firstLast._1, firstLast._2)
}
}
.groupBy('word) { _.size }
.write(Tsv(args("output"), ('word, 'size), true, true))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment