Scalding - MultipleTsvFiles with per-file discriminators
// One of my Scalding jobs runs into this cascading issue
// when there are a large number of input files/steps involved:
// https://groups.google.com/forum/#!searchin/cascading-user/hadoopplanner/cascading-user/R0FMbAOCgt0/BraEMBJ0Xi0J
// A workaround for this is to reduce the total number of steps
// by reading multiple files together in a single flow.
// This can be done using Scalding's MultipleTsvFiles source.
// However, I need to insert some per-file discriminator fields
// to uniquely identify which file each tuple comes from.
// The trick is to use a separate "inner" scheme for the taps
// and an "outer" scheme that contains the additional disriminator field.
// Then it just boils down to having your tuple iterator append discriminator
// fields per file.
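// For illustration (not part of the job itself): with outer fields
// ('hashed_key, 'file_source) and discriminator field ('file_source),
// the inner scheme reads only 'hashed_key from each file, e.g.
//   val outer = new Fields("hashed_key", "file_source")
//   val inner = outer.subtract(new Fields("file_source")) // -> 'hashed_key
// and the per-tap iterator appends this file's 'file_source value to each tuple.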
import com.twitter.scalding._
import cascading.flow.FlowProcess
import cascading.scheme.hadoop.{ TextDelimited => CHTextDelimited }
import cascading.scheme.Scheme
import cascading.tap.hadoop.Hfs
import cascading.tap.Tap
import cascading.tuple.{
  Fields,
  Tuple,
  TupleEntry,
  TupleEntryChainIterator,
  TupleEntryIterator,
  TupleEntrySchemeCollector
}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.{ RecordReader, RecordWriter }
import java.util.{ Iterator => JIterator }
class MyScaldingMultiSourceTap(taps: Seq[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]],
  d: Seq[Tuple],
  scheme: Scheme[JobConf, RecordReader[_, _], java.lang.Void, _, _])
  extends ScaldingMultiSourceTap(taps) {

  assert(taps.size == d.size)
  setScheme(scheme)

  // Wraps a tap's TupleEntryIterator and appends that file's discriminator
  // values to every tuple it produces.
  private class MyTupleIterator(iterator: TupleEntryIterator, dValues: Tuple) extends JIterator[Tuple] {
    override def hasNext = iterator.hasNext
    override def next: Tuple = iterator.next.getTuple.append(dValues)
    override def remove = iterator.remove
  }

  override def openForRead(flowProcess: FlowProcess[JobConf], input: RecordReader[_, _]): TupleEntryIterator = {
    val iterators = getTaps.zipWithIndex.map { case (tap, i) =>
      new MyTupleIterator(tap.openForRead(flowProcess), d(i))
    }
    new TupleEntryChainIterator(getSourceFields, iterators: _*)
  }
}
case class MyMultipleTsvFiles(p: Seq[String],
  override val fields: Fields = Fields.ALL,
  d: Seq[Tuple],
  dFields: Fields,
  override val skipHeader: Boolean = false,
  override val writeHeader: Boolean = false)
  extends FixedPathSource(p: _*)
  with DelimitedScheme {

  // TODO: add support for local mode
  assert(p.size == d.size)

  // Inner scheme for the individual taps: the outer fields minus the
  // discriminator fields, which are appended per file at read time.
  def hdfsInnerScheme = {
    HadoopSchemeInstance(new CHTextDelimited(fields.subtract(dFields), null, skipHeader, writeHeader,
      separator, strict, quote, types, safe))
  }

  protected override def createHdfsReadTap(hdfsMode: Hdfs): Tap[JobConf, _, _] = {
    val taps: List[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]] =
      goodHdfsPaths(hdfsMode)
        .toList.map { path => CastHfsTap(new Hfs(hdfsInnerScheme, path, sinkMode)) }
    taps.size match {
      case 0 => {
        // This case is going to result in an error, but we don't want to throw until
        // validateTaps, so we just put a dummy path to return something so the
        // Job constructor does not fail.
        // Same behavior as the parent class.
        CastHfsTap(new Hfs(hdfsScheme, hdfsPaths.head, sinkMode))
      }
      case _ => new MyScaldingMultiSourceTap(taps, d,
        hdfsScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], java.lang.Void, _, _]])
    }
  }
}
class MultipleFilesTestJob(args: Args) extends Job(args) {
  val filesList = args.list("files")

  // One discriminator tuple per input file: "file_num_0", "file_num_1", ...
  val d = Stream.continually("file_num_").take(filesList.size).toList
    .zipWithIndex.map { case (str, i) => new Tuple(str + i) }

  MyMultipleTsvFiles(filesList, ('hashed_key, 'file_source), d, ('file_source))
    .project('hashed_key, 'file_source)
    .write(Tsv(args("output")))
}
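// Example invocation (a sketch; the jar name and paths below are hypothetical):
//   hadoop jar my-scalding-job.jar com.twitter.scalding.Tool MultipleFilesTestJob --hdfs \
//     --files /data/in/part-a.tsv /data/in/part-b.tsv --output /data/out
// Each output tuple then carries a 'file_source value such as "file_num_0",
// identifying which input file it came from.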