Skip to content

Instantly share code, notes, and snippets.

@blever
Created November 12, 2011 21:58
Show Gist options
  • Save blever/1361178 to your computer and use it in GitHub Desktop.
Save blever/1361178 to your computer and use it in GitHub Desktop.
Augmenting DataSource/DataSink with Converter idea
/* Something along the lines of Crunch's "Converter" but split out into separate input/output traits */
/**
 * Converts a key-value pair into a single value of type `S`.
 *
 * @tparam K type of the key passed to the converter
 * @tparam V type of the value passed to the converter
 * @tparam S type produced by the conversion
 */
trait InputConverter[K, V, S] {

  /**
   * Builds an `S` from one key-value pair.
   *
   * @param key   the key of the pair
   * @param value the value of the pair
   * @return the converted value
   */
  def fromKeyValue(key: K, value: V): S
}
/**
 * Converts a single value of type `S` into a key-value pair.
 *
 * @tparam K type of the key produced by the converter
 * @tparam V type of the value produced by the converter
 * @tparam S type accepted by the conversion
 */
trait OutputConverter[K, V, S] {

  /**
   * Splits `s` into a key-value pair.
   *
   * @param s the value to convert
   * @return the resulting `(key, value)` tuple
   */
  def toKeyValue(s: S): (K, V)
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/* Augment with InputConverter */
/**
 * Description of an input, paired with the converter that turns its key-value records into `A`.
 *
 * NOTE(review): `inputFormat` is bounded by `FileInputFormat[_, _]`, whereas `DataSink.outputFormat`
 * is bounded by `FileOutputFormat[K, V]` — confirm whether the wildcard is intentional.
 *
 * @tparam K key type accepted by the converter
 * @tparam V value type accepted by the converter
 * @tparam A type the converter produces
 */
trait DataSource[K, V, A] {

  /** Type name associated with this input. */
  def inputTypeName: String

  /** Path of the input. */
  def inputPath: Path

  /** Class of the file input format used for this input. */
  def inputFormat: Class[_ <: FileInputFormat[_, _]]

  /** Converter applied to each key-value pair of this input. */
  def converter: InputConverter[K, V, A]
}
/* Augment with OutputConverter */
/**
 * Description of an output, paired with the converter that turns values of type `B` into
 * key-value records.
 *
 * @tparam K key type produced by the converter and accepted by the output format
 * @tparam V value type produced by the converter and accepted by the output format
 * @tparam B type the converter accepts
 */
trait DataSink[K, V, B] {

  /** Type name associated with this output. */
  def outputTypeName: String

  /** Path of the output. */
  def outputPath: Path

  /** Class of the file output format used for this output. */
  def outputFormat: Class[_ <: FileOutputFormat[K, V]]

  /** Converter applied to each value before it is written as a key-value pair. */
  def converter: OutputConverter[K, V, B]
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// TextInput.scala
/**
 * Loader of `String` values from text input located at `path`.
 *
 * @param path location of the input; wrapped in a `Path` when the input store is built
 */
class TextLoader(path: String) extends Loader[String] {

  /**
   * Builds the `InputStore` for the given load node, using `TextInputFormat` together with
   * `TextFormatConverter`.
   *
   * @param node the load node the store is created for
   */
  def mkInputStore(node: AST.Load[String]) = new InputStore(node) {
    def inputTypeName = typeName
    val inputPath = new Path(path)
    // Using the standard input format, not 'SimplerTextInputFormat'.
    val inputFormat = classOf[TextInputFormat]
    // Refers to the converter object declared in this file (TextFormatConverter).
    val converter = TextFormatConverter
  }
}
/* Just pull the 'String' out of the 'Text' Writable wrapper */
/** Input converter from `(LongWritable, Text)` pairs to `String`. */
object TextFormatConverter extends InputConverter[LongWritable, Text, String] {

  /**
   * Returns the string content of `line`; the key `off` is not used.
   *
   * @param off  the key of the pair (ignored)
   * @param line the `Text` value to unwrap
   * @return `line.toString`
   */
  def fromKeyValue(off: LongWritable, line: Text): String = line.toString
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/* Hadoop Mapper class updated to do conversion before mapping */
/**
 * Hadoop mapper that first converts each incoming key-value record with an `InputConverter` and
 * then passes the converted value through a set of tagged mappers, emitting one tagged key-value
 * pair per (mapper output, tag) combination.
 *
 * NOTE(review): `K` and `V` are used both as the converter's key/value types and as type arguments
 * of `TaggedMapper` — confirm that these are really meant to be the same types.
 *
 * @tparam A type produced by the converter and consumed by the tagged mappers
 * @tparam K key type parameter shared by the converter and the tagged mappers
 * @tparam V value type parameter shared by the converter and the tagged mappers
 */
class MscrMapper[A, K, V] extends HMapper[Writable, Writable, TaggedKey, TaggedValue] {

  /** Converter and tagged mappers for each input channel. */
  var inputs: Map[Int, (InputConverter[_, _, _], Set[TaggedMapper[_, _, _]])] = _

  /** Converter and tagged mappers in use by `map`; `null` until they have been looked up. */
  var mappers: (InputConverter[K, V, A], Set[TaggedMapper[A, K, V]]) = _

  /** Reusable key object that is set and handed to the collector for every emitted pair. */
  var tk: TaggedKey = _

  /** Reusable value object that is set and handed to the collector for every emitted pair. */
  var tv: TaggedValue = _

  def configure(conf: JobConf): Unit = {
    // updated ...
  }

  /**
   * Converts the record once, then runs every tagged mapper over the converted value and collects
   * each result under each of the mapper's tags.
   *
   * @param key      key of the incoming record
   * @param value    value of the incoming record
   * @param output   collector receiving the tagged key-value pairs
   * @param reporter Hadoop reporter (not used in the visible code)
   */
  def map(key: Writable,
          value: Writable,
          output: OutputCollector[TaggedKey, TaggedValue],
          reporter: Reporter): Unit = {
    /* Find the mappers for this input channel from the tagged input split. */
    if (mappers == null) {
      // updated ...
    }

    /* Do the conversion once per input record; the result is the same for every mapper. */
    // The Hadoop API only exposes Writable here, so the record is cast to the converter's types.
    val x: A = mappers._1.fromKeyValue(key.asInstanceOf[K], value.asInstanceOf[V])

    /* Do the mappings. */
    mappers._2.foreach { mapper =>
      mapper.map(x).foreach { case (k, v) =>
        mapper.tags.foreach { tag =>
          tk.set(tag, k)
          tv.set(tag, v)
          output.collect(tk, tv)
        }
      }
    }
  }

  def close(): Unit = {
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment