Created August 12, 2016 18:11
import java.util.{Iterator => JIterator, UUID}

import cascading.flow.FlowProcess
import cascading.flow.hadoop.HadoopFlowProcess
import cascading.scheme.{Scheme, SinkCall, SourceCall}
import cascading.tap._
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator
import cascading.tuple._
import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper
import com.twitter.elephantbird.mapreduce.input.MapReduceInputFormatWrapper
import com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat
import org.apache.hadoop.mapred.{JobConf, OutputCollector, RecordReader}

import scala.collection.JavaConverters._
object MultiFormatSourceTap {
  type CascadingHadoopTap = Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]

  val TapIndexKeyPrefix = "scalding_internal.multiformat.tapindex"

  def apply(inputTaps: Iterable[CascadingHadoopTap]): MultiFormatSourceTap = {
    require(inputTaps.nonEmpty, "MultiFormatSourceTap requires at least one underlying tap")
    new MultiFormatSourceTap {
      override protected lazy val taps: Iterable[CascadingHadoopTap] = inputTaps
    }
  }
}
/**
 * Same as Cascading's MultiSourceTap, but with no restriction that all
 * underlying schemes and taps be of the same class.
 */
abstract class MultiFormatSourceTap
  extends SourceTap[JobConf, RecordReader[_, _]]
  with CompositeTap[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]] {

  import MultiFormatSourceTap._

  protected def taps: Iterable[CascadingHadoopTap]

  // keep a copy of the underlying tap configs
  // to be used for the hash-join and toIterator cases.
  // JobConf is not serializable, so we use a Map instead
  private[this] var tapConfigs: Option[List[Map[String, String]]] = None
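
  // adapts Cascading's TupleEntryIterator to a plain java.util.Iterator[Tuple],
  // the shape expected by TupleEntryChainIterator in openForRead below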
  class TupleIterator(it: TupleEntryIterator) extends JIterator[Tuple] {
    override def hasNext: Boolean = it.hasNext
    override def next: Tuple = it.next.getTuple
    override def remove: Unit = it.remove
  }
  private final val randomId = UUID.randomUUID.toString
  // use a key specific to this tap
  private final val tapIndexKey = s"$TapIndexKeyPrefix.$randomId"

  override def getIdentifier: String = randomId

  override protected def getChildTaps: JIterator[CascadingHadoopTap] =
    taps.iterator.asJava

  override def isReplace: Boolean = false // cannot be used as a sink

  override def getNumChildTaps: Long = taps.size.toLong

  override def getModifiedTime(conf: JobConf): Long = taps.map(_.getModifiedTime(conf)).max

  override def resourceExists(conf: JobConf): Boolean = taps.forall(_.resourceExists(conf))
  override def openForRead(flow: FlowProcess[JobConf], input: RecordReader[_, _]): TupleEntryIterator =
    if (input != null) {
      new HadoopTupleEntrySchemeIterator(flow, this, input)
    } else {
      // this is the code path for toIterator and hash joins
      tapConfigs match {
        case None =>
          sys.error("tapConfigs is not defined; it should have been set in sourceConfInit.")
        case Some(tc) =>
          val tapIterators = taps.zipWithIndex.map { case (t, i) =>
            val tapConfig = tc(i)
            val tapJobConf = new JobConf(false)
            tapConfig.foreach { case (k, v) => tapJobConf.set(k, v) }
            val tapFlow = new HadoopFlowProcess(tapJobConf)
            new TupleIterator(t.openForRead(tapFlow))
          }
          new TupleEntryChainIterator(getSourceFields, tapIterators.toSeq: _*)
      }
    }
  // generate a separate job conf per tap
  // and use Cascading's MultiInputFormat to group them
  override def sourceConfInit(flow: FlowProcess[JobConf], conf: JobConf): Unit = {
    val tapJobConfs: List[JobConf] = taps.zipWithIndex.map { case (tap, i) =>
      // verify that input paths are set for this tap
      if (tap.getIdentifier == null)
        sys.error(s"Tap may not have a null identifier / input path: $tap")
      val tapConf = flow.copyConfig(conf)
      // this uniquely identifies the (tap, scheme) pair during sourcePrepare
      tapConf.set(tapIndexKey, i.toString)
      // sourceConfInit on the tap here sets the input paths in tapConf
      // and also calls sourceConfInit on the underlying scheme
      tap.sourceConfInit(flow, tapConf)
      val tapInputFormat = tapConf.getInputFormat
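      // when combining is enabled, elephant-bird's DelegateCombineFileInputFormat
      // merges many small splits into fewer map tasks, delegating record
      // reading to the wrapped input format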
      // make sure we don't double-wrap an ElephantBird format
      if (tapConf.getBoolean("multiformat.source.combine", false) &&
        !tapInputFormat.isInstanceOf[DeprecatedInputFormatWrapper[_, _]]) {
        MapReduceInputFormatWrapper.setWrappedInputFormat(tapInputFormat.getClass, tapConf)
        DelegateCombineFileInputFormat
          .setDelegateInputFormat(tapConf, classOf[MapReduceInputFormatWrapper[_, _]])
      }
      tapConf
    }.toList
    // the outer MultiInputFormat passes this tap its own separate instance
    // of the job conf, so it is safe to nest another MultiInputFormat here
    cascading.tap.hadoop.io.MultiInputFormat.addInputFormat(conf, tapJobConfs: _*)
    // the above call also sets the outer input format to be MultiInputFormat
    // set tapConfigs to be used for the hash-join and toIterator cases
    val tapConfigMaps: List[Map[String, String]] = tapJobConfs.map { tapJobConf =>
      tapJobConf.asScala.map { e => e.getKey -> e.getValue }.toMap
    }
    tapConfigs = Some(tapConfigMaps)
  }
  override def getScheme: Scheme[JobConf, RecordReader[_, _], Void, _, _] =
    MultiFormatScheme(taps.map(_.getScheme), tapIndexKey)
      .asInstanceOf[Scheme[JobConf, RecordReader[_, _], Void, _, _]]

  override def toString: String = s"MultiFormatSourceTap[${taps.size}: ${taps.take(10).mkString(",")} ...]"
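
  // identity-based equality: randomId is unique per instance, so two
  // MultiFormatSourceTaps are equal only if they are the same instance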
  override def hashCode: Int = randomId.hashCode

  override def equals(a: Any): Boolean =
    a match {
      case mfst: MultiFormatSourceTap =>
        mfst.getIdentifier == getIdentifier
      case _ => false
    }
}
object MultiFormatScheme {
  def apply(_schemes: Iterable[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]],
            tapIndexJobConfKey: String): MultiFormatScheme =
    new MultiFormatScheme {
      override def schemes = _schemes.toList
      override val tapIndexKey = tapIndexJobConfKey
    }
}
/**
 * Top-level scheme where underlying schemes can correspond to different data formats.
 *
 * During reads, the correct underlying scheme is picked based on the index
 * provided in the job conf for the source call.
 *
 * This scheme is to be used in conjunction with MultiFormatSourceTap,
 * which sets the needed index value in the job conf per tap.
 */
abstract class MultiFormatScheme extends Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], AnyRef, AnyRef] {
  import MultiFormatSourceTap.CascadingHadoopTap

  protected def schemes: List[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]
  protected def tapIndexKey: String

  // the source method is called repeatedly (per record) on mappers,
  // so we cache the picked scheme during sourcePrepare to avoid repeated job conf lookups
  private[this] var pickedScheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], AnyRef, AnyRef] = null
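
  // no-op: MultiFormatSourceTap.sourceConfInit drives per-tap configuration
  // and calls sourceConfInit on each underlying scheme itself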
  override def sourceConfInit(flow: FlowProcess[JobConf], tap: CascadingHadoopTap, conf: JobConf): Unit = ()

  override def sinkConfInit(flow: FlowProcess[JobConf], tap: CascadingHadoopTap, conf: JobConf): Unit =
    sys.error("MultiFormatScheme cannot be used as a sink.")

  // initialize the picked scheme; this is called only once on the mapper
  override def sourcePrepare(flow: FlowProcess[JobConf], sourceCall: SourceCall[AnyRef, RecordReader[_, _]]): Unit = {
    val tapIndex = flow.getProperty(tapIndexKey)
    if (tapIndex == null)
      sys.error(s"Key $tapIndexKey not found in job conf")
    else {
      val scheme = schemes(tapIndex.asInstanceOf[String].toInt)
        .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], AnyRef, AnyRef]]
      scheme.sourcePrepare(flow, sourceCall)
      pickedScheme = scheme
    }
  }

  // this is called once per record, so we keep it as efficient as we can;
  // pickedScheme will already have been set in the sourcePrepare call
  override def source(flow: FlowProcess[JobConf], sourceCall: SourceCall[AnyRef, RecordReader[_, _]]): Boolean =
    pickedScheme.source(flow, sourceCall)

  // this runs after all records are read
  override def sourceCleanup(flow: FlowProcess[JobConf], sourceCall: SourceCall[AnyRef, RecordReader[_, _]]): Unit =
    pickedScheme.sourceCleanup(flow, sourceCall)

  override def sink(flow: FlowProcess[JobConf], sinkCall: SinkCall[AnyRef, OutputCollector[_, _]]): Unit =
    sys.error("MultiFormatScheme cannot be used as a sink.")
  private final val randomId = UUID.randomUUID.toString

  override def hashCode: Int = randomId.hashCode

  override def equals(a: Any): Boolean =
    a match {
      case mfs: MultiFormatScheme =>
        mfs.hashCode == hashCode
      case _ => false
    }
}
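
// A minimal usage sketch: wiring a plain-text tap and a SequenceFile tap behind
// a single MultiFormatSourceTap. The HDFS paths and the field name are
// hypothetical placeholders; the casts are needed because Cascading 2's Hfs
// extends Tap with raw Java generics.
object MultiFormatSourceTapExample {
  import cascading.scheme.hadoop.{SequenceFile, TextLine}
  import cascading.tap.hadoop.Hfs
  import MultiFormatSourceTap.CascadingHadoopTap

  def exampleTap(): MultiFormatSourceTap = {
    // two taps over different on-disk formats, exposing the same source field
    val textTap = new Hfs(new TextLine(new Fields("line")), "hdfs:///data/events-text")
    val seqTap = new Hfs(new SequenceFile(new Fields("line")), "hdfs:///data/events-seq")
    // reads from both locations flow through one logical source; each split
    // is decoded by the scheme of the tap it came from
    MultiFormatSourceTap(List[CascadingHadoopTap](
      textTap.asInstanceOf[CascadingHadoopTap],
      seqTap.asInstanceOf[CascadingHadoopTap]))
  }
}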