Parquet to CSV
import java.io.BufferedInputStream
import java.io.BufferedOutputStream
import java.io.BufferedReader
import java.io.BufferedWriter
import java.io.Closeable
import java.io.File
import java.io.FileInputStream
import java.io.FilenameFilter
import java.io.FileOutputStream
import java.io.FileReader
import java.io.FileWriter
import java.io.InputStream
import java.io.IOException
import java.io.OutputStream
import java.io.PrintWriter
import java.util.ArrayList
import java.util.Arrays
import java.util.Collections
import java.util.List
import java.util.regex.Pattern
import org.apache.commons.io.FileUtils
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import parquet.column.page.PageReadStore
import parquet.example.data.Group
import parquet.example.data.simple.convert.GroupRecordConverter
import parquet.hadoop.example.GroupReadSupport
import parquet.hadoop.metadata.ParquetMetadata
import parquet.hadoop.ParquetFileReader
import parquet.hadoop.ParquetReader
import parquet.io.ColumnIOFactory
import parquet.io.MessageColumnIO
import parquet.io.RecordReader
import parquet.Log
import parquet.Preconditions
import parquet.schema.MessageType
import parquet.schema.MessageTypeParser
object Utils {

  /** Closes the given resource, logging (rather than propagating) any IOException. */
  def closeQuietly(res: Closeable): Unit = {
    try {
      if (res != null) {
        res.close()
      }
    } catch {
      case ioe: IOException =>
        println("Exception closing reader " + res + ": " + ioe.getMessage)
    }
  }
}
/** Exists only so that `classOf[ConvertUtils]` can be passed to the logger below. */
class ConvertUtils

object ConvertUtils {

  private val LOG = Log.getLog(classOf[ConvertUtils])

  val CSV_DELIMITER = "\t"
  /** Reads an entire text file into a String, preserving line separators. */
  private def readFile(path: String): String = {
    val reader = new BufferedReader(new FileReader(path))
    val stringBuilder = new StringBuilder()
    try {
      val ls = System.getProperty("line.separator")
      var line = reader.readLine()
      while (line != null) {
        stringBuilder.append(line)
        stringBuilder.append(ls)
        line = reader.readLine()
      }
    } finally {
      Utils.closeQuietly(reader)
    }
    stringBuilder.toString
  }
  /** Reads the schema file that sits next to the given CSV file (same name, ".schema" extension). */
  def getSchema(csvFile: File): String = {
    val fileName = csvFile.getName.substring(0, csvFile.getName.length - ".csv".length) + ".schema"
    val schemaFile = new File(csvFile.getParentFile, fileName)
    readFile(schemaFile.getAbsolutePath)
  }
  /** Converts a local Parquet file into a delimiter-separated text file. */
  def convertParquetToCSV(parquetFile: File, csvOutputFile: File): Unit = {
    Preconditions.checkArgument(parquetFile.getName.endsWith(".parquet"), "parquet file should have .parquet extension")
    Preconditions.checkArgument(csvOutputFile.getName.endsWith(".csv"), "csv file should have .csv extension")
    Preconditions.checkArgument(!csvOutputFile.exists(), "Output file " + csvOutputFile.getAbsolutePath + " already exists")
    LOG.info("Converting " + parquetFile.getName + " to " + csvOutputFile.getName)

    val parquetFilePath = new Path(parquetFile.toURI())
    val configuration = new Configuration(true)
    val readSupport = new GroupReadSupport()

    // Read the schema from the Parquet footer and hand it to the read support.
    val readFooter = ParquetFileReader.readFooter(configuration, parquetFilePath)
    val schema = readFooter.getFileMetaData.getSchema
    readSupport.init(configuration, null, schema)

    val w = new BufferedWriter(new FileWriter(csvOutputFile))
    val reader = new ParquetReader[Group](parquetFilePath, readSupport)
    try {
      var g = reader.read()
      while (g != null) {
        writeGroup(w, g, schema)
        g = reader.read()
      }
      reader.close()
    } finally {
      Utils.closeQuietly(w)
    }
  }
  /** Writes one record as a single delimiter-separated line; only the first value of each field is emitted. */
  private def writeGroup(w: BufferedWriter, g: Group, schema: MessageType): Unit = {
    for (j <- 0 until schema.getFieldCount) {
      if (j > 0) {
        w.write(CSV_DELIMITER)
      }
      val valueToString = g.getValueToString(j, 0)
      w.write(valueToString)
    }
    w.write('\n')
  }
}
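
For reference, a minimal sketch of how the converter might be driven, assuming the old parquet-mr artifacts imported above (parquet-hadoop, parquet-column) and hadoop-common are on the classpath; the file names below are placeholders, not part of the original gist:

// Minimal usage sketch (hypothetical file names). convertParquetToCSV refuses to
// run if the output file already exists, so point it at a fresh CSV path.
object ConvertUtilsExample {
  def main(args: Array[String]): Unit = {
    val parquetInput = new java.io.File("example.parquet") // placeholder input file
    val csvOutput = new java.io.File("example.csv")        // placeholder output file
    ConvertUtils.convertParquetToCSV(parquetInput, csvOutput)
  }
}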