Scalding - MultipleTsvFiles with per-file discriminators
// One of my Scalding jobs runs into this cascading issue
// when there are a large number of input files/steps involved:
// https://groups.google.com/forum/#!searchin/cascading-user/hadoopplanner/cascading-user/R0FMbAOCgt0/BraEMBJ0Xi0J
// A workaround for this is to reduce the total number of steps
// by reading multiple files together in a single flow.
// This can be done using Scalding's MultipleTsvFiles source.
// However, I need to insert some per-file discriminator fields
// to uniquely identify which file each tuple comes from.
// The trick is to use a separate "inner" scheme for the taps
// and an "outer" scheme that contains the additional discriminator field.
// Then it just boils down to having your tuple iterator append the
// discriminator fields per file.
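// For example (illustrative values only, matching the test job at the
// bottom): if file_0.tsv contains the row "abc" and file_1.tsv contains
// the row "xyz", reading both through this source with discriminators
// "file_num_0" and "file_num_1" yields the tuples
// ("abc", "file_num_0") and ("xyz", "file_num_1"),
// so each tuple records which file it came from.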
import com.twitter.scalding._
import cascading.flow.FlowProcess
import cascading.scheme.hadoop.{ TextDelimited => CHTextDelimited }
import cascading.scheme.Scheme
import cascading.tap.hadoop.Hfs
import cascading.tap.Tap
import cascading.tuple.{
  Fields,
  Tuple,
  TupleEntry,
  TupleEntryChainIterator,
  TupleEntryIterator,
  TupleEntrySchemeCollector
}
import org.apache.hadoop.mapred.{ JobConf, OutputCollector, RecordReader, RecordWriter }
import java.util.{ Iterator => JIterator }
// A multi-source tap that appends a per-tap discriminator tuple to every
// tuple read from the corresponding child tap.
class MyScaldingMultiSourceTap(taps: Seq[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]],
  d: Seq[Tuple],
  scheme: Scheme[JobConf, RecordReader[_, _], java.lang.Void, _, _])
  extends ScaldingMultiSourceTap(taps) {

  // One discriminator tuple per tap.
  assert(taps.size == d.size)

  // The "outer" scheme, which includes the discriminator fields.
  setScheme(scheme)

  // Wraps a child tap's iterator, appending the discriminator values to each tuple.
  private class MyTupleIterator(iterator: TupleEntryIterator, dValues: Tuple) extends JIterator[Tuple] {
    override def hasNext = iterator.hasNext
    override def next: Tuple = iterator.next.getTuple.append(dValues)
    override def remove = iterator.remove
  }

  override def openForRead(flowProcess: FlowProcess[JobConf], input: RecordReader[_, _]): TupleEntryIterator = {
    val iterators = getTaps.zipWithIndex.map { case (tap, i) =>
      new MyTupleIterator(tap.openForRead(flowProcess), d(i))
    }
    new TupleEntryChainIterator(getSourceFields, iterators: _*)
  }
}
// A multi-file TSV source that reads each path through an "inner" scheme
// (only the fields physically present in the files) and exposes an "outer"
// scheme that additionally contains the discriminator fields in dFields.
case class MyMultipleTsvFiles(p: Seq[String],
  override val fields: Fields = Fields.ALL,
  d: Seq[Tuple],
  dFields: Fields,
  override val skipHeader: Boolean = false,
  override val writeHeader: Boolean = false)
  extends FixedPathSource(p: _*)
  with DelimitedScheme {

  // TODO: add support for local mode

  // One discriminator tuple per input path.
  assert(p.size == d.size)

  // The inner scheme excludes the discriminator fields, since they are not
  // physically present in the files.
  def hdfsInnerScheme = {
    HadoopSchemeInstance(new CHTextDelimited(fields.subtract(dFields), null, skipHeader, writeHeader, separator, strict, quote, types, safe))
  }

  protected override def createHdfsReadTap(hdfsMode: Hdfs): Tap[JobConf, _, _] = {
    val taps: List[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]] =
      goodHdfsPaths(hdfsMode)
        .toList.map { path => CastHfsTap(new Hfs(hdfsInnerScheme, path, sinkMode)) }
    taps.size match {
      case 0 => {
        // This case is going to result in an error, but we don't want to throw until
        // validateTaps, so we just put a dummy path to return something so the
        // Job constructor does not fail.
        // Same behavior as parent class.
        CastHfsTap(new Hfs(hdfsScheme, hdfsPaths.head, sinkMode))
      }
      case _ => new MyScaldingMultiSourceTap(taps, d,
        hdfsScheme.asInstanceOf[Scheme[JobConf, RecordReader[_, _], java.lang.Void, _, _]])
    }
  }
}
class MultipleFilesTestJob(args: Args) extends Job(args) {
  val filesList = args.list("files")
  // One discriminator tuple per file: "file_num_0", "file_num_1", ...
  val d = Stream.continually("file_num_").take(filesList.size).toList
    .zipWithIndex.map { case (str, i) => new Tuple(str + i) }
  MyMultipleTsvFiles(filesList, ('hashed_key, 'file_source), d, ('file_source))
    .project('hashed_key, 'file_source)
    .write(Tsv(args("output")))
}
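// A minimal sketch of how the test job above might be launched (the jar name
// and paths are illustrative, not from the original gist). Scalding's Args
// treats space-separated tokens after a flag as a list, so --files accepts
// multiple paths:
//
//   hadoop jar my-scalding-assembly.jar com.twitter.scalding.Tool \
//     MultipleFilesTestJob --hdfs \
//     --files hdfs:///input/part1.tsv hdfs:///input/part2.tsv \
//     --output hdfs:///output/with_discriminators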