Skip to content

Instantly share code, notes, and snippets.

@dfdeshom
Last active December 15, 2017 20:49
Show Gist options
  • Save dfdeshom/89497f7dcd81ad05464b19545a0094e2 to your computer and use it in GitHub Desktop.
Save dfdeshom/89497f7dcd81ad05464b19545a0094e2 to your computer and use it in GitHub Desktop.
from psycopg2.extras import execute_batch
import psycopg2
def upsert_pg(self, rdd, table, id_field='id', dbname='db'):
    """Upsert every row of ``rdd`` into PostgreSQL, one transaction per partition.

    Each partition is materialised once. An upsert statement is built from
    the first row's keys, and all of the partition's rows are sent with
    ``execute_batch`` on a dedicated connection. That connection is always
    closed afterwards.

    Args:
        rdd: Spark RDD of rows. Rows are presumably dict-like mappings that
            all share the same keys (TODO confirm against callers). Only
            the first row of each partition determines the column list.
        table: Target table name, forwarded to
            ``generate_pg_upsert_statement``.
        id_field: Conflict/constraint column, forwarded to
            ``generate_pg_upsert_statement``. Defaults to ``'id'``.
        dbname: Database passed to ``psycopg2.connect``. Defaults to
            ``'db'``, the value that was previously hard-coded.

    Returns:
        None. ``collect()`` is called only to force the Spark job to run.
    """
    def _inner(partition):
        # The partition is a one-shot iterator. Materialise it once so we can
        # read the first row (for columns) AND send every row to the DB.
        items = list(partition)
        if not items:
            # Nothing to write: skip without opening a connection.
            return
        columns = items[0].keys()
        # Build the statement before connecting, so a failure here cannot
        # leave an open connection behind.
        statement = generate_pg_upsert_statement(table, columns, id_field)
        pg_conn = psycopg2.connect(dbname=dbname)
        try:
            # ``with pg_conn`` commits on success / rolls back on error, but
            # does not close the connection -- hence the explicit close below.
            with pg_conn:
                with pg_conn.cursor() as cur:
                    execute_batch(cur, statement, items)
        finally:
            pg_conn.close()
        yield True

    rdd.mapPartitions(_inner).collect()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment