# Alternative to_sql() *method* for mssql+pyodbc or mssql+pymssql
#
# adapted from https://pandas.pydata.org/docs/user_guide/io.html#io-sql-method
import json

import pandas as pd
import sqlalchemy as sa
def mssql_insert_json(table, conn, keys, data_iter):
    """
    Execute SQL statement inserting data via OPENJSON

    Parameters
    ----------
    table : pandas.io.sql.SQLTable
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted
    """
    # build dict of {"column_name": "column_type"}, mapping the generic
    # TEXT/NTEXT type names to (max) types that OPENJSON's WITH clause accepts
    col_dict = {
        str(col.name): "varchar(max)"
        if str(col.type) == "TEXT"
        else "nvarchar(max)"
        if str(col.type) == "NTEXT"
        else str(col.type)
        for col in table.table.columns
    }
    columns = ", ".join([f"[{k}]" for k in keys])
    if table.schema:
        table_name = f"[{table.schema}].[{table.name}]"
    else:
        table_name = f"[{table.name}]"
    # serialize the whole chunk of rows into a single JSON array,
    # which is passed to the driver as one parameter
    json_data = [dict(zip(keys, row)) for row in data_iter]
    with_clause = ",\n".join(
        [
            f"{col_name} {col_type} '$.{col_name}'"
            for col_name, col_type in col_dict.items()
        ]
    )
    # pyodbc uses the "qmark" paramstyle; other drivers (e.g., pymssql) use "%s"
    placeholder = "?" if conn.dialect.paramstyle == "qmark" else "%s"
    sql = f"""\
INSERT INTO {table_name} ({columns})
SELECT {columns}
FROM OPENJSON({placeholder})
WITH
(
{with_clause}
);
"""
    conn.exec_driver_sql(sql, (json.dumps(json_data),))
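
# Note: OPENJSON requires SQL Server 2016 or later and a database
# compatibility level of 130 or higher.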

if __name__ == "__main__":
    # =============
    # USAGE EXAMPLE
    # =============
    # note: fast_executemany=True is not required

    engine = sa.create_engine("mssql+pyodbc://scott:tiger^5HHH@mssql_199")

    df = pd.DataFrame([(1, "Alfa"), (2, "Bravo")], columns=["id", "txt"])

    df.to_sql(
        "##tmp",
        engine,
        index=False,
        if_exists="append",
        method=mssql_insert_json,
    )

    # check result
    with engine.begin() as connection:
        print(connection.exec_driver_sql("SELECT * FROM ##tmp").all())
        # [(1, 'Alfa'), (2, 'Bravo')]
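
    # For the frame above, the statement generated by mssql_insert_json()
    # looks roughly like this (the column types depend on how pandas/SQLAlchemy
    # map the DataFrame dtypes, so treat them as illustrative):
    #
    #   INSERT INTO [##tmp] ([id], [txt])
    #   SELECT [id], [txt]
    #   FROM OPENJSON(?)
    #   WITH
    #   (
    #   id BIGINT '$.id',
    #   txt varchar(max) '$.txt'
    #   );
    #
    # with a single driver parameter holding the JSON payload:
    #   '[{"id": 1, "txt": "Alfa"}, {"id": 2, "txt": "Bravo"}]'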