-
-
Save hello009-commits/9b67057c1a71b6184882d31ac47c0f52 to your computer and use it in GitHub Desktop.
Parquet to CSV
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ComparisonChain._ | |
import java.io.BufferedInputStream | |
import java.io.BufferedOutputStream | |
import java.io.BufferedReader | |
import java.io.BufferedWriter | |
import java.io.Closeable | |
import java.io.File | |
import java.io.File | |
import java.io.FileInputStream | |
import java.io.FilenameFilter | |
import java.io.FileOutputStream | |
import java.io.FileReader | |
import java.io.FileWriter | |
import java.io.InputStream | |
import java.io.IOException | |
import java.io.OutputStream | |
import java.io.PrintWriter | |
import java.util.ArrayList | |
import java.util.Arrays | |
import java.util.Collections | |
import java.util.List | |
import java.util.regex.Pattern | |
import org.apache.commons.io.FileUtils | |
import org.apache.commons.io.IOUtils | |
import org.apache.hadoop.conf.Configuration | |
import org.apache.hadoop.fs.Path | |
import org.apache.hadoop.io.file.tfile.Utils.Version | |
import parquet.column.page.PageReadStore | |
import parquet.example.data.Group | |
import parquet.example.data.simple.convert.GroupRecordConverter | |
import parquet.hadoop.example.GroupReadSupport | |
import parquet.hadoop.metadata.ParquetMetadata | |
import parquet.hadoop.ParquetFileReader | |
import parquet.hadoop.ParquetReader | |
import parquet.io.ColumnIOFactory | |
import parquet.io.MessageColumnIO | |
import parquet.io.RecordReader | |
import parquet.Log | |
import parquet.Preconditions | |
import parquet.schema.MessageType | |
import parquet.schema.MessageTypeParser | |
import Version._ | |
/** Small I/O helpers shared by the conversion code. */
object Utils {

  /**
   * Closes `res` without letting an `IOException` escape.
   *
   * A `null` argument is ignored. If `close()` throws an `IOException`, a message
   * is printed to standard output and the exception is dropped; any other
   * exception type still propagates to the caller.
   *
   * @param res the resource to close; may be `null`
   */
  def closeQuietly(res: Closeable): Unit = {
    try {
      if (res != null) {
        res.close()
      }
    } catch {
      case ioe: IOException =>
        println("Exception closing reader " + res + ": " + ioe.getMessage)
    }
  }
}
/** Empty class that exists so `classOf[ConvertUtils]` can be handed to the logger factory. */
class ConvertUtils

/** Helpers for converting a Parquet file into a delimited text file. */
object ConvertUtils {

  private val LOG = Log.getLog(classOf[ConvertUtils])

  /** Separator written between fields of one output row (a tab character). */
  val CSV_DELIMITER: String = "\t"

  /**
   * Reads the whole text file at `path` into a string.
   *
   * Every line, including the last one, is followed by the platform line
   * separator. The file is decoded with the platform default charset because
   * it is opened through `FileReader`.
   *
   * @param path path of the file to read
   * @return the file content with normalised line endings
   */
  private def readFile(path: String): String = {
    val reader = new BufferedReader(new FileReader(path))
    val stringBuilder = new StringBuilder()
    try {
      val ls = System.getProperty("line.separator")
      // An assignment has type Unit in Scala, so the Java idiom
      // `while ((line = reader.readLine()) != null)` never terminates.
      // Pull lines through an iterator that stops at the first null instead.
      Iterator
        .continually(reader.readLine())
        .takeWhile(_ != null)
        .foreach { line =>
          stringBuilder.append(line)
          stringBuilder.append(ls)
        }
    } finally {
      Utils.closeQuietly(reader)
    }
    stringBuilder.toString
  }

  /**
   * Loads the schema text stored next to a CSV file.
   *
   * The schema file lives in the same directory as `csvFile`; its name is the
   * CSV file name with the last four characters (the length of ".csv")
   * replaced by ".schema".
   *
   * @param csvFile the CSV file whose sibling schema file should be read
   * @return the content of the schema file
   */
  def getSchema(csvFile: File): String = {
    val csvName = csvFile.getName
    val fileName = csvName.substring(0, csvName.length - ".csv".length) + ".schema"
    val schemaFile = new File(csvFile.getParentFile, fileName)
    readFile(schemaFile.getAbsolutePath)
  }

  /**
   * Writes every record of `parquetFile` to `csvOutputFile`, one row per record
   * with fields separated by [[CSV_DELIMITER]].
   *
   * The preconditions reject a source that does not end in ".parquet", a
   * target that does not end in ".csv", and a target that already exists.
   *
   * @param parquetFile   the Parquet file to read
   * @param csvOutputFile the file to create
   */
  def convertParquetToCSV(parquetFile: File, csvOutputFile: File): Unit = {
    Preconditions.checkArgument(parquetFile.getName.endsWith(".parquet"), "parquet file should have .parquet extension")
    Preconditions.checkArgument(csvOutputFile.getName.endsWith(".csv"), "csv file should have .csv extension")
    Preconditions.checkArgument(!csvOutputFile.exists(), "Output file " + csvOutputFile.getAbsolutePath + " already exists")
    LOG.info("Converting " + parquetFile.getName + " to " + csvOutputFile.getName)

    val parquetFilePath = new Path(parquetFile.toURI())
    val configuration = new Configuration(true)
    val readSupport = new GroupReadSupport()
    val readFooter = ParquetFileReader.readFooter(configuration, parquetFilePath)
    val schema = readFooter.getFileMetaData.getSchema
    readSupport.init(configuration, null, schema)

    val w = new BufferedWriter(new FileWriter(csvOutputFile))
    try {
      // Opened inside the writer's try block so the writer is still closed if
      // constructing the reader fails.
      val reader = new ParquetReader[Group](parquetFilePath, readSupport)
      try {
        // Stop at the first null record. The previous condition
        // `(g = reader.read()) != null` compared Unit with null, was always
        // true, and passed a null group to writeGroup after the last record.
        Iterator
          .continually(reader.read())
          .takeWhile(_ != null)
          .foreach(group => writeGroup(w, group, schema))
        // Flush here so a failure writing the final buffer reaches the caller
        // instead of being swallowed by closeQuietly below.
        w.flush()
      } finally {
        // Close the reader on every path, not only after a clean run.
        reader.close()
      }
    } finally {
      Utils.closeQuietly(w)
    }
  }

  /**
   * Writes one record as a single line: the first value of each schema field,
   * joined by [[CSV_DELIMITER]] and terminated by a newline.
   *
   * NOTE(review): only the value at index 0 of each field is written, so a
   * field with no value in `g` presumably makes `getValueToString` fail --
   * confirm against the Group implementation before feeding optional fields.
   *
   * @param w      the destination writer
   * @param g      the record to serialise
   * @param schema the schema that supplies the number of fields
   */
  private def writeGroup(w: BufferedWriter, g: Group, schema: MessageType): Unit = {
    for (j <- 0 until schema.getFieldCount) {
      if (j > 0) {
        w.write(CSV_DELIMITER)
      }
      w.write(g.getValueToString(j, 0))
    }
    w.write('\n')
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment