@gordthompson
Last active April 12, 2024 20:09
Build a PostgreSQL INSERT … ON CONFLICT statement and upsert a DataFrame
# version 1.2 - 2022-10-04
import pandas as pd
import sqlalchemy as sa


def df_upsert(data_frame, table_name, engine, schema=None, match_columns=None):
    """
    Perform an "upsert" on a PostgreSQL table from a DataFrame.

    Constructs an INSERT … ON CONFLICT statement, uploads the DataFrame to a
    temporary table, and then executes the INSERT.

    Parameters
    ----------
    data_frame : pandas.DataFrame
        The DataFrame to be upserted.
    table_name : str
        The name of the target table.
    engine : sqlalchemy.engine.Engine
        The SQLAlchemy Engine to use.
    schema : str, optional
        The name of the schema containing the target table.
    match_columns : list of str, optional
        A list of the column name(s) on which to match. If omitted, the
        primary key columns of the target table will be used.
    """
    table_spec = ""
    if schema:
        table_spec += '"' + schema.replace('"', '""') + '".'
    table_spec += '"' + table_name.replace('"', '""') + '"'

    df_columns = list(data_frame.columns)
    if not match_columns:
        insp = sa.inspect(engine)
        match_columns = insp.get_pk_constraint(table_name, schema=schema)[
            "constrained_columns"
        ]
    columns_to_update = [col for col in df_columns if col not in match_columns]

    insert_col_list = ", ".join([f'"{col_name}"' for col_name in df_columns])
    stmt = f"INSERT INTO {table_spec} ({insert_col_list})\n"
    stmt += f"SELECT {insert_col_list} FROM temp_table\n"
    match_col_list = ", ".join([f'"{col}"' for col in match_columns])
    stmt += f"ON CONFLICT ({match_col_list}) DO UPDATE SET\n"
    stmt += ", ".join(
        [f'"{col}" = EXCLUDED."{col}"' for col in columns_to_update]
    )

    with engine.begin() as conn:
        conn.exec_driver_sql("DROP TABLE IF EXISTS temp_table")
        conn.exec_driver_sql(
            f"CREATE TEMPORARY TABLE temp_table AS SELECT * FROM {table_spec} WHERE false"
        )
        data_frame.to_sql("temp_table", conn, if_exists="append", index=False)
        conn.exec_driver_sql(stmt)


if __name__ == "__main__":
    # Usage example adapted from
    # https://stackoverflow.com/a/62379384/2144390

    engine = sa.create_engine("postgresql://scott:tiger@192.168.0.199/test")

    # create example environment
    with engine.begin() as conn:
        conn.exec_driver_sql("DROP TABLE IF EXISTS main_table")
        conn.exec_driver_sql(
            "CREATE TABLE main_table (id int primary key, txt varchar(50), status varchar(50))"
        )
        conn.exec_driver_sql(
            "INSERT INTO main_table (id, txt, status) VALUES (1, 'row 1 old text', 'original')"
        )
        # [(1, 'row 1 old text', 'original')]

    # DataFrame to upsert
    df = pd.DataFrame(
        [(2, "new row 2 text", "upserted"), (1, "row 1 new text", "upserted")],
        columns=["id", "txt", "status"],
    )

    df_upsert(df, "main_table", engine)
    """The INSERT statement generated for this example:
    INSERT INTO "main_table" ("id", "txt", "status")
    SELECT "id", "txt", "status" FROM temp_table
    ON CONFLICT ("id") DO UPDATE SET
    "txt" = EXCLUDED."txt", "status" = EXCLUDED."status"
    """

    # check results
    with engine.begin() as conn:
        print(
            conn.exec_driver_sql("SELECT * FROM main_table").all()
        )
        # [(2, 'new row 2 text', 'upserted'), (1, 'row 1 new text', 'upserted')]
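
The statement-building step of `df_upsert` can be pulled out into a pure function so it can be checked without a database. The sketch below is one possible way to do that, not part of the gist: `quote_ident` and `build_upsert_statement` are hypothetical helper names. It also assumes two behaviours the gist does not have: column names are escaped the same way the gist escapes the table name, and the statement falls back to `DO NOTHING` when every DataFrame column is a match column (the gist would emit `DO UPDATE SET` with an empty assignment list in that case).

```python
def quote_ident(name):
    """Double-quote a PostgreSQL identifier, doubling any embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_upsert_statement(table_spec, df_columns, match_columns, temp_table="temp_table"):
    """Return an INSERT ... ON CONFLICT statement as a string.

    table_spec is expected to be already quoted, as df_upsert builds it.
    """
    insert_col_list = ", ".join(quote_ident(c) for c in df_columns)
    match_col_list = ", ".join(quote_ident(c) for c in match_columns)
    columns_to_update = [c for c in df_columns if c not in match_columns]

    stmt = f"INSERT INTO {table_spec} ({insert_col_list})\n"
    stmt += f"SELECT {insert_col_list} FROM {quote_ident(temp_table)}\n"
    if columns_to_update:
        assignments = ", ".join(
            f"{quote_ident(c)} = EXCLUDED.{quote_ident(c)}" for c in columns_to_update
        )
        stmt += f"ON CONFLICT ({match_col_list}) DO UPDATE SET\n{assignments}"
    else:
        # Assumption: with nothing left to update, skip conflicting rows
        # instead of emitting an empty SET clause.
        stmt += f"ON CONFLICT ({match_col_list}) DO NOTHING"
    return stmt


if __name__ == "__main__":
    print(build_upsert_statement('"main_table"', ["id", "txt", "status"], ["id"]))
```

For the gist's example table this produces the same statement as shown in the usage example, except that the temporary table name is also quoted.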
@jaketclarke

This was incredibly helpful to me, thank you!

I added this between row 48 & 49 to make life a little easier:

    conn.exec_driver_sql(
        f"DROP TABLE IF EXISTS temp_table"
    )

@gordthompson
Author

Thanks for the suggestion, @jaketclarke. Glad you found this helpful!

@Entspannter

Thank you for putting this online!

@jackiep00

jackiep00 commented Jan 24, 2023

Thanks for publishing this, @gordthompson!

I was running a bunch of concurrent upserts that would conflict with one another due to the single temp table name, so I created a version that adds a random string to the end of the temp table name:
https://gist.github.com/jackiep00/50eced6d1b63ac37e6841e1307007ef8

I'm not sure how to do a PR with gists, or else I woulda done that...
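
A minimal sketch of the random-suffix idea described above. This is not the code from the linked gist: `unique_temp_table_name` is a hypothetical helper, and using `uuid4` for the suffix is an assumption. The returned name would have to replace the hard-coded `temp_table` everywhere `df_upsert` uses it (the `DROP`, the `CREATE TEMPORARY TABLE`, the `to_sql` call, and the `SELECT … FROM` in the generated statement).

```python
import uuid


def unique_temp_table_name(prefix="temp_table"):
    """Return a temp table name with a random suffix, e.g. temp_table_<32 hex chars>.

    The suffix contains only lowercase hex digits, so the result is a plain
    identifier and stays well under PostgreSQL's 63-byte name limit for
    short prefixes.
    """
    suffix = uuid.uuid4().hex  # 32 lowercase hexadecimal characters
    return f"{prefix}_{suffix}"


if __name__ == "__main__":
    # Two calls give two different names, so concurrent upserts
    # would not share a temp table name.
    print(unique_temp_table_name())
    print(unique_temp_table_name())
```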

@droy-proxsys

I snagged a copy from @jackiep00, as it suits my needs a bit better for volume's sake. However, @gordthompson, thank you! Both of you! I was wondering how to do this, but you heroes just made my life much easier. Thank you!

@Furiere

Furiere commented Oct 25, 2023

Thank you so much!

@ALubrano77

Worked so well. Thank you!
