@dmpetrov
Created March 6, 2017 05:07
Save Spark dataframe to a single CSV file
// Code for blogpost:
// https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/
import java.io.File

import org.apache.spark.sql.DataFrame

def saveDfToCsv(df: DataFrame, tsvOutput: String,
                sep: String = ",", header: Boolean = false): Unit = {
  val tmpParquetDir = "Posts.tmp.parquet"

  // Collapse to a single partition so Spark writes exactly one part file
  df.repartition(1).write.
    format("com.databricks.spark.csv").
    option("header", header.toString).
    option("delimiter", sep).
    save(tmpParquetDir)

  // Move the single part file to the requested output path,
  // then remove the temporary output directory
  val dir = new File(tmpParquetDir)
  val tmpTsvFile = tmpParquetDir + File.separatorChar + "part-00000"
  (new File(tmpTsvFile)).renameTo(new File(tsvOutput))
  dir.listFiles.foreach(f => f.delete)
  dir.delete
}
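For context, a minimal usage sketch (the SparkSession named spark, the input path, and the output file name below are placeholders, not part of the original gist):

// Hypothetical usage: export a DataFrame as a single CSV file with a header row.
// Assumes an active SparkSession `spark` and the spark-csv package on the classpath.
val posts = spark.read.parquet("Posts.parquet")   // placeholder input path
saveDfToCsv(posts, "Posts.csv", sep = ",", header = true)
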
@shjoin commented May 4, 2017

Thanks for this code, but how can we do the part below in Scala? I don't know Java.
val dir = new File(tmpParquetDir)
val tmpTsvFile = tmpParquetDir + File.separatorChar + "part-00000"
(new File(tmpTsvFile)).renameTo(new File(tsvOutput))

dir.listFiles.foreach( f => f.delete )
dir.delete
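For what it's worth, the quoted lines are already valid Scala: they simply call the java.io.File class from the Java standard library. A rough alternative using Hadoop's FileSystem API, which also works when the output lands on HDFS, might look like the sketch below (assuming an active SparkSession `spark` and the `tmpParquetDir`/`tsvOutput` values from the gist):

import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: rename the single part file and remove the temporary directory
// through Hadoop's FileSystem API instead of java.io.File.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.rename(new Path(tmpParquetDir, "part-00000"), new Path(tsvOutput))
fs.delete(new Path(tmpParquetDir), true)   // recursive delete of the temp directory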

@prakashrd
Hi dmpetrov,
Thanks for the post, but I had to change one of your lines to make it work: the line where you extract the new file, tmpTsvFile.
Here is my full code:

def saveDfToCsv(df: DataFrame, tsvOutput: String,
                sep: String = ",", header: Boolean = false): Unit = {
  val tmpParquetDir = "Posts.tmp.parquet"

  df.repartition(1).write.
    format("com.databricks.spark.csv").
    option("header", header.toString).
    option("delimiter", sep).
    save(tmpParquetDir)

  // Newer Spark versions write the part file as "part-00000-<uuid>.csv",
  // so locate it with a regex instead of hard-coding the name
  val dir = new File(tmpParquetDir)
  val newFileRgex = tmpParquetDir + File.separatorChar + "part-00000.*\\.csv"
  val tmpTsvFile = dir.listFiles.filter(_.toPath.toString.matches(newFileRgex))(0).toString
  (new File(tmpTsvFile)).renameTo(new File(tsvOutput))

  dir.listFiles.foreach(f => f.delete)
  dir.delete
}
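
An alternative sketch for the same step, offered as an assumption rather than part of the original comment: let Hadoop glob for the generated part file instead of matching names with a regex (this again assumes an active SparkSession `spark`):

import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: find the UUID-suffixed part file via a glob, then move it into place.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val partFile = fs.globStatus(new Path(tmpParquetDir, "part-00000*.csv"))(0).getPath
fs.rename(partFile, new Path(tsvOutput))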

@iretex commented Jan 21, 2020

I kindly request a Python equivalent; I have tried several times to save a PySpark dataframe to CSV without success.
I run Spark on my local machine.
Thanks very much!

Py4JJavaError Traceback (most recent call last)
in <module>()
1 out_path = "C:/Users/NG005454/OneDrive - CCHellenic/Documents/Python_Exercise/new_territory"
2 customer_ps1_terr = customer_data.select(customer_data.demand_area_name,customer_data.sub_demand_area_name,customer_data.ps1_territory_id).filter(customer_data.sub_demand_area_name.isin(sub_demand_area)).distinct()
----> 3 customer_ps1_terr.write.save(out_path, format="csv", header=True)

~\AppData\Local\Continuum\anaconda3\lib\site-packages\pyspark\sql\readwriter.py in save(self, path, format, mode, partitionBy, **options)
736 self._jwrite.save()
737 else:
--> 738 self._jwrite.save(path)
739
740 @since(1.4)

~\AppData\Local\Continuum\anaconda3\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:

~\AppData\Local\Continuum\anaconda3\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()

~\AppData\Local\Continuum\anaconda3\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(

Py4JJavaError: An error occurred while calling o716.save.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 80.0 failed 1 times, most recent failure: Lost task 0.0 in stage 80.0 (TID 1193, localhost, executor driver): java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\NG005454\OneDrive - CCHellenic\Documents\Python_Exercise\new_territory_temporary\0_temporary\attempt_20200116102706_0080_m_000000_1193\part-00000-a8e4807b-ad43-4e16-9a00-5c7218423c6e-c000.csv
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
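
A likely cause, offered here as a hedged note rather than a confirmed diagnosis: the "(null) entry in command string: null chmod" failure on Windows usually means Hadoop's winutils.exe cannot be found, so Spark cannot run the chmod step when writing files locally. Pointing HADOOP_HOME (or hadoop.home.dir) at a folder containing bin\winutils.exe before the Spark session starts typically resolves it; the path below is a placeholder:

// Assumption: winutils.exe has been downloaded to C:\hadoop\bin (placeholder path).
// The same idea applies to PySpark: set the HADOOP_HOME environment variable
// before creating the SparkSession.
System.setProperty("hadoop.home.dir", "C:\\hadoop")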
