@duyet
Created September 21, 2016 03:35
Spark convert CSV to Parquet.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

def convert(sqlContext: SQLContext, filename: String, schema: StructType, tablename: String) {
  // Import the text-based table first into a data frame.
  // Make sure to use com.databricks:spark-csv version 1.3+,
  // which has consistent treatment of empty strings as nulls.
  val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("delimiter", "|")
    .option("nullValue", "")
    .option("treatEmptyValuesAsNulls", "true")
    .load(filename)
  // Now simply write to a Parquet file.
  df.write.parquet("/user/spark/data/parquet1000g/" + tablename)
}
// Usage example -- a TPC-DS table called catalog_page.
val schema = StructType(Array(
  StructField("cp_catalog_page_sk", IntegerType, false),
  StructField("cp_catalog_page_id", StringType, false),
  StructField("cp_start_date_sk", IntegerType, true),
  StructField("cp_end_date_sk", IntegerType, true),
  StructField("cp_department", StringType, true),
  StructField("cp_catalog_number", LongType, true),
  StructField("cp_catalog_page_number", LongType, true),
  StructField("cp_description", StringType, true),
  StructField("cp_type", StringType, true)))

// Now convert the CSV files to Parquet.
convert(sqlContext, hadoopdsPath + "/catalog_page/*", schema, "catalog_page")
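
To sanity-check the result, the Parquet output can be read back and queried. A minimal sketch, assuming the same sqlContext and output path as above; parquetDf is just an illustrative name, and registerTempTable is the Spark 1.x API:

// Read the Parquet output back and expose it to Spark SQL (sketch, Spark 1.x).
val parquetDf = sqlContext.read.parquet("/user/spark/data/parquet1000g/catalog_page")
parquetDf.registerTempTable("catalog_page")
sqlContext.sql("SELECT COUNT(*) FROM catalog_page").show()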
@amaneight commented Sep 13, 2017

An alternative way to do this is to first create a data frame from the CSV file, store that data frame in a Parquet file, and then create a new data frame from the Parquet file.

scala> val df = spark.read.option("header", "true").csv("csv_file.csv")
scala> df.write.parquet("csv_to_parquet")
scala> val df_1 = spark.read.option("header", "true").parquet("csv_to_parquet")
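
Note that reading the CSV with only the header option leaves every column typed as string. A small sketch, assuming Spark 2.x and the same hypothetical csv_file.csv, that additionally infers column types before writing Parquet (typedDf and the output path are illustrative names):

// Infer column types from the CSV instead of defaulting everything to string (sketch).
val typedDf = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("csv_file.csv")
typedDf.printSchema()   // verify the inferred types
typedDf.write.mode("overwrite").parquet("csv_to_parquet_typed")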
