COPY Spark DataFrame rows to PostgreSQL (via JDBC)
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.{ DataFrame, Row }
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection
val jdbcUrl = s"jdbc:postgresql://..." // db credentials elided
val connectionProperties = {
val props = new java.util.Properties()
props.setProperty("driver", "org.postgresql.Driver")
// Spark reads the "driver" property to allow users to override the default driver selected, otherwise
// it picks the Redshift driver, which doesn't support JDBC CopyManager.
val cf: () => Connection = JdbcUtils.createConnectionFactory(jdbcUrl, connectionProperties)
// Convert every partition (an `Iterator[Row]`) to bytes (InputStream)
def rowsToInputStream(rows: Iterator[Row], delimiter: String): InputStream = {
val bytes: Iterator[Byte] = { row =>
(row.mkString(delimiter) + "\n").getBytes
new InputStream {
override def read(): Int = if (bytes.hasNext) { & 0xff // bitwise AND - make the signed byte an unsigned int from 0-255
} else {
// Beware: this will open a db connection for every partition of your DataFrame.
frame.foreachPartition { rows =>
val conn = cf()
val cm = new CopyManager(conn.asInstanceOf[BaseConnection])
"""COPY my_schema._mytable FROM STDIN WITH (NULL 'null', FORMAT CSV, DELIMITER E'\t')""", // adjust COPY settings as you desire, options from
rowsToInputStream(rows, "\t"))
