Created
November 12, 2011 21:58
-
-
Save blever/1361178 to your computer and use it in GitHub Desktop.
Augmenting DataSource/DataSink with Converter idea
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Something along the lines of Crunch's "Converter" but split out into separate input/output traits.
 *
 * Converts a Hadoop (key, value) record into a single domain value of type S.
 *   K - input key type (typically a Writable)
 *   V - input value type (typically a Writable)
 *   S - the domain type produced for downstream processing
 */
trait InputConverter[K, V, S] {
  // Fixed: original used Java-style parameter syntax `(K key, V value)`,
  // which is not valid Scala; parameters are declared `name: Type`.
  def fromKeyValue(key: K, value: V): S
}
/* Converts a single domain value of type S back into a Hadoop (key, value) pair.
 *   K - output key type
 *   V - output value type
 *   S - the domain type being written out
 */
trait OutputConverter[K, V, S] {
  // Fixed: original was missing the `def` keyword, which is a syntax error in Scala.
  def toKeyValue(s: S): (K, V)
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
/* A readable input channel, augmented with an InputConverter so the framework
 * can turn raw (K, V) records into domain values of type A.
 */
trait DataSource[K, V, A] {
  /** Human-readable name of the input's element type. */
  def inputTypeName: String

  /** Filesystem location to read from. */
  def inputPath: Path

  /** Hadoop input format class used to read the raw records. */
  def inputFormat: Class[_ <: FileInputFormat[_, _]]

  /** Converter applied to each (K, V) record to produce an A. */
  def converter: InputConverter[K, V, A]
}
/* A writable output channel, augmented with an OutputConverter so the framework
 * can turn domain values of type B back into (K, V) records for Hadoop.
 */
trait DataSink[K, V, B] {
  /** Human-readable name of the output's element type. */
  def outputTypeName: String

  /** Filesystem location to write to. */
  def outputPath: Path

  /** Hadoop output format class used to write the records. */
  def outputFormat: Class[_ <: FileOutputFormat[K, V]]

  /** Converter applied to each B to produce the (K, V) record to write. */
  def converter: OutputConverter[K, V, B]
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
// TextInput.scala
/* Loader for plain-text input: each line of the file becomes one String element. */
class TextLoader(path: String) extends Loader[String] {
  def mkInputStore(node: AST.Load[String]) = new InputStore(node) {
    def inputTypeName = typeName
    val inputPath = new Path(path)
    val inputFormat = classOf[TextInputFormat] // using standard input format not 'SimplerTextInputFormat'
    // Fixed: original referenced `TextInputConverter`, but the converter object
    // defined in this file is named `TextFormatConverter`.
    val converter = TextFormatConverter
  }
}
/* Just pull the 'String' out of the 'Text' Writable wrapper; the byte offset key is ignored. */
object TextFormatConverter extends InputConverter[LongWritable, Text, String] {
  // Fixed: original used Java-style parameter syntax `(LongWritable off, Text line)`,
  // which is not valid Scala; parameters are declared `name: Type`.
  def fromKeyValue(off: LongWritable, line: Text): String = line.toString
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
/* Hadoop Mapper class updated to do conversion before mapping.
 *
 * Each input channel carries its own InputConverter; the raw (key, value)
 * record is converted to a domain value A once per record, then fed to every
 * tagged mapper attached to that channel.
 */
class MscrMapper[A, K, V] extends HMapper[Writable, Writable, TaggedKey, TaggedValue] {
  // Converter for each input channel, keyed by channel id.
  var inputs: Map[Int, (InputConverter[_, _, _], Set[TaggedMapper[_,_,_]])] = _ // include converter for each input channel
  // Fixed: original misspelled the trait as `InputCoverter`.
  // The converter and the mappers for the channel this task is reading.
  var mappers: (InputConverter[K, V, A], Set[TaggedMapper[A, K, V]]) = _
  var tk: TaggedKey = _
  var tv: TaggedValue = _
  def configure(conf: JobConf) = {
    // updated ...
  }
  def map(key: Writable,
          value: Writable,
          output: OutputCollector[TaggedKey, TaggedValue],
          reporter: Reporter) = {
    /* Find the mappers for this input channel from the tagged input split. */
    if (mappers == null) {
      // updated ...
    }
    /* Do the conversion once per record, then run every mapper on the result. */
    // Fixed: original iterated `mappers.foreach` on the tuple itself and called
    // `mappers._1.converter.fromKeyValue`, but `mappers._1` already IS the
    // converter and the mapper set lives in `mappers._2`.
    val x: A = mappers._1.fromKeyValue(key.asInstanceOf[K], value.asInstanceOf[V])
    mappers._2.foreach { mapper =>
      mapper.map(x).foreach { case (k, v) =>
        mapper.tags.foreach { tag =>
          tk.set(tag, k)
          tv.set(tag, v)
          output.collect(tk, tv)
        }
      }
    }
  }
  def close() = {
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment