@gordthompson
Last active June 21, 2024 11:39
Build a T-SQL MERGE statement and upsert a DataFrame

mssql_df_upsert.py
# Copyright 2024 Gordon D. Thompson, gord@gordthompson.com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# version 1.7 - 2024-06-21

import uuid

import pandas as pd
import sqlalchemy as sa


def df_upsert(data_frame, table_name, engine, schema=None, match_columns=None,
              chunksize=None, dtype=None, skip_inserts=False, skip_updates=False):
    """
    Perform an "upsert" on a SQL Server table from a DataFrame.

    Constructs a T-SQL MERGE statement, uploads the DataFrame to a
    temporary table, and then executes the MERGE.

    Parameters
    ----------
    data_frame : pandas.DataFrame
        The DataFrame to be upserted.
    table_name : str
        The name of the target table.
    engine : sqlalchemy.engine.Engine
        The SQLAlchemy Engine to use.
    schema : str, optional
        The name of the schema containing the target table.
    match_columns : list of str, optional
        A list of the column name(s) on which to match. If omitted, the
        primary key columns of the target table will be used.
    chunksize : int, optional
        Specify chunk size for .to_sql(). See the pandas docs for details.
    dtype : dict, optional
        Specify column types for .to_sql(). See the pandas docs for details.
    skip_inserts : bool, optional
        Skip inserting unmatched rows. (Default: False)
    skip_updates : bool, optional
        Skip updating matched rows. (Default: False)
    """
    if skip_inserts and skip_updates:
        raise ValueError("skip_inserts and skip_updates cannot both be True")
    # unique *global* temp table name; the "##" prefix keeps it visible to
    # other connections (e.g., the one pandas uses internally for .to_sql())
    temp_table_name = "##" + str(uuid.uuid4()).replace("-", "_")
    # build a bracket-escaped [schema].[table] identifier for the target
    table_spec = ""
    if schema:
        table_spec += "[" + schema.replace("]", "]]") + "]."
    table_spec += "[" + table_name.replace("]", "]]") + "]"
    df_columns = list(data_frame.columns)
    if not match_columns:
        # default to matching on the target table's primary key columns
        insp = sa.inspect(engine)
        match_columns = insp.get_pk_constraint(table_name, schema=schema)[
            "constrained_columns"
        ]
    columns_to_update = [col for col in df_columns if col not in match_columns]
    stmt = f"MERGE {table_spec} WITH (HOLDLOCK) AS main\n"
    stmt += f"USING (SELECT {', '.join([f'[{col}]' for col in df_columns])} FROM {temp_table_name}) AS temp\n"
    join_condition = " AND ".join(
        [f"main.[{col}] = temp.[{col}]" for col in match_columns]
    )
    stmt += f"ON ({join_condition})"
    if not skip_updates:
        stmt += "\nWHEN MATCHED THEN\n"
        update_list = ", ".join(
            [f"[{col}] = temp.[{col}]" for col in columns_to_update]
        )
        stmt += f" UPDATE SET {update_list}"
    if not skip_inserts:
        stmt += "\nWHEN NOT MATCHED THEN\n"
        insert_cols_str = ", ".join([f"[{col}]" for col in df_columns])
        insert_vals_str = ", ".join([f"temp.[{col}]" for col in df_columns])
        stmt += f" INSERT ({insert_cols_str}) VALUES ({insert_vals_str})"
    stmt += ";"
    with engine.begin() as conn:
        data_frame.to_sql(
            temp_table_name, conn, index=False, chunksize=chunksize, dtype=dtype
        )
        conn.exec_driver_sql(stmt)
        conn.exec_driver_sql(f"DROP TABLE IF EXISTS {temp_table_name}")

if __name__ == "__main__":
    # Usage example adapted from
    # https://stackoverflow.com/a/62388768/2144390
    engine = sa.create_engine(
        "mssql+pyodbc://scott:tiger^5HHH@192.168.0.199/test"
        "?driver=ODBC+Driver+17+for+SQL+Server",
        fast_executemany=True,
    )

    # create example environment
    with engine.begin() as conn:
        conn.exec_driver_sql("DROP TABLE IF EXISTS main_table")
        conn.exec_driver_sql(
            "CREATE TABLE main_table (id int primary key, txt nvarchar(50), status nvarchar(50))"
        )
        conn.exec_driver_sql(
            "INSERT INTO main_table (id, txt, status) VALUES (1, N'row 1 old text', N'original')"
        )
    # [(1, 'row 1 old text', 'original')]

    # DataFrame to upsert
    df = pd.DataFrame(
        [(2, "new row 2 text", "upserted"), (1, "row 1 new text", "upserted")],
        columns=["id", "txt", "status"],
    )

    df_upsert(df, "main_table", engine)
    """The MERGE statement generated for this example:

    MERGE [main_table] WITH (HOLDLOCK) AS main
    USING (SELECT [id], [txt], [status] FROM ##955db388_01c5_4e79_a5d1_3e8cfadf400b) AS temp
    ON (main.[id] = temp.[id])
    WHEN MATCHED THEN
     UPDATE SET [txt] = temp.[txt], [status] = temp.[status]
    WHEN NOT MATCHED THEN
     INSERT ([id], [txt], [status]) VALUES (temp.[id], temp.[txt], temp.[status]);
    """

    # check results
    with engine.begin() as conn:
        print(conn.exec_driver_sql("SELECT * FROM main_table").all())
    # [(1, 'row 1 new text', 'upserted'), (2, 'new row 2 text', 'upserted')]
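
    # Other call patterns the parameters above support (illustrative, not run here):
    #   df_upsert(df, "main_table", engine, match_columns=["txt"])  # match on a non-PK column
    #   df_upsert(df, "main_table", engine, skip_updates=True)      # insert new rows only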
@gordthompson (Author) commented Jun 19, 2024

Try this: Reinstate fast_executemany=True and hack your copy of mssql_df_upsert.py, changing this …

    with engine.begin() as conn:
        data_frame.to_sql("#temp_table", conn, index=False, dtype=dtype)
        conn.exec_driver_sql(stmt)
        conn.exec_driver_sql("DROP TABLE IF EXISTS #temp_table")

… to this …

    with engine.begin() as conn:
        conn.exec_driver_sql(f"SELECT * INTO #temp_table FROM {table_spec} WHERE 1=0")
        crsr = conn.connection.dbapi_connection.cursor()
        insert_stmt = f"INSERT INTO #temp_table VALUES ({', '.join('?' * len(df_columns))})"
        crsr.executemany(insert_stmt, list(data_frame.itertuples(index=False)))
        conn.exec_driver_sql(stmt)
        conn.exec_driver_sql("DROP TABLE IF EXISTS #temp_table")

(The code assumes that the DataFrame column order exactly matches the table's.)
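
One way to remove that assumption would be to name the columns explicitly in the INSERT (a sketch, using the df_columns list that the function already builds):

    insert_cols = ", ".join(f"[{col}]" for col in df_columns)
    insert_stmt = (
        f"INSERT INTO #temp_table ({insert_cols}) "
        f"VALUES ({', '.join('?' * len(df_columns))})"
    )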

@pseudobacon:

(screenshot of the error message)

Fixes that error, but then we're back to the varbinary error.

@gordthompson (Author):

Have you already done some mods to the mssql_df_upsert.py file? Your line 80 is my line 76. (The [lack of] indentation also looks strange, but maybe they're just trimming leading whitespace.)

@pseudobacon commented Jun 19, 2024

Yeah, I made a change to the dataframe definition:

(screenshot of the modified DataFrame definition)

And my sourceconnection is defined as create_engine("mssql+pyodbc://sqldriver")

My upsert is a single line.

@gordthompson (Author) commented Jun 20, 2024

@pseudobacon - Revert the above change (the with block) to use .to_sql() again, then change the name of the temporary table from #temp_table to ##temp_table (in 4 places).

… or use the updated code I just posted.
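
For reference, a minimal sketch of the reverted with block with that rename applied (the USING clause of the MERGE statement also has to reference ##temp_table):

    with engine.begin() as conn:
        # "##" makes this a global temp table, so it stays visible to the
        # separate connection that pandas may use internally for .to_sql()
        data_frame.to_sql("##temp_table", conn, index=False, dtype=dtype)
        conn.exec_driver_sql(stmt)
        conn.exec_driver_sql("DROP TABLE IF EXISTS ##temp_table")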

@pseudobacon:

Huzzah, it works! Thank you very much.

@pseudobacon:

I've also got a scenario where I am presumably running out of memory when trying to merge in 500k+ records:

(screenshot of the error)

For this particular data frame there are some columns which are completely NULL. From SQL's point of view, it's not necessary to supply NULL as a value to merge in. Would it be possible to add a parameter to exclude columns where the entire column is NULL? This would reduce the number of columns needed for comparison in the MERGE and improve performance.
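
In the meantime, dropping those columns before calling df_upsert could work as a caller-side workaround (a sketch, assuming the all-NULL columns are nullable in the target table and can simply be left alone):

    # drop DataFrame columns that are entirely NULL/NaN before upserting
    trimmed_df = df.dropna(axis="columns", how="all")
    df_upsert(trimmed_df, "main_table", engine)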

@gordthompson (Author):

@pseudobacon - I added chunksize to try and help with memory consumption.
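
For example (the chunk size of 1000 rows here is arbitrary):

    df_upsert(df, "main_table", engine, chunksize=1000)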
