Spark convert CSV to Parquet.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

def convert(sqlContext: SQLContext, filename: String, schema: StructType, tablename: String): Unit = {
  // Import the text-based table into a data frame first.
  // Make sure to use com.databricks:spark-csv version 1.3+,
  // which treats empty strings consistently as nulls.
  val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("delimiter", "|")
    .option("nullValue", "")
    .option("treatEmptyValuesAsNulls", "true")
    .load(filename)
  // Now simply write the data frame out as a Parquet file.
  df.write.parquet("/user/spark/data/parquet1000g/" + tablename)
}
// Usage example -- a TPC-DS table called catalog_page.
val schema = StructType(Array(
  StructField("cp_catalog_page_sk", IntegerType, false),
  StructField("cp_catalog_page_id", StringType, false),
  StructField("cp_start_date_sk", IntegerType, true),
  StructField("cp_end_date_sk", IntegerType, true),
  StructField("cp_department", StringType, true),
  StructField("cp_catalog_number", LongType, true),
  StructField("cp_catalog_page_number", LongType, true),
  StructField("cp_description", StringType, true),
  StructField("cp_type", StringType, true)))

// Let's convert.
convert(sqlContext, hadoopdsPath + "/catalog_page/*", schema, "catalog_page")
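
// For reference, on Spark 2.x and later the built-in CSV data source can replace
// com.databricks:spark-csv. A minimal sketch, assuming a SparkSession named `spark`
// and the same schema and paths as above (`convert2` is just an illustrative name):

import org.apache.spark.sql.SparkSession

def convert2(spark: SparkSession, filename: String, schema: StructType, tablename: String): Unit = {
  // The built-in reader handles the delimiter directly; empty fields are
  // mapped to null via the nullValue option.
  val df = spark.read
    .schema(schema)
    .option("sep", "|")
    .option("nullValue", "")
    .csv(filename)
  df.write.parquet("/user/spark/data/parquet1000g/" + tablename)
}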
@amaneight commented Sep 13, 2017:
An alternative is to first create a data frame from the CSV file, write that data frame out as a Parquet file, and then create a new data frame from the Parquet file:

scala> val df = spark.read.option("header", "true").csv("csv_file.csv")
scala> df.write.parquet("csv_to_parquet")
scala> val df_1 = spark.read.parquet("csv_to_parquet")
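
A quick sanity check of the round trip, assuming the data frames above, is to compare schemas and row counts:

scala> df.printSchema()
scala> df_1.printSchema()
scala> df.count() == df_1.count()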