@CalebFenton · Last active September 10, 2016
Convert CompressionCodec of Hadoop Sequence Files to Snappy
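These Scala helpers are meant to run in a spark-shell session: getSequenceFiles and getHadoopFiles walk an HDFS directory tree to collect file paths, convertToSnappy rewrites any sequence file still compressed with the default (DEFLATE) codec as a block-compressed Snappy copy alongside the original, and migrateSnappy swaps a finished copy into the original's place.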
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.io.{SequenceFile, Writable}
import org.apache.hadoop.io.SequenceFile.{Metadata, Reader, Writer}
import org.apache.hadoop.io.compress.{DefaultCodec, SnappyCodec}
import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark.SparkContext
// Replace the original file with its Snappy-compressed copy: delete the
// original, then rename the ".snappy" copy into its place.
def migrateSnappy(fs: FileSystem, path: String): Unit = {
  val snappyPath = new Path(path)
  val oldPath = new Path(path.replace(".snappy", ""))
  fs.delete(oldPath, false)
  fs.rename(snappyPath, oldPath)
}
// Open a SequenceFile reader positioned at the start of the file.
def getReader(conf: Configuration, filePath: String): Reader = {
  val inPath = new Path(filePath)
  new SequenceFile.Reader(conf, Reader.file(inPath), Reader.bufferSize(4096), Reader.start(0))
}
// Rewrite each DefaultCodec-compressed sequence file as a block-compressed
// Snappy copy alongside the original (".pkg.sf" -> ".snappy.pkg.sf").
def convertToSnappy(sc: SparkContext, filePaths: Array[String]): Unit = {
  val conf = sc.hadoopConfiguration
  val fs = FileSystem.get(conf)
  for (inFilePath <- filePaths) {
    println("INPUT PATH: " + inFilePath)
    val reader = getReader(conf, inFilePath)
    // Instantiate reusable key/value holders of the file's declared types.
    val key = ReflectionUtils.newInstance(reader.getKeyClass, conf).asInstanceOf[Writable]
    val value = ReflectionUtils.newInstance(reader.getValueClass, conf).asInstanceOf[Writable]
    val outFilePath = inFilePath.replace(".pkg.sf", ".snappy.pkg.sf")
    val outPath = new Path(outFilePath)
    // Only convert files still using the default (DEFLATE) codec.
    if (reader.getCompressionCodec.isInstanceOf[DefaultCodec]) {
      val writer = SequenceFile.createWriter(
        conf,
        Writer.file(outPath),
        Writer.keyClass(key.getClass),
        Writer.valueClass(value.getClass),
        Writer.bufferSize(fs.getConf.getInt("io.file.buffer.size", 4096)),
        Writer.replication(fs.getDefaultReplication(outPath)),
        Writer.blockSize(1073741824), // 1 GiB block size
        Writer.compression(SequenceFile.CompressionType.BLOCK, new SnappyCodec()),
        Writer.progressable(null),
        Writer.metadata(new Metadata())
      )
      // Copy every record from the old file into the Snappy-compressed one.
      while (reader.next(key, value)) {
        println("Writing key: " + key + ", outpath=" + outFilePath)
        writer.append(key, value)
      }
      writer.close()
    }
    reader.close()
  }
}
// Collect every file path under the root directory (placeholder path below).
def getSequenceFiles(sc: SparkContext): Array[String] = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val path = new Path("hdfs:///path/to/your/files/")
  val status = fs.getFileStatus(path)
  getHadoopFiles(fs, status)
}
// Recursively expand directories into the flat list of files they contain.
def getHadoopFiles(fs: FileSystem, status: FileStatus): Array[String] = {
  if (status.isDirectory)
    fs.listStatus(status.getPath).flatMap(getHadoopFiles(fs, _))
  else
    Array(status.getPath.toString)
}
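A minimal end-to-end sketch of how the helpers fit together, assuming a spark-shell session where sc is the active SparkContext; the suffix filter and the final swap pass are illustrative additions, not part of the original gist.

// Hypothetical driver: find the ".pkg.sf" files that don't already have a
// Snappy copy, convert them, then swap the copies into place.
val seqFiles = getSequenceFiles(sc)
  .filter(p => p.endsWith(".pkg.sf") && !p.contains(".snappy"))

// Write a ".snappy.pkg.sf" copy next to each DEFLATE-compressed original.
convertToSnappy(sc, seqFiles)

// After verifying the copies, replace each original with its Snappy version.
val fs = FileSystem.get(sc.hadoopConfiguration)
seqFiles
  .map(_.replace(".pkg.sf", ".snappy.pkg.sf"))
  .foreach(migrateSnappy(fs, _))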