Last active
June 19, 2020 03:45
-
-
Save kjaanson/847000aed552e6096ab4d5e7ea25b109 to your computer and use it in GitHub Desktop.
Monkeypatched method for pandas DataFrame to bulk upload dataframe to SQL Server.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Monkeypatch pandas DataFrame to include a `to_bcp_sql` method
that uploads the dataframe to an MS SQL Server using the mssql-tools `bcp` utility.
Requires mssql-tools to be installed on the system and available on PATH.
Partly inspired by https://gist.github.com/ajsmith007/d1adb79e152f5f23503b
ToDo:
* Add docs
* Add bcp-specific error handling
* Check that mssql-tools is installed
"""
from functools import wraps | |
from pandas import DataFrame | |
from pandas.io.sql import SQLTable, pandasSQL_builder | |
import subprocess | |
import tempfile | |
import binascii | |
import re | |
import logging | |
logger = logging.getLogger("to_bcp_sql") | |
def monkeypatch_method(cls):
    """Decorator factory: attach the decorated function to *cls* as a method.

    Usage::

        @monkeypatch_method(DataFrame)
        def my_method(self, ...):
            ...

    The function is bound on the class under its own ``__name__`` and is
    also returned unchanged, so the module-level name remains usable as a
    plain function.

    Note: the original version applied ``@wraps(cls)`` to the inner
    decorator — a misuse of ``functools.wraps`` (it copied the *class's*
    metadata onto the decorator function); that has been removed.
    """
    def decorator(func):
        setattr(cls, func.__name__, func)
        return func
    return decorator
@monkeypatch_method(DataFrame)
def to_bcp_sql(self, table_name, engine, server, database, username, password,
               if_exists="replace", index=False):
    """Write this DataFrame to an (Azure) SQL Server table via the `bcp` utility.

    The table is created/dropped through *engine* (DDL only); the data itself
    is streamed through a temporary CSV file and loaded with mssql-tools `bcp`,
    which must be installed and on PATH.

    Parameters
    ----------
    table_name : str
        Destination table name.
    engine : sqlalchemy.engine.Engine
        Engine used for the DDL statements (create/drop).
    server, database, username, password : str
        Connection details passed to the `bcp` command line.
    if_exists : {'replace', 'append', 'fail'}, default 'replace'
        Behaviour when the destination table already exists.
    index : bool, default False
        Whether to write the DataFrame index as a column.

    Raises
    ------
    ValueError
        If the table exists and ``if_exists='fail'``, or `if_exists` is invalid.
    RuntimeError
        If `bcp` exits with a non-zero status.
    """
    table = SQLTable(table_name, pandasSQL_builder(engine), self,
                     if_exists=if_exists, index=index)
    table_schema = table.sql_schema()

    # Decide which DDL statements to run before loading the data.
    if table.exists():
        logger.debug(f"Table {table_name} already exists")
        if if_exists == 'fail':
            raise ValueError(f"Table {table_name} exists")
        elif if_exists == 'append':
            queue = []
        elif if_exists == 'replace':
            logger.debug("Dropping table")
            queue = [f'DROP TABLE {table_name}', table_schema]
        else:
            raise ValueError("Bad option for `if_exists`")
    else:
        logger.debug(f"Table {table_name} does not exist. Creating.")
        logger.debug(table_schema)
        queue = [table_schema]

    with engine.begin() as con:
        for stmt in queue:
            con.execute(stmt)

    logger.info(f"Writing table {table_name} to {server}:{database} using credentials {username}: ***")

    with tempfile.NamedTemporaryFile(mode='w') as fp:
        # Unusual separator characters minimise the chance of colliding
        # with characters that appear in the data itself.
        line_terminator = '¬'
        field_separator = '╡'
        csv_params = {'header': False, 'index': index,
                      'line_terminator': line_terminator, 'sep': field_separator}
        self.to_csv(fp, **csv_params)
        fp.flush()

        # Pass arguments as a list with shell=False so that credentials and
        # separator characters cannot be mangled by, or injected through,
        # the shell.
        bcp_args = ['bcp', table_name, 'in', fp.name,
                    '-S', server, '-d', database,
                    '-U', username, '-P', password,
                    '-c', '-t', field_separator, '-r', line_terminator]
        # Mask the password when logging the command line.
        logger.info("Running BCP command: %s",
                    ' '.join('***' if arg == password else arg for arg in bcp_args))
        sproc = subprocess.run(bcp_args, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT)
        logger.info(f"BCP exit code: {sproc.returncode}")

        output = sproc.stdout.decode('utf8', errors='replace')

        # Check the exit status BEFORE parsing: on failure the "rows copied"
        # line is absent and parsing it first would raise a confusing
        # IndexError. `!= 0` also catches negative (signal) exit codes.
        if sproc.returncode != 0:
            logger.error("BCP error")
            logger.error(output)
            raise RuntimeError(f"bcp failed with exit code {sproc.returncode}")

        # Sanity-check the number of rows bcp reports against the DataFrame.
        matches = re.findall(r"(\d+) rows copied.", output)
        if not matches:
            logger.error("Could not determine copied row count from BCP output")
            logger.error(output)
        elif len(self) != int(matches[0]):
            logger.error("DataFrame row number mismatch by BCP")
            logger.error(output)
When using a trusted connection, you can instead check whether `username` and `password` are `None`:
if not username and not password:
bcp_cmd = f"bcp {table_name} in {fp.name} -S {server} -d {database} -T -c -t '{field_separator}' -r '{line_terminator}'"
else:
bcp_cmd = f"bcp {table_name} in {fp.name} -S {server} -d {database} -U '{username}' -P '{password}' -c -t '{field_separator}' -r '{line_terminator}'"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi,
I replaced lines 42 through 66 with
table.create()
. That has the advantage of creating indexes in SQL for the columns that form the index of your dataframe. Tested only with if_exists="replace" and index=True.