Skip to content

Instantly share code, notes, and snippets.

@joemcmahon
Last active September 9, 2021 02:39
Show Gist options
  • Save joemcmahon/fb41b55fd6262f79c49a3354c532924e to your computer and use it in GitHub Desktop.
A syntax-corrected version of the code in https://stackoverflow.com/a/46594963/39791
/**
*
* Write a dataframe to CSV at the location specified. Assumes that the contents
* of the dataframe being written are small enough to fit in memory on the Spark
* master. (I use this for deequ suggested checks, which is usually less than 30
* or so entries.)
*
*/
import org.apache.spark.sql._
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import java.io.{BufferedWriter, OutputStreamWriter}
// Base directory on the (Hadoop or local) filesystem where saveCSV writes its output file.
val SPARK_WRITE_LOCATION = "/tmp"
/**
 * Write a DataFrame to a single CSV file at SPARK_WRITE_LOCATION/filename.
 *
 * Assumes the DataFrame is small enough to collect onto the driver (intended
 * for things like deequ suggested checks, typically ~30 rows).
 *
 * @param spark    active SparkSession (used to pick the right FileSystem)
 * @param results  DataFrame whose rows are written, one comma-joined line each
 * @param filename name of the output file inside SPARK_WRITE_LOCATION
 *
 * Fixes vs. the original: deletes only the target file instead of recursively
 * deleting the whole SPARK_WRITE_LOCATION directory; collects once instead of
 * running count + collect (two Spark jobs); closes the writer in a finally;
 * drops the trailing delete+copyToLocalFile, which copied the just-deleted
 * file onto itself (source and destination were the same path).
 */
def saveCSV(spark: SparkSession, results: DataFrame, filename: String): Unit = {
  // Local master -> local filesystem; otherwise whatever the cluster is
  // configured with (e.g. HDFS).
  val fs =
    if (spark.conf.get("spark.master").contains("local"))
      FileSystem.getLocal(new Configuration())
    else
      FileSystem.get(spark.sparkContext.hadoopConfiguration)

  // Single Spark job: collect once and test emptiness on the driver.
  val rows = results.collect()
  if (rows.nonEmpty) {
    val outputPath = new Path(SPARK_WRITE_LOCATION, filename)

    // Remove only the stale output file, never the enclosing directory.
    if (fs.exists(outputPath)) {
      val deleted = fs.delete(outputPath, false)
      assert(deleted, s"could not delete existing output file $outputPath")
    }

    val writeStream = fs.create(outputPath, true)
    val bw = new BufferedWriter(new OutputStreamWriter(writeStream, "UTF-8"))
    try {
      // NOTE(review): naive CSV — fields containing commas/newlines are not
      // quoted or escaped; this matches the original behavior.
      for (row <- rows) {
        bw.write(row.mkString(start = "", sep = ",", end = "\n"))
      }
    } finally {
      bw.close() // also closes the underlying FSDataOutputStream
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment