-
-
Save dfdeshom/89497f7dcd81ad05464b19545a0094e2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from psycopg2.extras import execute_batch | |
import psycopg2 | |
def upsert_pg(self, rdd,table, id_field='id'): | |
"""Upsert rdd into PG given a field constraint.""" | |
def _inner(partition): | |
item = list(partition)[0] | |
pg_conn = psycopg2.connect(dbname='db') | |
columns = item.keys() | |
statement = generate_pg_upsert_statement(table, | |
columns, | |
id_field) | |
with pg_conn: | |
with pg_conn.cursor() as cur: | |
execute_batch(cur, statement, items) | |
yield True | |
rdd.mapPartitions(_inner).collect() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment