@gordthompson
Last active June 21, 2024 11:39
Build a T-SQL MERGE statement and upsert a DataFrame
# Copyright 2024 Gordon D. Thompson, gord@gordthompson.com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# version 1.7 - 2024-06-21
import uuid

import pandas as pd
import sqlalchemy as sa


def df_upsert(data_frame, table_name, engine, schema=None, match_columns=None,
              chunksize=None, dtype=None, skip_inserts=False, skip_updates=False):
    """
    Perform an "upsert" on a SQL Server table from a DataFrame.

    Constructs a T-SQL MERGE statement, uploads the DataFrame to a
    temporary table, and then executes the MERGE.

    Parameters
    ----------
    data_frame : pandas.DataFrame
        The DataFrame to be upserted.
    table_name : str
        The name of the target table.
    engine : sqlalchemy.engine.Engine
        The SQLAlchemy Engine to use.
    schema : str, optional
        The name of the schema containing the target table.
    match_columns : list of str, optional
        A list of the column name(s) on which to match. If omitted, the
        primary key columns of the target table will be used.
    chunksize : int, optional
        Specify chunk size for .to_sql(). See the pandas docs for details.
    dtype : dict, optional
        Specify column types for .to_sql(). See the pandas docs for details.
    skip_inserts : bool, optional
        Skip inserting unmatched rows. (Default: False)
    skip_updates : bool, optional
        Skip updating matched rows. (Default: False)
    """
    if skip_inserts and skip_updates:
        raise ValueError("skip_inserts and skip_updates cannot both be True")

    temp_table_name = "##" + str(uuid.uuid4()).replace("-", "_")

    table_spec = ""
    if schema:
        table_spec += "[" + schema.replace("]", "]]") + "]."
    table_spec += "[" + table_name.replace("]", "]]") + "]"

    df_columns = list(data_frame.columns)
    if not match_columns:
        insp = sa.inspect(engine)
        match_columns = insp.get_pk_constraint(table_name, schema=schema)[
            "constrained_columns"
        ]
    columns_to_update = [col for col in df_columns if col not in match_columns]

    stmt = f"MERGE {table_spec} WITH (HOLDLOCK) AS main\n"
    stmt += (
        f"USING (SELECT {', '.join([f'[{col}]' for col in df_columns])} "
        f"FROM {temp_table_name}) AS temp\n"
    )
    join_condition = " AND ".join(
        [f"main.[{col}] = temp.[{col}]" for col in match_columns]
    )
    stmt += f"ON ({join_condition})"
    if not skip_updates:
        stmt += "\nWHEN MATCHED THEN\n"
        update_list = ", ".join(
            [f"[{col}] = temp.[{col}]" for col in columns_to_update]
        )
        stmt += f" UPDATE SET {update_list}"
    if not skip_inserts:
        stmt += "\nWHEN NOT MATCHED THEN\n"
        insert_cols_str = ", ".join([f"[{col}]" for col in df_columns])
        insert_vals_str = ", ".join([f"temp.[{col}]" for col in df_columns])
        stmt += f" INSERT ({insert_cols_str}) VALUES ({insert_vals_str})"
    stmt += ";"

    with engine.begin() as conn:
        data_frame.to_sql(
            temp_table_name, conn, index=False, chunksize=chunksize, dtype=dtype
        )
        conn.exec_driver_sql(stmt)
        conn.exec_driver_sql(f"DROP TABLE IF EXISTS {temp_table_name}")
if __name__ == "__main__":
    # Usage example adapted from
    # https://stackoverflow.com/a/62388768/2144390
    engine = sa.create_engine(
        "mssql+pyodbc://scott:tiger^5HHH@192.168.0.199/test"
        "?driver=ODBC+Driver+17+for+SQL+Server",
        fast_executemany=True,
    )

    # create example environment
    with engine.begin() as conn:
        conn.exec_driver_sql("DROP TABLE IF EXISTS main_table")
        conn.exec_driver_sql(
            "CREATE TABLE main_table (id int primary key, "
            "txt nvarchar(50), status nvarchar(50))"
        )
        conn.exec_driver_sql(
            "INSERT INTO main_table (id, txt, status) "
            "VALUES (1, N'row 1 old text', N'original')"
        )
    # [(1, 'row 1 old text', 'original')]

    # DataFrame to upsert
    df = pd.DataFrame(
        [(2, "new row 2 text", "upserted"), (1, "row 1 new text", "upserted")],
        columns=["id", "txt", "status"],
    )

    df_upsert(df, "main_table", engine)
    """The MERGE statement generated for this example:
    MERGE [main_table] WITH (HOLDLOCK) AS main
    USING (SELECT [id], [txt], [status] FROM ##955db388_01c5_4e79_a5d1_3e8cfadf400b) AS temp
    ON (main.[id] = temp.[id])
    WHEN MATCHED THEN
        UPDATE SET [txt] = temp.[txt], [status] = temp.[status]
    WHEN NOT MATCHED THEN
        INSERT ([id], [txt], [status]) VALUES (temp.[id], temp.[txt], temp.[status]);
    """

    # check results
    with engine.begin() as conn:
        print(conn.exec_driver_sql("SELECT * FROM main_table").all())
    # [(1, 'row 1 new text', 'upserted'), (2, 'new row 2 text', 'upserted')]
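
Note that match_columns must be a list even when matching on a single column; passing a bare string will be iterated character by character (as the first comment below discovered). A minimal sketch, equivalent to the primary-key default used in the example above:

    # match explicitly on "id" instead of relying on the PK lookup
    df_upsert(df, "main_table", engine, match_columns=["id"])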
@npnigro commented Oct 31, 2022

This is wonderful! I got tripped up because I didn't notice that match_columns was a list (Python confusingly split my single column name into individual letters). Two other flags: I wasn't able to use a datetime2 column. I was getting an error about a timestamp column:

"Cannot insert an explicit value into a timestamp column. Use INSERT with a column list to exclude the timestamp column, or insert a DEFAULT into the timestamp column."

The column wasn't a timestamp column. My solution was to format the datetime field in Python as a date string.

The other bit was making sure to drop the #temp_table upon completion so subsequent calls to the function will work. Again, thank you!

@gordthompson (Author)

Hi @npnigro . Thanks for the feedback!

I have added a DROP TABLE for the temporary table as suggested.

As for the datetime2 issue, this example works fine. Can you provide repro code? (Your DataFrame may be slightly different from mine.)

@npnigro commented Nov 1, 2022

Thank you for the quick reply! This is how I store the date/time in the dataframe (I hid the other fields in the dataframe since they're not relevant). When I pass that field to the function, it gives me the error I shared above.

from datetime import datetime
from dateutil import tz

def store_post_stats(local_post):
    # Get today's date only
    # local_date = datetime.strftime(datetime.now(tz.gettz('America/New_York')), "%Y-%m-%d")

    stats = {
        'post_snapshot_date': datetime.now(tz.gettz('America/New_York')),
    }
    mapPostStats.append(stats)
    return

My solution is to pass this instead:
local_date = datetime.strftime(datetime.now(tz.gettz('America/New_York')), "%Y-%m-%d")

@gordthompson (Author) commented Nov 1, 2022

@npnigro - Okay, thanks. It appears that pandas creates a TIMESTAMP column in the temp table if the datetime value is timezone-aware. Another workaround would be to convert it to a naive datetime value:

datetime.now(tz.gettz('America/New_York')).replace(tzinfo=None)

That keeps the date and time values the same, but drops the offset (which can't be stored in a datetime/datetime2 column anyway).
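
For a whole DataFrame column, pandas can strip the offset from every value in one step; a minimal sketch, assuming a tz-aware column named post_snapshot_date:

    # convert tz-aware values to naive, keeping the local wall-clock time
    df["post_snapshot_date"] = df["post_snapshot_date"].dt.tz_localize(None)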

@npnigro commented Nov 1, 2022

Really appreciate your help here. I'm new to Python so still getting my arms around it.

@TheFoon commented Mar 28, 2023

This is exactly what I was looking for! Any info on licensing?

@gordthompson (Author)

@TheFoon - I have added license text.

@pseudobacon

Really great script, thanks. I'm trying to merge into a table with some varbinary columns and I am getting this error:

[screenshot: implicit conversion error]

I've tried wrapping the columns with a CONVERT in the SELECT query, but that doesn't seem to help. Here is the table definition:

CREATE TABLE [dbo].[LEAS](
	[BLDGID] [nchar](6) NOT NULL,
	[LEASID] [nchar](6) NOT NULL,
	[SUITID] [nchar](5) NOT NULL,
	[OCCPNAME] [nchar](75) NULL,
	[DBA] [nchar](75) NULL,
	[CONTNAME] [nchar](30) NULL,
	[PHONENO1] [nchar](15) NULL,
	[PHONENO2] [nchar](15) NULL,
	[NAME] [nchar](50) NULL,
	[ADDRESS] [nchar](50) NULL,
	[ADDRESS2] [nchar](50) NULL,
	[CITY] [nchar](35) NULL,
	[STATE] [nchar](3) NULL,
	[ZIPCODE] [nchar](9) NULL,
	[ATTENT] [nchar](30) NULL,
	[TENTID] [nchar](5) NULL,
	[TTYPID] [nchar](5) NULL,
	[OCCPSTAT] [nchar](1) NOT NULL,
	[EXECDATE] [datetime] NULL,
	[RENTSTRT] [datetime] NULL,
	[OCCUPNCY] [datetime] NULL,
	[BEGINDATE] [datetime] NULL,
	[EXPIR] [datetime] NULL,
	[VACATE] [datetime] NULL,
	[CANCDATE] [datetime] NULL,
	[PRTSTAT] [nchar](1) NOT NULL,
	[PERMITCR] [nchar](1) NOT NULL,
	[LEASSTOP] [nchar](1) NOT NULL,
	[DAYDUE] [smallint] NULL,
	[LATECALC] [nchar](1) NULL,
	[LATEFLAT] [money] NULL,
	[LATERATE] [float] NULL,
	[INTRATE] [float] NULL,
	[MAXLATE] [nchar](1) NOT NULL,
	[NUMLATE] [smallint] NULL,
	[NCDEP] [money] NULL,
	[MGMTRATE] [float] NULL,
	[DELQOPT] [nchar](1) NULL,
	[DELQDAY] [smallint] NULL,
	[INTSTART] [datetime] NULL,
	[LASTINT] [datetime] NULL,
	[LASTDATE] [datetime] NULL,
	[USERID] [nchar](20) NULL,
	[ADDRID] [nchar](6) NULL,
	[EXCL5YR] [nchar](1) NOT NULL,
	[BANKID] [nchar](6) NULL,
	[VENDID] [nchar](6) NULL,
	[ALLOWAMT] [money] NULL,
	[PROFILE] [nchar](2) NULL,
	[COUNTY] [nchar](20) NULL,
	[STORECAT] [nchar](1) NULL,
	[TENTCAT] [nchar](5) NULL,
	[STYPID] [nchar](5) NULL,
	[YEAREND] [nchar](6) NULL,
	[PRVYREND] [nchar](6) NULL,
	[PRVY2END] [nchar](6) NULL,
	[SALESTRT] [datetime] NULL,
	[SALEEND] [datetime] NULL,
	[REPFREQ] [nchar](1) NOT NULL,
	[REQRPTY] [nchar](3) NULL,
	[REQADJ] [nchar](1) NOT NULL,
	[PCTGBASE] [nchar](1) NOT NULL,
	[ESTLEVEL] [nchar](3) NULL,
	[ESTMETH] [nchar](3) NULL,
	[LASTEST] [nchar](3) NULL,
	[NATURAL] [nchar](1) NOT NULL,
	[NINCCAT1] [nchar](5) NULL,
	[NINCCAT2] [nchar](5) NULL,
	[NINCCAT3] [nchar](5) NULL,
	[RPTYPRI1] [nchar](3) NULL,
	[RPTYPRI2] [nchar](3) NULL,
	[RPTYPRI3] [nchar](3) NULL,
	[RPTYPRI4] [nchar](3) NULL,
	[RPTYPRI5] [nchar](3) NULL,
	[NUMCOPY] [smallint] NULL,
	[SICCODE] [nchar](6) NULL,
	[CHAINID] [nchar](6) NULL,
	[MOCCPID] [nchar](8) NULL,
	[GENERATION] [smallint] NOT NULL,
	[GENCODE] [nchar](3) NULL,
	[ADDLSPACE] [nchar](1) NOT NULL,
	[STOPBILLDATE] [datetime] NULL,
	[LITIGATION] [nchar](1) NULL,
	[RENEWAL] [nchar](1) NULL,
	[PRIMARYCHGS] [nchar](1) NULL,
	[INSEXP] [datetime] NULL,
	[TICOST] [money] NULL,
	[COMMCOST] [money] NULL,
	[DRAWER] [nchar](35) NULL,
	[FININST] [nchar](20) NULL,
	[BRANCH] [nchar](20) NULL,
	[HEADLEAS] [nchar](1) NULL,
	[PRORATE] [nchar](1) NULL,
	[CURRCODE] [nchar](3) NULL,
	[CROSSCURYN] [nchar](1) NULL,
	[LTFEEDAYS] [smallint] NULL,
	[LTFCAMT] [money] NULL,
	[EXPSTOP] [money] NULL,
	[DEPARTMENT] [nchar](5) NULL,
	[ACCOUNTID] [nchar](12) NULL,
	[NAICSCODE] [nchar](8) NULL,
	[ABATRAN] [varbinary](max) NULL,
	[ABABANK] [varbinary](max) NULL,
	[BANKACCTNAME] [nchar](30) NULL,
	[EMAILBILL] [nchar](150) NULL,
	[PREPAYTAXTERM] [smallint] NULL,
	[TAXEXEMPT] [nchar](1) NULL,
	[AUTOEXCEPTION] [nchar](1) NULL,
	[FASBEXCEPTION] [nchar](1) NULL,
	[FASBSTART] [datetime] NULL,
	[FASBSTOP] [datetime] NULL,
	[PERCENTRENT] [nchar](1) NULL,
	[ORIGEXPIR] [datetime] NULL,
	[AFTERFACT] [nchar](1) NULL,
	[COTENANT] [nchar](1) NULL,
	[ACCPHONE] [nchar](14) NULL,
	[ACCFAX] [nchar](14) NULL,
	[COUNTRY] [nchar](30) NULL,
	[ACCTEMAIL] [nchar](125) NULL,
	[CONTTITLE] [nchar](10) NULL,
	[CONTINITIALS] [nchar](10) NULL,
	[ATTENTTITLE] [nchar](10) NULL,
	[ATTENTINITIALS] [nchar](10) NULL,
	[BILLADDR1] [nchar](50) NULL,
	[BILLADDR2] [nchar](50) NULL,
	[BILLADDR3] [nchar](50) NULL,
	[BILLCITY] [nchar](30) NULL,
	[BILLCOUNTY] [nchar](20) NULL,
	[BILLZIPCODE] [nchar](15) NULL,
	[BILLCOUNTRY] [nchar](20) NULL,
	[EMAIL] [nchar](150) NULL,
	[ADDRESS3] [nchar](50) NULL,
	[REGION] [nchar](10) NULL,
	[CNTYCODE] [nchar](5) NULL,
	[VATREGNO] [nchar](20) NULL,
	[DRINCCAT] [nchar](5) NULL,
	[HRCASHTYPE] [nchar](2) NULL,
	[HRDESC] [nchar](30) NULL,
	[HRACCTNUM] [nchar](9) NULL,
	[NOTICEDATE] [datetime] NULL,
	[REVPAT] [nchar](2) NULL,
	[NEXTREV] [datetime] NULL,
	[REMPERIOD] [float] NULL,
	[TIMEESS] [nchar](1) NULL,
	[REVUPWARD] [nchar](1) NULL,
	[COLLECTIONID] [bigint] NULL,
	[CLS_OLDREF] [nchar](20) NULL,
	[CLS_POREF] [nchar](50) NULL,
	[CLS_PAYABLE] [nchar](50) NULL,
	[CLS_ERV] [money] NULL,
	[CLS_NER] [money] NULL,
	[SUBREGION] [nchar](10) NULL,
	[CLS_CONTRACTEDRENT] [numeric](12, 2) NULL,
	[CLS_INVNOTE] [ntext] NULL,
	[DEMISEDESC] [nchar](70) NULL,
	[SHDEMISE] [nchar](25) NULL,
	[TENANCYTYPE] [nchar](10) NULL,
	[GUARNAME] [nchar](50) NULL,
	[TENANCYSTAT] [nchar](2) NULL,
	[SEARCHNAME] [nchar](50) NULL,
	[SUPCHASELET] [nchar](1) NULL,
	[SCCHARGEFR] [datetime] NULL,
	[BREACHTEXT] [ntext] NULL,
	[SCNET] [nchar](1) NULL,
	[IDE_RES] [nchar](1) NULL,
	[LEASLLSTOP] [nchar](1) NULL,
	[MANDATE_NO] [nchar](35) NULL,
	[MANDATE_DATE] [datetime] NULL,
	[SCCHARGETO] [datetime] NULL,
	[IPL_PAYMETHOD] [nchar](12) NULL,
	[IPL_OVERSEAS] [nchar](1) NULL,
	[EFTSCHEMEID] [nvarchar](50) NULL,
	[BANKNAME] [nchar](80) NULL,
	[BANKCURRCODE] [nchar](3) NULL,
	[IIT_PARTIVA] [nchar](30) NULL,
	[IIT_CODEFIS] [nchar](50) NULL,
	[IIT_DD] [nchar](1) NULL,
	[PAYMETH] [nchar](10) NULL,
	[LRTANND] [datetime] NULL,
	[LRTAPP] [nchar](1) NULL,
	[LRTPERC] [numeric](18, 2) NULL,
	[IIT_CODERID] [nchar](16) NULL,
	[IIT_STAMP] [nchar](1) NULL,
	[IFR_RTBASE1] [nchar](5) NULL,
	[IFR_RTBASE2] [nchar](5) NULL,
	[IFR_RTPROV] [nchar](5) NULL,
	[IFR_RTBASE3] [nchar](5) NULL,
	[UKREVTYPE] [nchar](10) NULL,
	[PORTSTOP] [nchar](1) NULL,
	[ENTITYSTOP] [nchar](1) NULL,
	[SCOCCTYPE] [nchar](1) NULL,
	[SCROUNDING] [nchar](1) NULL,
	[SCBREF] [nchar](30) NULL,
	[FEDID] [varbinary](max) NULL,
	[FEDIDTYPE] [nchar](10) NULL,
	[COMPANYGRPID] [int] NULL,
	[RATING1] [nchar](25) NULL,
	[RATING2] [nchar](25) NULL,
	[RATING3] [nchar](25) NULL,
	[FLAG1] [nchar](1) NULL,
	[FLAG2] [nchar](1) NULL,
	[FLAG3] [nchar](1) NULL,
	[FLAG4] [nchar](1) NULL,
	[FLAG5] [nchar](1) NULL,
	[DEFAULTFROM] [int] NOT NULL,
	[TENANTRSKRTNG] [nchar](10) NULL,
	[SCALLINC] [nchar](1) NULL,
	[CLS_DEALSUMMID] [nvarchar](50) NULL,
	[IBAN] [varbinary](max) NULL,
	[BANKACCT] [varbinary](max) NULL,
	[SWIFT] [varbinary](max) NULL,
	[BANKCODE] [varbinary](max) NULL,
	[SORTCODE] [varbinary](max) NULL,
	[AGRFORLEASE] [datetime] NULL,
	[AFLYESNO] [nchar](1) NULL,
	[CLS_COREGNUM] [nchar](20) NULL,
	[CLS_SAFENO] [nchar](12) NULL,
	[ActiveSegment] [bit] NOT NULL,
	[SegmentAccountFilter] [bit] NOT NULL,
	[SegmentCustomFilter] [bit] NULL,
	[SourceLink] [nvarchar](500) NULL,
	[TENDERDATE] [datetime] NULL,
	[STORENO] [nvarchar](50) NULL,
	[SALESDUE] [smallint] NULL,
	[SCWEIGHTCAP] [nchar](1) NULL,
	[DD_STATUS] [nchar](2) NULL,
	[DD_0NFILEDATE] [datetime] NULL,
	[DD_0CFILEDATE] [datetime] NULL,
	[DD_CANCEL] [nchar](10) NULL,
	[DD_FILECODE] [nvarchar](50) NULL,
	[DD_FINAL] [datetime] NULL,
	[ARREARSTAT] [nvarchar](20) NULL,
	[GUARANTEED] [nvarchar](20) NULL,
	[DDFREQ] [nchar](5) NULL,
	[CREATEDON] [datetime] NULL,
	[PREVLEASID] [nchar](6) NULL,
	[ORIGLEASID] [nchar](6) NULL,
	[EXCLDLYSL] [nchar](1) NULL,
	[EMEA_REVIEW_STATUS] [nchar](1) NULL
)

@gordthompson (Author) commented Jun 13, 2024

@pseudobacon - I have added a dtype= argument to the function. You will need to specify all of the varbinary columns so they don't get mistaken for varchar, e.g.,

from sqlalchemy import VARBINARY
df_upsert(df, "LEAS", engine, dtype={"SWIFT": VARBINARY, "BANKCODE": VARBINARY})

@pseudobacon

[screenshot: implicit conversion error]

Thanks. I'm still getting the same error, though, even after specifying the columns.

@gordthompson (Author)

@pseudobacon - Try doing

insp = sa.inspect(engine)
vbm_cols = [col["name"] for col in insp.get_columns("LEAS") if str(col["type"]).startswith("VARBINARY")]
print(vbm_cols)

to see if you've missed any.
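
If the list checks out, it can also be used to build the dtype= dict directly instead of naming each column by hand; a minimal sketch, reusing the vbm_cols list from above:

    from sqlalchemy import VARBINARY

    # map every VARBINARY column found by the inspector to VARBINARY for .to_sql()
    df_upsert(df, "LEAS", engine, dtype={name: VARBINARY for name in vbm_cols})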

@pseudobacon

[screenshot: inspector output listing the VARBINARY columns]

Definitely all there.

@gordthompson (Author)

@pseudobacon - Which line in mssql_df_upsert.py is failing: line 73, or line 74?

@pseudobacon

[screenshot: traceback]

73

@gordthompson (Author) commented Jun 19, 2024

@pseudobacon - Aha, okay. If you are using fast_executemany=True then try switching it off to see if the error goes away.
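
A minimal sketch of that change, reusing the connection URL pattern from the example at the top (fast_executemany defaults to False when omitted):

    # same URL as before, but without fast_executemany=True
    engine = sa.create_engine(
        "mssql+pyodbc://scott:tiger^5HHH@192.168.0.199/test"
        "?driver=ODBC+Driver+17+for+SQL+Server",
    )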

@pseudobacon

It goes away, but now the merge is so slow that after half an hour nothing has merged in. Would specifying the primary key columns help, or would that make no difference? There are already keys on the table.

@gordthompson (Author) commented Jun 19, 2024

Try this: Reinstate fast_executemany=True and hack your copy of mssql_df_upsert.py, changing this …

    with engine.begin() as conn:
        data_frame.to_sql("#temp_table", conn, index=False, dtype=dtype)
        conn.exec_driver_sql(stmt)
        conn.exec_driver_sql("DROP TABLE IF EXISTS #temp_table")

… to this …

    with engine.begin() as conn:
        conn.exec_driver_sql(f"SELECT * INTO #temp_table FROM {table_spec} WHERE 1=0")
        crsr = conn.connection.dbapi_connection.cursor()
        insert_stmt = f"INSERT INTO #temp_table VALUES ({', '.join('?' * len(df_columns))})"
        crsr.executemany(insert_stmt, list(data_frame.itertuples(index=False)))
        conn.exec_driver_sql(stmt)
        conn.exec_driver_sql("DROP TABLE IF EXISTS #temp_table")

(The code assumes that the DataFrame column order exactly matches the table's.)

@pseudobacon

[screenshot: varbinary implicit conversion error]

That fixes that error, but then we're back to the varbinary error.

@gordthompson (Author)

Have you already done some mods to the mssql_df_upsert.py file? Your line 80 is my line 76. (The [lack of] indentation also looks strange, but maybe they're just trimming leading whitespace.)

@pseudobacon commented Jun 19, 2024

Yeah, I made a change to the DataFrame definition:

[screenshot: modified DataFrame definition]

And my sourceconnection is defined as create_engine("mssql+pyodbc://sqldriver").

My upsert is a single line.

@gordthompson (Author) commented Jun 20, 2024

@pseudobacon - Revert the above change (with block) to use .to_sql() again, then change the name of the temporary table from #temp_table to ##temp_table (in 4 places).

… or use the updated code I just posted.

@pseudobacon

Huzzah, it works! Thank you very much.

@pseudobacon

I've also got a scenario where I am presumably running out of memory when trying to merge in 500k+ records:

[screenshot: memory error]

For this particular DataFrame there are some columns which are completely NULL. From SQL's point of view, it's not necessary to supply NULL as a value to merge in. Would it be possible to add a parameter to exclude columns where the entire column is NULL? This would reduce the number of columns needed for comparison in the MERGE and improve performance.

@gordthompson (Author)

@pseudobacon - I added a chunksize parameter to try to help with memory consumption.
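
If memory is still tight, dropping the all-NULL columns on the pandas side before the call would also shrink both the temp-table upload and the MERGE column list; a minimal sketch (the chunksize value is illustrative):

    # drop columns in which every value is NULL/NaN, then upsert the rest
    df = df.dropna(axis=1, how="all")
    df_upsert(df, "LEAS", engine, chunksize=1000)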
