Convert CompressionCodec of Hadoop Sequence Files to Snappy
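The snippet below walks an HDFS directory tree, rewrites every sequence file that was written with Hadoop's DefaultCodec as a block-compressed Snappy sequence file, and then renames the Snappy copy over the original. It is meant to be pasted into a Spark shell.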
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.io.{SequenceFile, Writable}
import org.apache.hadoop.io.SequenceFile.{Metadata, Reader, Writer}
import org.apache.hadoop.io.compress.{DefaultCodec, SnappyCodec}
import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark.SparkContext
// Swap a converted file in over the original, e.g. "foo.snappy.pkg.sf"
// replaces "foo.pkg.sf".
def migrateSnappy(fs: FileSystem, path: String): Unit = {
  val snappyPath = new Path(path)
  val oldPath = new Path(path.replace(".snappy", ""))
  fs.delete(oldPath, false)
  fs.rename(snappyPath, oldPath)
}
// Open a SequenceFile reader positioned at the start of the file.
def getReader(conf: Configuration, filePath: String): Reader = {
  val inPath = new Path(filePath)
  new SequenceFile.Reader(conf, Reader.file(inPath), Reader.bufferSize(4096), Reader.start(0))
}
// For each input file compressed with DefaultCodec, write a block-compressed
// Snappy copy alongside it ("x.pkg.sf" -> "x.snappy.pkg.sf").
def convertToSnappy(sc: SparkContext, filePaths: Array[String]): Unit = {
  val conf = sc.hadoopConfiguration
  val fs = FileSystem.get(conf)
  for (inFilePath <- filePaths) {
    println("INPUT PATH: " + inFilePath)
    val reader = getReader(conf, inFilePath)
    val key = ReflectionUtils.newInstance(reader.getKeyClass, conf).asInstanceOf[Writable]
    val value = ReflectionUtils.newInstance(reader.getValueClass, conf).asInstanceOf[Writable]
    val outFilePath = inFilePath.replace(".pkg.sf", ".snappy.pkg.sf")
    val outPath = new Path(outFilePath)
    if (reader.getCompressionCodec.isInstanceOf[DefaultCodec]) {
      val writer = SequenceFile.createWriter(
        conf,
        Writer.file(outPath),
        Writer.keyClass(key.getClass),
        Writer.valueClass(value.getClass),
        Writer.bufferSize(fs.getConf.getInt("io.file.buffer.size", 4096)),
        Writer.replication(fs.getDefaultReplication(outPath)),
        Writer.blockSize(1073741824), // 1 GiB block size
        Writer.compression(SequenceFile.CompressionType.BLOCK, new SnappyCodec()),
        Writer.progressable(null),
        Writer.metadata(new Metadata())
      )
      while (reader.next(key, value)) {
        println("Writing key: " + key + ", outpath=" + outFilePath)
        writer.append(key, value)
      }
      writer.close()
    }
    reader.close() // close the reader even when the file is skipped
  }
}
// Collect the paths of all files under the root directory (edit the path below).
def getSequenceFiles(sc: SparkContext): Array[String] = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val path = new Path("hdfs:///path/to/your/files/")
  val status = fs.getFileStatus(path)
  getHadoopFiles(fs, status)
}
// Recursively expand a directory into the paths of the files it contains.
def getHadoopFiles(fs: FileSystem, status: FileStatus): Array[String] = {
  if (status.isDirectory)
    fs
      .listStatus(status.getPath)
      .flatMap(getHadoopFiles(fs, _))
  else
    Array(status.getPath.toString)
}
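Putting the pieces together, here is a minimal driver sketch, assuming a SparkContext named `sc` (as in spark-shell) and that the root path inside `getSequenceFiles` has been edited to point at real data; the two-pass structure and the `.snappy` filter are my reading of how these functions were meant to be combined, not part of the original gist.

// Pass 1: write a ".snappy.pkg.sf" copy of every DefaultCodec-compressed file.
val inputFiles = getSequenceFiles(sc)
convertToSnappy(sc, inputFiles)

// Pass 2: list again so the new ".snappy" copies are visible, then swap each
// one in over its original file.
val fs = FileSystem.get(sc.hadoopConfiguration)
getSequenceFiles(sc)
  .filter(_.contains(".snappy"))
  .foreach(migrateSnappy(fs, _))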