// COPY Spark DataFrame rows to PostgreSQL (via JDBC)
import java.io.InputStream
import java.nio.charset.StandardCharsets
import java.sql.Connection

import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.{ DataFrame, Row }
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection
/** JDBC URL of the target PostgreSQL database. */
val jdbcUrl = s"jdbc:postgresql://..." // db credentials elided

/**
 * Properties passed to Spark's JDBC connection factory.
 *
 * Spark reads the "driver" property to allow users to override the default driver selected,
 * otherwise it picks the Redshift driver, which doesn't support JDBC CopyManager.
 */
val connectionProperties: java.util.Properties = {
  val props = new java.util.Properties()
  props.setProperty("driver", "org.postgresql.Driver")
  props // the block's value: the populated Properties
}

/**
 * Connection factory for `jdbcUrl`; each invocation is used below to obtain a fresh connection.
 *
 * NOTE(review): `createConnectionFactory(url, Properties)` is the Spark 1.6/2.x signature;
 * newer Spark versions take a `JDBCOptions` instead — confirm against the Spark version in use.
 */
val cf: () => Connection = JdbcUtils.createConnectionFactory(jdbcUrl, connectionProperties)
/**
 * Converts one partition's rows (an `Iterator[Row]`) into a byte stream for `CopyManager.copyIn`.
 *
 * Each row is rendered as its fields joined by `delimiter`, followed by "\n", and encoded as
 * UTF-8 (explicitly, rather than the JVM's platform-default charset). Bytes are produced lazily
 * from the iterator, so the partition is not materialized in memory up front.
 *
 * NOTE: fields are neither quoted nor escaped. A field containing `delimiter`, a double quote or
 * a newline will corrupt the COPY input. `mkString` renders null fields as the text "null",
 * which is why the COPY statement below uses `NULL 'null'`.
 *
 * @param rows      the rows of a single partition
 * @param delimiter field separator; must match the DELIMITER of the COPY statement
 * @return a stream yielding every row's bytes, then -1 (end of stream)
 */
def rowsToInputStream(rows: Iterator[Row], delimiter: String): InputStream = {
  val bytes: Iterator[Byte] =
    rows.flatMap(row => (row.mkString(delimiter) + "\n").getBytes(StandardCharsets.UTF_8).iterator)

  new InputStream {
    override def read(): Int =
      if (bytes.hasNext) bytes.next() & 0xff // bitwise AND - make the signed byte an unsigned int from 0-255
      else -1 // InputStream contract: -1 signals end of stream
  }
}
// Beware: this will open a db connection for every partition of your DataFrame.
// NOTE(review): `frame` is not defined in this snippet; presumably it is the DataFrame to copy.
// The explicit parameter type avoids overload ambiguity with the Java-friendly
// `foreachPartition(ForeachPartitionFunction)` on Spark 3 / Scala 2.12.
frame.foreachPartition { (rows: Iterator[Row]) =>
  val conn = cf()
  try {
    // unwrap (JDBC 4) is the standard way to reach the driver's concrete connection type.
    val cm = new CopyManager(conn.unwrap(classOf[BaseConnection]))
    // Adjust COPY settings as you desire (see the PostgreSQL COPY documentation).
    // DELIMITER and NULL must agree with how rowsToInputStream renders each row.
    cm.copyIn(
      """COPY my_schema._mytable FROM STDIN WITH (NULL 'null', FORMAT CSV, DELIMITER E'\t')""",
      rowsToInputStream(rows, "\t"))
  } finally {
    // Release the connection even when COPY fails partway through the partition.
    conn.close()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.