Created
July 24, 2014 09:07
-
-
Save darkjh/795e28fd72e88e5a7d9b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Copies a tab-separated text dataset into a gzip-compressed Hadoop
 * SequenceFile of `(VIntWritable, D)` pairs.
 *
 * Each input line is split on tab characters and handed to the public
 * `Array[String]` constructor of `D`; the record's `key` becomes the
 * SequenceFile key.
 *
 * @param inputPath   location of the input; combined with the file pattern
 *                    derived from the canonical name of `D` via
 *                    `parseFilePath` / `resolveFilePattern`
 * @param outputPath  location of the output; combined with the canonical
 *                    name of `D` via `parseFilePath`
 * @param splitsNum   minimum number of input partitions requested from
 *                    `textFile`, and the number of partitions written out
 * @param replication value set for `dfs.replication` on the output job
 * @param sc          Spark context used to read the input
 * @param ct          class tag of `D`, used to look up its constructor and
 *                    its `CanonicalName` annotation reflectively
 * @tparam D          record type; must expose a public constructor taking a
 *                    single `Array[String]` and carry a `CanonicalName`
 *                    annotation
 * @throws IllegalArgumentException if `D` is not annotated with `CanonicalName`
 */
def copy[D <: RawData](inputPath: String, outputPath: String,
                       splitsNum: Int = 128, replication: Int = 2)
                      (implicit sc: SparkContext, ct: ClassTag[D]): Unit = {
  val cls = ct.runtimeClass

  // `getAnnotation` returns null when the annotation is absent; fail with a
  // descriptive error instead of a bare NullPointerException.
  val canonicalName = Option(cls.getAnnotation(classOf[CanonicalName]))
    .map(_.name())
    .getOrElse(throw new IllegalArgumentException(
      s"${cls.getName} must be annotated with @CanonicalName"))

  val input = parseFilePath(inputPath, resolveFilePattern(canonicalName))
  val output = parseFilePath(outputPath, canonicalName)

  // `splitsNum` is only a lower bound for the number of input partitions and
  // only has an effect on splittable files (e.g. text files on s3n);
  // non-splittable input still ends up in a single partition.
  val in = sc.textFile(input, splitsNum)

  // `Constructor` is not serializable, so it cannot be resolved on the
  // driver; resolve it once per partition rather than once per line.
  val parsed = in.mapPartitions { lines =>
    val ctor = cls.getConstructor(classOf[Array[String]])
    lines.map { line =>
      // NOTE(review): `split("\t")` drops trailing empty fields — confirm the
      // constructors of `D` do not rely on a fixed column count.
      val obj = ctor.newInstance(line.split("\t")).asInstanceOf[D]
      (new VIntWritable(obj.key), obj)
    }
  }

  // Shuffle the records into `splitsNum` evenly sized partitions. Note that
  // `repartition` does not partition by key; it only rebalances the data, so
  // that non-splittable input is spread over several output files.
  val repartitioned = parsed.repartition(splitsNum)

  // Start from the context's Hadoop configuration (as Spark's own default for
  // `saveAsHadoopFile` does) so that settings placed on
  // `sc.hadoopConfiguration` are not lost, then override the replication.
  val jobConf = new JobConf(sc.hadoopConfiguration)
  jobConf.setInt("dfs.replication", replication)

  // Save in gzip-compressed sequence file format.
  repartitioned.saveAsHadoopFile(output, classOf[VIntWritable], cls,
    classOf[SequenceFileOutputFormat[VIntWritable, D]],
    conf = jobConf, codec = Some(classOf[GzipCodec]))
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment