import scala.reflect.ClassTag

import org.apache.hadoop.io.VIntWritable
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.{JobConf, SequenceFileOutputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // PairRDDFunctions for saveAsHadoopFile

// `RawData`, `CanonicalName`, `parseFilePath` and `resolveFilePattern`
// come from the surrounding project and are not shown here.
def copy[D <: RawData](inputPath: String, outputPath: String,
                       splitsNum: Int = 128, replication: Int = 2)
                      (implicit sc: SparkContext, ct: ClassTag[D]): Unit = {
  // use reflection to:
  // - get the Array[String] constructor
  // - get the canonical name tagged in the @CanonicalName annotation
  val cls = ct.runtimeClass
  val ctor = (args: Array[String]) =>
    cls.getConstructor(classOf[Array[String]]).newInstance(args)
  val canonicalName = cls.getAnnotation(classOf[CanonicalName]).name()

  val input = parseFilePath(inputPath, resolveFilePattern(canonicalName))
  val output = parseFilePath(outputPath, canonicalName)

  // creates at least `splitsNum` input partitions, but only for splittable
  // files (e.g. plain text on s3n); a non-splittable input (e.g. a gzip'd
  // file) still yields a single partition
  val in = sc.textFile(input, splitsNum)

  // parse each tab-separated line into a D and key it by its int id
  val parsed = in.map { line =>
    val obj = ctor(line.split("\t")).asInstanceOf[D]
    (new VIntWritable(obj.key), obj)
  }

  // redistribute into `splitsNum` partitions; note that `repartition`
  // shuffles records randomly -- use `partitionBy` with a `HashPartitioner`
  // if the output must actually be grouped by the int key
  val repartitioned = parsed.repartition(splitsNum)

  // save as gzip-compressed SequenceFiles with the requested replication
  val jobConf = new JobConf()
  jobConf.setInt("dfs.replication", replication)
  repartitioned.saveAsHadoopFile(output, classOf[VIntWritable], cls,
    classOf[SequenceFileOutputFormat[VIntWritable, D]],
    conf = jobConf, codec = Some(classOf[GzipCodec]))
}
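
// A minimal usage sketch, not part of the original gist. `PageView` is a
// hypothetical `RawData` subclass: the annotation value, field layout and
// `key` member are assumptions for illustration (the code above implies
// `RawData` exposes an Int `key` and an Array[String] constructor).
//
// import org.apache.spark.SparkConf
//
// @CanonicalName(name = "page_view")
// class PageView(fields: Array[String]) extends RawData {
//   // assumed Int key, read above via `obj.key`
//   override val key: Int = fields(0).toInt
// }
//
// implicit val sc: SparkContext =
//   new SparkContext(new SparkConf().setAppName("copy-page-views"))
//
// // re-shard tab-separated text input into 128 gzip'd SequenceFile parts
// copy[PageView]("s3n://bucket/raw/", "s3n://bucket/copied/")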