Skip to content

Instantly share code, notes, and snippets.

@kjaanson
Last active June 19, 2020 03:45
Show Gist options
  • Save kjaanson/847000aed552e6096ab4d5e7ea25b109 to your computer and use it in GitHub Desktop.
Save kjaanson/847000aed552e6096ab4d5e7ea25b109 to your computer and use it in GitHub Desktop.
Monkeypatched method for pandas DataFrame to bulk-upload a DataFrame to SQL Server using `bcp`.
"""
Monkeypatching Pandas Dataframe to include `to_bcp_sql` method
that uploads dataframe to mssql server using mssql-tools `bcp`.
Requires mssql-tools to be installed on system and in PATH.
Partly inspired by https://gist.github.com/ajsmith007/d1adb79e152f5f23503b
ToDo:
* Add docs
* Add bcp specific error handling
* mssql-tools check
"""
from functools import wraps
from pandas import DataFrame
from pandas.io.sql import SQLTable, pandasSQL_builder
import subprocess
import tempfile
import binascii
import re
import logging
logger = logging.getLogger("to_bcp_sql")
def monkeypatch_method(cls):
    """Return a decorator that attaches the decorated function to *cls*.

    The function is set as an attribute of *cls* under its own ``__name__``
    and is returned unchanged, so it also stays usable as a plain function.

    Note: ``functools.wraps`` is deliberately not applied here. Wrapping a
    class onto the inner decorator would copy the class's ``__dict__`` into
    the function and make the decorator masquerade as the class.
    """
    def decorator(func):
        setattr(cls, func.__name__, func)
        return func

    return decorator
# Matches bcp's summary line, e.g. "1000 rows copied."
_BCP_ROWS_PATTERN = re.compile(r"(\d+) rows copied\.")


def _csv_line_terminator_kwarg():
    """Return the keyword ``DataFrame.to_csv`` uses for the line terminator.

    pandas 1.5 renamed ``line_terminator`` to ``lineterminator`` and pandas
    2.0 removed the old spelling, so the installed signature is probed.
    """
    import inspect

    params = inspect.signature(DataFrame.to_csv).parameters
    return "lineterminator" if "lineterminator" in params else "line_terminator"


def _build_bcp_command(table_name, data_file, server, database, username,
                       password, field_separator, line_terminator):
    """Build the ``bcp ... in`` argument list (executed without a shell).

    Uses a trusted connection (``-T``) when neither username nor password is
    given; otherwise passes them via ``-U``/``-P``.
    """
    cmd = ["bcp", str(table_name), "in", data_file,
           "-S", str(server), "-d", str(database)]
    if not username and not password:
        cmd.append("-T")
    else:
        cmd += ["-U", str(username), "-P", str(password)]
    cmd += ["-c", "-t", field_separator, "-r", line_terminator]
    return cmd


def _redact_bcp_command(cmd):
    """Render *cmd* as a shell-quoted string with the ``-P`` value masked."""
    import shlex

    shown = []
    hide_next = False
    for arg in cmd:
        shown.append("***" if hide_next else shlex.quote(arg))
        hide_next = arg == "-P"
    return " ".join(shown)


@monkeypatch_method(DataFrame)
def to_bcp_sql(self, table_name, engine, server, database, username, password,
               if_exists="replace", index=False):
    """Write the DataFrame to a SQL Server table using the ``bcp`` utility.

    The target table's DDL is produced by pandas' ``SQLTable.sql_schema()``
    and executed through *engine*. The rows are then exported to a temporary
    delimited text file and loaded with ``bcp ... in``.

    Parameters
    ----------
    table_name : str
        Target table. It is used in the DDL statements and passed to ``bcp``.
    engine :
        Connectable handed to pandas' ``pandasSQL_builder``. Its ``begin()``
        context is used to run the DDL (presumably a SQLAlchemy engine).
    server, database : str
        Passed to ``bcp`` as ``-S`` and ``-d``.
    username, password : str or None
        Credentials for ``bcp``. If both are falsy, a trusted connection
        (``-T``) is used instead.
    if_exists : {"fail", "replace", "append"}
        Behaviour when the table already exists.
    index : bool
        Whether the DataFrame index is written as column(s).

    Raises
    ------
    ValueError
        If *if_exists* is not a valid option, or if the table exists and
        *if_exists* is ``"fail"``.
    FileNotFoundError
        If the ``bcp`` executable cannot be found on PATH.
    RuntimeError
        If ``bcp`` exits with a non-zero status.

    A mismatch between the DataFrame length and bcp's reported row count is
    logged as an error but not raised.
    """
    import os

    # Validate up front so a bad value is rejected even when the table is new.
    if if_exists not in ("fail", "replace", "append"):
        raise ValueError("Bad option for `if_exists`")

    # NOTE(review): SQLTable / pandasSQL_builder are private pandas APIs whose
    # signatures have changed between releases -- confirm against the
    # installed pandas version.
    table = SQLTable(table_name, pandasSQL_builder(engine), self,
                     if_exists=if_exists, index=index)
    table_schema = table.sql_schema()

    if table.exists():
        logger.debug(f"Table {table_name} already exists")
        if if_exists == "fail":
            raise ValueError(f"Table {table_name} exists")
        if if_exists == "append":
            queue = []
        else:  # "replace"
            logger.debug("Droping table")
            queue = [f"DROP TABLE {table_name}", table_schema]
    else:
        logger.debug(f"Table {table_name} does not exist. Creating.")
        logger.debug(table_schema)
        queue = [table_schema]

    with engine.begin() as con:
        # SQLAlchemy 1.4+ rejects plain strings in execute(); exec_driver_sql
        # runs them directly. Older versions fall back to execute().
        run_sql = getattr(con, "exec_driver_sql", con.execute)
        for stmt in queue:
            run_sql(stmt)

    logger.info(f"Writing table {table_name} to {server}:{database} using credentials {username}: ***")

    # Unusual characters are used as delimiters, presumably so they do not
    # collide with characters inside the data values.
    line_terminator = '¬'
    field_separator = '╡'
    csv_params = {
        "header": False,
        "index": index,
        "sep": field_separator,
        _csv_line_terminator_kwarg(): line_terminator,
    }

    # delete=False + explicit close: on Windows a NamedTemporaryFile cannot be
    # opened by another process (bcp) while this handle keeps it open.
    fp = tempfile.NamedTemporaryFile(mode="w", delete=False)
    try:
        with fp:
            self.to_csv(fp, **csv_params)
        bcp_cmd = _build_bcp_command(table_name, fp.name, server, database,
                                     username, password, field_separator,
                                     line_terminator)
        # Never log the raw command: it contains the password.
        logger.info(f"Running BCP command: {_redact_bcp_command(bcp_cmd)}")
        # Argument list, no shell: values are passed verbatim, not interpreted.
        sproc = subprocess.run(bcp_cmd, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT)
    finally:
        os.remove(fp.name)

    output = sproc.stdout.decode("utf8", errors="replace")
    logger.info(f"BCP exit code: {sproc.returncode}")
    if sproc.returncode != 0:
        logger.error("BCP error")
        logger.error(output)
        raise RuntimeError(f"bcp exited with code {sproc.returncode}")

    match = _BCP_ROWS_PATTERN.search(output)
    if match is None:
        logger.error("Could not find the number of rows copied in BCP output")
        logger.error(output)
        return

    bcp_copied_rows = int(match.group(1))
    if len(self) != bcp_copied_rows:
        logger.error("DataFrame row number mismatch by BCP")
        logger.error(output)
@cheesis
Copy link

cheesis commented Jan 2, 2019

Hi,
I replaced lines 42 through 66 with table.create(). That has the advantage of creating indexes in SQL for the columns that are the index of your dataframe.
Tested only with if_exists="replace" and index=True.

@babuqci
Copy link

babuqci commented Jun 19, 2020

When using a trusted connection (`bcp -T`), you can add a check for the username and password being None:

if not username and not password:
bcp_cmd = f"bcp {table_name} in {fp.name} -S {server} -d {database} -T -c -t '{field_separator}' -r '{line_terminator}'"
else:
bcp_cmd = f"bcp {table_name} in {fp.name} -S {server} -d {database} -U '{username}' -P '{password}' -c -t '{field_separator}' -r '{line_terminator}'"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment