@Nikolay-Lysenko
Last active January 5, 2022 06:08
Upsert (a hybrid of insert and update) from a pandas.DataFrame to a PostgreSQL database
from time import sleep
from io import StringIO

import psycopg2


def upsert_df_into_postgres(df, target_table, primary_keys, conn_string,
                            n_trials=5, quoting=None, null_repr=None):
"""
Uploads data from `df` to `target_table`
which is a relation in the PostgreSQL
database specified in `conn_string`.
Data are loaded in a bulk fashion and
before that (in the same transaction)
rows with overlapping values of columns
from `primary_keys` are deleted.
To be more detailed, each element of list
`primary_keys` is a tuple of the form
(column_name, column_sql_type).
Alternatively, it is possible to implement
a version of the function where
`primary_keys` has type `OrderedDict`.
Setting `quoting` to `csv.QUOTE_NONE`
allows inserting JSON fields.
Passing `null_repr` results in
customized handling of missing values.
@type df: pandas.DataFrame
@type target_table: str
@type primary_keys: list(tuple(str))
@type conn_string: str
@type n_trials: int
@type quoting: csv.flag
@type null_repr: any
@rtype: NoneType
"""
    to_be_deleted = df[[x[0] for x in primary_keys]].drop_duplicates()
    if len(to_be_deleted.index) < len(df.index):
        raise ValueError("Primary key constraint is violated in passed `df`.")
    del_stream = StringIO()
    del_stream.write(to_be_deleted.to_csv(sep='\t', encoding='utf8',
                                          index=False, header=False))
    stream = StringIO()
    stream.write(df.to_csv(sep='\t', encoding='utf8',
                           index=False, header=False, quoting=quoting))
    for i in range(n_trials):
        # Rewind the streams, because a previous failed attempt
        # may have consumed them partially or completely.
        del_stream.seek(0)
        stream.seek(0)
        conn = None
        curs = None
        try:
            conn = psycopg2.connect(conn_string)
            curs = conn.cursor()
            curs.execute(
                '''
                CREATE TEMP TABLE to_be_deleted
                (
                    {}
                )
                '''.format(',\n'.join(
                    ['{} {}'.format(*x) for x in primary_keys]))
            )
            # No `null_repr` here, because primary keys must not be NULL.
            curs.copy_from(del_stream, 'to_be_deleted')
            curs.execute(
                '''
                DELETE FROM
                    {0}
                WHERE
                    ({1}) IN (SELECT {1} FROM to_be_deleted)
                '''.format(target_table,
                           ', '.join([x[0] for x in primary_keys]))
            )
            if null_repr is not None:
                curs.copy_from(stream, target_table, null=null_repr)
            else:
                curs.copy_from(stream, target_table)
            conn.commit()
        except Exception as e:
            if i == n_trials - 1:
                # An e-mail or a message can also be sent here.
                raise e
            else:
                print(e)
                sleep(60 * (i + 1))
        else:
            break
        finally:
            if curs is not None:
                curs.close()
            if conn is not None:
                conn.close()
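# ---------------------------------------------------------------
# The usage example below assumes that the target table already exists.
# Its DDL is not part of this gist; it could look roughly like the
# following (an assumption made only for illustration):
#     CREATE TABLE tmp.example_table
#     (
#         pk_one integer,
#         pk_two varchar(16),
#         value  double precision,
#         PRIMARY KEY (pk_one, pk_two)
#     );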
# -----------------
# Example of usage:
# import pandas as pd
# conn_string = 'postgres://username:password@host:port/db_name'
# df = pd.DataFrame([[1, 'a', 2.1],
#                    [2, 'b', 4.3],
#                    [3, 'c', 7.4]], columns=['pk_one', 'pk_two', 'value'])
# upsert_df_into_postgres(df, 'tmp.example_table',
#                         [('pk_one', 'integer'),
#                          ('pk_two', 'varchar(16)')],
#                         conn_string)
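# ---------------------------------------------------------------
# A second, hypothetical example (a sketch that is not part of the
# original gist; `tmp.json_table` and its columns are made-up names,
# while `pd` and `conn_string` are reused from the example above).
# It shows how `quoting` and `null_repr` might be combined to load a
# table with a JSON column and missing numeric values: `csv.QUOTE_NONE`
# keeps the double quotes inside the JSON strings intact, and
# `null_repr=''` makes COPY read the empty strings that pandas writes
# for NaN/None as SQL NULL.
# import csv
# df = pd.DataFrame([[1, '{"tag": "a"}', 2.5],
#                    [2, '{"tag": "b"}', None]],
#                   columns=['pk_one', 'payload', 'value'])
# upsert_df_into_postgres(df, 'tmp.json_table',
#                         [('pk_one', 'integer')],
#                         conn_string,
#                         quoting=csv.QUOTE_NONE,
#                         null_repr='')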
@nate-benton90

Head over here for a new take on this kind of task: https://github.com/ThibTrip/pangres/wiki/Upsert

@JuliusJacobitz

Maybe also have a look at this: https://gist.github.com/luke/5697511
