Last active
June 21, 2024 11:39
-
-
Save gordthompson/be1799bd68a12be58c880bb9c92158bc to your computer and use it in GitHub Desktop.
Build a T-SQL MERGE statement and upsert a DataFrame
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright 2024 Gordon D. Thompson, gord@gordthompson.com | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# version 1.7 - 2024-06-21 | |
import uuid | |
import pandas as pd | |
import sqlalchemy as sa | |
def df_upsert(data_frame, table_name, engine, schema=None, match_columns=None, | |
chunksize=None, dtype=None, skip_inserts=False, skip_updates=False): | |
""" | |
Perform an "upsert" on a SQL Server table from a DataFrame. | |
Constructs a T-SQL MERGE statement, uploads the DataFrame to a | |
temporary table, and then executes the MERGE. | |
Parameters | |
---------- | |
data_frame : pandas.DataFrame | |
The DataFrame to be upserted. | |
table_name : str | |
The name of the target table. | |
engine : sqlalchemy.engine.Engine | |
The SQLAlchemy Engine to use. | |
schema : str, optional | |
The name of the schema containing the target table. | |
match_columns : list of str, optional | |
A list of the column name(s) on which to match. If omitted, the | |
primary key columns of the target table will be used. | |
chunksize: int, optional | |
Specify chunk size for .to_sql(). See the pandas docs for details. | |
dtype : dict, optional | |
Specify column types for .to_sql(). See the pandas docs for details. | |
skip_inserts : bool, optional | |
Skip inserting unmatched rows. (Default: False) | |
skip_updates : bool, optional | |
Skip updating matched rows. (Default: False) | |
""" | |
if skip_inserts and skip_updates: | |
raise ValueError("skip_inserts and skip_updates cannot both be True") | |
temp_table_name = "##" + str(uuid.uuid4()).replace("-", "_") | |
table_spec = "" | |
if schema: | |
table_spec += "[" + schema.replace("]", "]]") + "]." | |
table_spec += "[" + table_name.replace("]", "]]") + "]" | |
df_columns = list(data_frame.columns) | |
if not match_columns: | |
insp = sa.inspect(engine) | |
match_columns = insp.get_pk_constraint(table_name, schema=schema)[ | |
"constrained_columns" | |
] | |
columns_to_update = [col for col in df_columns if col not in match_columns] | |
stmt = f"MERGE {table_spec} WITH (HOLDLOCK) AS main\n" | |
stmt += f"USING (SELECT {', '.join([f'[{col}]' for col in df_columns])} FROM {temp_table_name}) AS temp\n" | |
join_condition = " AND ".join( | |
[f"main.[{col}] = temp.[{col}]" for col in match_columns] | |
) | |
stmt += f"ON ({join_condition})" | |
if not skip_updates: | |
stmt += "\nWHEN MATCHED THEN\n" | |
update_list = ", ".join( | |
[f"[{col}] = temp.[{col}]" for col in columns_to_update] | |
) | |
stmt += f" UPDATE SET {update_list}" | |
if not skip_inserts: | |
stmt += "\nWHEN NOT MATCHED THEN\n" | |
insert_cols_str = ", ".join([f"[{col}]" for col in df_columns]) | |
insert_vals_str = ", ".join([f"temp.[{col}]" for col in df_columns]) | |
stmt += f" INSERT ({insert_cols_str}) VALUES ({insert_vals_str})" | |
stmt += ";" | |
with engine.begin() as conn: | |
data_frame.to_sql(temp_table_name, conn, index=False, chunksize=chunksize, dtype=dtype) | |
conn.exec_driver_sql(stmt) | |
conn.exec_driver_sql(f"DROP TABLE IF EXISTS {temp_table_name}") | |
if __name__ == "__main__": | |
# Usage example adapted from | |
# https://stackoverflow.com/a/62388768/2144390 | |
engine = sa.create_engine( | |
"mssql+pyodbc://scott:tiger^5HHH@192.168.0.199/test" | |
"?driver=ODBC+Driver+17+for+SQL+Server", | |
fast_executemany=True, | |
) | |
# create example environment | |
with engine.begin() as conn: | |
conn.exec_driver_sql("DROP TABLE IF EXISTS main_table") | |
conn.exec_driver_sql( | |
"CREATE TABLE main_table (id int primary key, txt nvarchar(50), status nvarchar(50))" | |
) | |
conn.exec_driver_sql( | |
"INSERT INTO main_table (id, txt, status) VALUES (1, N'row 1 old text', N'original')" | |
) | |
# [(1, 'row 1 old text', 'original')] | |
# DataFrame to upsert | |
df = pd.DataFrame( | |
[(2, "new row 2 text", "upserted"), (1, "row 1 new text", "upserted")], | |
columns=["id", "txt", "status"], | |
) | |
df_upsert(df, "main_table", engine) | |
"""The MERGE statement generated for this example: | |
MERGE [main_table] WITH (HOLDLOCK) AS main | |
USING (SELECT [id], [txt], [status] FROM ##955db388_01c5_4e79_a5d1_3e8cfadf400b) AS temp | |
ON (main.[id] = temp.[id]) | |
WHEN MATCHED THEN | |
UPDATE SET [txt] = temp.[txt], [status] = temp.[status] | |
WHEN NOT MATCHED THEN | |
INSERT ([id], [txt], [status]) VALUES (temp.[id], temp.[txt], temp.[status]); | |
""" | |
# check results | |
with engine.begin() as conn: | |
print( | |
conn.exec_driver_sql("SELECT * FROM main_table").all() | |
) | |
# [(1, 'row 1 new text', 'upserted'), (2, 'new row 2 text', 'upserted')] |
I've also got a scenario where I am presumably running out of memory when trying to merge in 500k+ records:
For this particular data frame there are some columns which are completely NULL. From SQL's point of view its not necessary to supply NULL as a value to merge in, would it be possible to add a parameter to exclude columns where the entire column is NULL? This would reduce the number of columns needed for comparison in the MERGE and improve performance
@pseudobacon - I added chunksize
to try and help with memory consumption.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Huzzah it works! Thank you very much