Skip to content

Instantly share code, notes, and snippets.

@piyushnarang
Created August 12, 2016 18:11
Show Gist options
  • Save piyushnarang/a6a8878fff30ea7438997d9f01d4c2e2 to your computer and use it in GitHub Desktop.
Save piyushnarang/a6a8878fff30ea7438997d9f01d4c2e2 to your computer and use it in GitHub Desktop.
import java.util.{Iterator => JIterator, UUID}
import cascading.flow.FlowProcess
import cascading.flow.hadoop.HadoopFlowProcess
import cascading.scheme.{Scheme, SinkCall, SourceCall}
import cascading.tap._
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator
import cascading.tuple._
import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper
import com.twitter.elephantbird.mapreduce.input.MapReduceInputFormatWrapper
import com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat
import org.apache.hadoop.mapred.{JobConf, OutputCollector, RecordReader}
import scala.collection.JavaConverters._
/**
 * Companion of the MultiFormatSourceTap class: holds the tap type alias,
 * the job-conf key prefix and the factory method.
 */
object MultiFormatSourceTap {
  /** Shorthand for the Hadoop flavour of a Cascading tap used in this file. */
  type CascadingHadoopTap = Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]

  /** Prefix of the job-conf key; each tap instance appends its own id to it. */
  val TapIndexKeyPrefix: String = "scalding_internal.multiformat.tapindex"

  /**
   * Creates a tap that reads from every tap in `inputTaps`.
   *
   * @param inputTaps the underlying taps; must not be empty
   * @return a MultiFormatSourceTap whose child taps are `inputTaps`
   * @throws IllegalArgumentException if `inputTaps` is empty
   */
  def apply(inputTaps: Iterable[CascadingHadoopTap]): MultiFormatSourceTap = {
    require(!inputTaps.isEmpty, "MultiFormatSourceTap with no underlying taps")
    val composite: MultiFormatSourceTap = new MultiFormatSourceTap {
      override protected lazy val taps: Iterable[CascadingHadoopTap] = inputTaps
    }
    composite
  }
}
/**
 * Same as Cascading's MultiSourceTap, but with no restriction that all
 * underlying schemes and taps be of the same class.
 *
 * In `sourceConfInit` every child tap receives its own copy of the job conf,
 * tagged with the child's index under a key unique to this tap instance.
 * The same key is handed to the scheme built in `getScheme`.
 */
abstract class MultiFormatSourceTap
  extends SourceTap[JobConf, RecordReader[_, _]]
  with CompositeTap[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]] {
  import MultiFormatSourceTap._

  /** The underlying taps; their iteration order defines each tap's index. */
  protected def taps: Iterable[CascadingHadoopTap]

  // keep a copy of the underlying tap configs
  // to be used for hash joins and toIterator cases.
  // JobConf is not serializable so we use a Map instead
  private[this] var tapConfigs: Option[List[Map[String, String]]] = None

  /** Exposes the tuples of a TupleEntryIterator as a plain java.util.Iterator. */
  class TupleIterator(it: TupleEntryIterator) extends JIterator[Tuple] {
    override def hasNext: Boolean = it.hasNext
    override def next: Tuple = it.next.getTuple
    override def remove: Unit = it.remove
  }

  private final val randomId = UUID.randomUUID.toString

  // use a key specific to this tap
  private final val tapIndexKey = s"$TapIndexKeyPrefix.$randomId"

  /** The identifier is the random id generated when this tap was constructed. */
  override def getIdentifier: String = randomId

  override protected def getChildTaps: JIterator[CascadingHadoopTap] =
    taps.iterator.asJava

  override def isReplace: Boolean = false // cannot be used as sink

  override def getNumChildTaps: Long = taps.size.toLong

  /** Latest modification time among the child taps. */
  override def getModifiedTime(conf: JobConf): Long = taps.map(_.getModifiedTime(conf)).max

  /** True only when every child tap reports that its resource exists. */
  override def resourceExists(conf: JobConf): Boolean = taps.forall(_.resourceExists(conf))

  /**
   * Opens this tap for reading.
   *
   * With a non-null `input` the record reader is wrapped directly. With a null
   * `input` every child tap is opened with the configuration captured for it in
   * `sourceConfInit`, and the resulting iterators are chained in tap order.
   *
   * @param flow  the current flow process
   * @param input the record reader to read from, or null
   * @return an iterator over the tuple entries of this tap
   * @throws RuntimeException if `input` is null and `sourceConfInit` has not run yet
   */
  override def openForRead(flow: FlowProcess[JobConf], input: RecordReader[_, _]): TupleEntryIterator =
    if (input != null) {
      new HadoopTupleEntrySchemeIterator(flow, this, input)
    } else {
      // this is the code path for toIterator and hashjoin
      tapConfigs match {
        case None =>
          sys.error("tapConfigs is not defined. This should have been defined in Tap#sourceConfInit method.")
        case Some(tc) =>
          // List#apply is linear in the index; convert once so each per-tap lookup is cheap
          val indexedConfigs = tc.toIndexedSeq
          val tapIterators = taps.zipWithIndex.map { case (t, i) =>
            val tapConfig = indexedConfigs(i)
            // rebuild a JobConf from the captured key/value pairs
            val tapJobConf = new JobConf(false)
            tapConfig.foreach { case (k, v) => tapJobConf.set(k, v) }
            val tapFlow = new HadoopFlowProcess(tapJobConf)
            new TupleIterator(t.openForRead(tapFlow))
          }
          new TupleEntryChainIterator(getSourceFields, tapIterators.toSeq: _*)
      }
    }

  // generate separate job conf per tap
  // and use cascading's MultiInputFormat to group them
  override def sourceConfInit(flow: FlowProcess[JobConf], conf: JobConf): Unit = {
    val tapJobConfs: List[JobConf] = taps.zipWithIndex.map { case (tap, i) =>
      // verify that input paths are set for this tap
      if (tap.getIdentifier == null)
        sys.error(s"Tap may not have null identifier / input path: $tap")
      val tapConf = flow.copyConfig(conf)
      // this uniquely identifies the tap, scheme pair during sourcePrepare
      tapConf.set(tapIndexKey, i.toString)
      // sourceConfInit on the tap here sets the input paths in tapConf
      // and also calls sourceConfInit on the underlying scheme
      tap.sourceConfInit(flow, tapConf)
      val tapInputFormat = tapConf.getInputFormat
      // make sure we don't double wrap an ElephantBird format
      if (tapConf.getBoolean("multiformat.source.combine", false) &&
        !tapInputFormat.isInstanceOf[DeprecatedInputFormatWrapper[_, _]]) {
        MapReduceInputFormatWrapper.setWrappedInputFormat(tapInputFormat.getClass, tapConf)
        DelegateCombineFileInputFormat
          .setDelegateInputFormat(tapConf, classOf[MapReduceInputFormatWrapper[_, _]])
      }
      tapConf
    }.toList
    // the outer MultiInputFormat passes to this tap, its own separate instance
    // of the job conf, so it is safe to nest another MultiInputFormat here
    cascading.tap.hadoop.io.MultiInputFormat.addInputFormat(conf, tapJobConfs: _*)
    // the above call also sets the outer input format to be MultiInputFormat
    // set tapConfigs to be used for hash join and toIterator cases
    val tapConfigMaps: List[Map[String, String]] = tapJobConfs.map { tapJobConf =>
      tapJobConf.asScala.map { e => e.getKey -> e.getValue }.toMap
    }
    tapConfigs = Some(tapConfigMaps)
  }

  /**
   * Builds a MultiFormatScheme over the child taps' schemes, keyed by this tap's
   * index key. A new scheme instance is created on every call.
   */
  override def getScheme: Scheme[JobConf, RecordReader[_, _], Void, _, _] =
    MultiFormatScheme(taps.map(_.getScheme), tapIndexKey)
      .asInstanceOf[Scheme[JobConf, RecordReader[_, _], Void, _, _]]

  // only the first 10 child taps are listed
  override def toString: String = s"MultiFormatSourceTap[${taps.size}: ${taps.take(10).mkString(",")} ...]"

  override def hashCode: Int = randomId.hashCode

  /** Two MultiFormatSourceTaps are equal when their identifiers are equal. */
  override def equals(a: Any): Boolean =
    a match {
      case mfst: MultiFormatSourceTap =>
        mfst.getIdentifier == getIdentifier
      case _ => false
    }
}
/** Factory for MultiFormatScheme instances. */
object MultiFormatScheme {
  /**
   * Creates a scheme that delegates to one of `_schemes`.
   *
   * @param _schemes           the underlying schemes; converted to a List on each access
   * @param tapIndexJobConfKey the key exposed by the created scheme as `tapIndexKey`
   * @return a new MultiFormatScheme backed by `_schemes`
   */
  def apply(_schemes: Iterable[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]],
    tapIndexJobConfKey: String): MultiFormatScheme = {
    val composite: MultiFormatScheme = new MultiFormatScheme {
      override def schemes: List[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]] =
        _schemes.toList
      override val tapIndexKey: String = tapIndexJobConfKey
    }
    composite
  }
}
/**
 * Top-level scheme where underlying schemes can correspond to different data formats.
 *
 * During reads, the correct underlying scheme is picked based on the index
 * provided in job conf for the source call.
 *
 * This scheme is to be used in conjunction with MultiFormatSourceTap
 * which sets the needed index value in the job conf per tap.
 *
 * It is source-only: both sink-related methods fail with an error.
 */
abstract class MultiFormatScheme extends Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], AnyRef, AnyRef] {
  import MultiFormatSourceTap.CascadingHadoopTap

  /** The underlying schemes; the index read from the flow properties selects one of them. */
  protected def schemes: List[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], _, _]]

  /** The flow property key under which the index of the scheme to use is stored. */
  protected def tapIndexKey: String

  // .source method is called repeatedly (per record) on mappers
  // so we cache the picked scheme during sourcePrepare to avoid repeated jobconf lookups
  private[this] var pickedScheme: Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], AnyRef, AnyRef] = null

  /** Intentionally a no-op. */
  override def sourceConfInit(flow: FlowProcess[JobConf], tap: CascadingHadoopTap, conf: JobConf): Unit = ()

  /** Always fails: this scheme cannot be used as a sink. */
  override def sinkConfInit(flow: FlowProcess[JobConf], tap: CascadingHadoopTap, conf: JobConf): Unit =
    sys.error("MultiFormatScheme cannot be used as sink.")

  // initialize the picked scheme, this is called only once on the mapper
  /**
   * Looks up the scheme index under `tapIndexKey`, prepares the matching underlying
   * scheme and caches it for the later `source` and `sourceCleanup` calls.
   *
   * @throws RuntimeException if no value is stored under `tapIndexKey`
   */
  override def sourcePrepare(flow: FlowProcess[JobConf], sourceCall: SourceCall[AnyRef, RecordReader[_, _]]): Unit = {
    val tapIndex = flow.getProperty(tapIndexKey)
    if (tapIndex == null)
      sys.error(s"Key $tapIndexKey not found in job conf")
    else {
      val scheme = schemes(tapIndex.asInstanceOf[String].toInt)
        .asInstanceOf[Scheme[JobConf, RecordReader[_, _], OutputCollector[_, _], AnyRef, AnyRef]]
      scheme.sourcePrepare(flow, sourceCall)
      // cache only after the underlying scheme prepared successfully
      pickedScheme = scheme
    }
  }

  // this is called once per record so we keep it as efficient as we can
  // pickedScheme will already have been set in sourcePrepare call
  override def source(flow: FlowProcess[JobConf], sourceCall: SourceCall[AnyRef, RecordReader[_, _]]): Boolean =
    pickedScheme.source(flow, sourceCall)

  // this runs after all records are read
  override def sourceCleanup(flow: FlowProcess[JobConf], sourceCall: SourceCall[AnyRef, RecordReader[_, _]]): Unit =
    pickedScheme.sourceCleanup(flow, sourceCall)

  /** Always fails: this scheme cannot be used as a sink. */
  override def sink(flow: FlowProcess[JobConf], sinkCall: SinkCall[AnyRef, OutputCollector[_, _]]): Unit =
    sys.error("MultiFormatScheme cannot be used as sink.")

  private final val randomId = UUID.randomUUID.toString

  override def hashCode: Int = randomId.hashCode

  /**
   * Two schemes are equal when they carry the same random id.
   * The ids are compared directly rather than through their hash codes, so two
   * different ids whose hash codes happen to collide are not treated as equal.
   */
  override def equals(a: Any): Boolean =
    a match {
      case mfs: MultiFormatScheme =>
        mfs.randomId == randomId
      case _ => false
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment