Last active
January 5, 2022 06:08
-
-
Save Nikolay-Lysenko/0887f4b59dc4914cec9b236c317d06e3 to your computer and use it in GitHub Desktop.
Upsert (a hybrid of insert and update) from pandas.DataFrame to PostgreSQL database
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from time import sleep | |
from io import StringIO | |
import psycopg2 | |
def upsert_df_into_postgres(df, target_table, primary_keys, conn_string,
                            n_trials=5, quoting=None, null_repr=None):
    """
    Upsert rows of `df` into `target_table` of a PostgreSQL database.

    The upsert is emulated as "delete, then bulk insert" inside a single
    transaction: the key columns of `df` are copied into a temporary
    table, rows of `target_table` with matching key values are deleted,
    and then the whole `df` is loaded with COPY.

    Each element of `primary_keys` is a tuple of the form
    (column_name, column_sql_type); the SQL type is used to declare
    the temporary table of keys.

    Setting `quoting` to `csv.QUOTE_NONE` allows inserting JSON fields.
    Passing `null_repr` overrides the string that COPY treats as NULL
    when loading `df`; keys are always copied with the default, because
    key values must not be NULL.

    Every attempt opens its own connection. A failed attempt is printed
    and retried after `60 * attempt_number` seconds; the exception of
    the last attempt is re-raised to the caller.

    WARNING: `target_table`, column names and column SQL types are
    interpolated into SQL text as is, so they must come from a trusted
    source.

    @type df: pandas.DataFrame
    @type target_table: str
    @type primary_keys: list(tuple(str))
    @type conn_string: str
    @type n_trials: int
    @type quoting: csv.flag
    @type null_repr: any
    @rtype: NoneType
    @raise ValueError: if key columns of `df` contain duplicated rows
    """
    key_columns = [key[0] for key in primary_keys]
    to_be_deleted = df[key_columns].drop_duplicates()
    if len(to_be_deleted.index) < len(df.index):
        raise ValueError("Primary key constraint is violated in passed `df`.")

    # Serialize both data sets once; the buffers are reused by every attempt.
    del_stream = StringIO(
        to_be_deleted.to_csv(sep='\t', encoding='utf8',
                             index=False, header=False)
    )
    stream = StringIO(
        df.to_csv(sep='\t', encoding='utf8',
                  index=False, header=False, quoting=quoting)
    )

    # SQL text does not depend on the attempt, so build it once.
    create_query = (
        '''
        CREATE TEMP TABLE to_be_deleted
        (
            {}
        )
        '''.format(',\n'.join(
            ['{} {}'.format(*key) for key in primary_keys]))
    )
    delete_query = (
        '''
        DELETE FROM
            {0}
        WHERE
            ({1}) IN (SELECT {1} FROM to_be_deleted)
        '''.format(target_table, ', '.join(key_columns))
    )

    for i in range(n_trials):
        # A previous failed attempt may have consumed the buffers partially
        # or completely; without rewinding, a retry would silently load
        # truncated (or no) data and still commit.
        del_stream.seek(0)
        stream.seek(0)
        conn = None
        curs = None
        try:
            conn = psycopg2.connect(conn_string)
            curs = conn.cursor()
            curs.execute(create_query)
            # No `null_repr`, because keys must not be NULL.
            curs.copy_from(del_stream, 'to_be_deleted')
            curs.execute(delete_query)
            # NOTE(review): psycopg2 >= 2.9 is documented to quote the table
            # name passed to `copy_from`, so a schema-qualified
            # `target_table` may require `copy_expert` - confirm against
            # the installed psycopg2 version.
            if null_repr is not None:
                curs.copy_from(stream, target_table, null=null_repr)
            else:
                curs.copy_from(stream, target_table)
            conn.commit()
        except Exception as e:
            if i == n_trials - 1:
                # Also e-mail or message can be sent here.
                raise
            print(e)
        else:
            break
        finally:
            # Closing without commit discards the failed transaction.
            if curs is not None:
                curs.close()
            if conn is not None:
                conn.close()
        # Reached only after a failed, non-final attempt. Waiting happens
        # after cleanup so that no connection is held open during back-off.
        sleep(60 * (i + 1))
# ----------------- | |
# Example of usage: | |
# import pandas as pd | |
# conn_string = 'postgres://username:password@host:port/db_name' | |
# df = pd.DataFrame([[1, 'a', 2.1], | |
# [2, 'b', 4.3], | |
# [3, 'c', 7.4]], columns=['pk_one', 'pk_two', 'value']) | |
# upsert_df_into_postgres(df, 'tmp.example_table', | |
# [('pk_one', 'integer'), | |
# ('pk_two', 'varchar(16)')], | |
# conn_string) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Head over here for a new take on this kind of critical task: https://github.com/ThibTrip/pangres/wiki/Upsert